:py:mod:`cotengra.parallel`
===========================

.. py:module:: cotengra.parallel

.. autoapi-nested-parse::

   Interface for parallelism.


Module Contents
---------------

Classes
~~~~~~~

.. autoapisummary::

   cotengra.parallel.CachedProcessPoolExecutor
   cotengra.parallel.CachedThreadPoolExecutor
   cotengra.parallel.RayFuture
   cotengra.parallel.RayExecutor


Functions
~~~~~~~~~

.. autoapisummary::

   cotengra.parallel.choose_default_num_workers
   cotengra.parallel.get_pool
   cotengra.parallel._infer_backed_cached
   cotengra.parallel._infer_backend
   cotengra.parallel.get_n_workers
   cotengra.parallel.parse_parallel_arg
   cotengra.parallel.set_parallel_backend
   cotengra.parallel.maybe_leave_pool
   cotengra.parallel.maybe_rejoin_pool
   cotengra.parallel.submit
   cotengra.parallel.scatter
   cotengra.parallel.can_scatter
   cotengra.parallel.should_nest
   cotengra.parallel.get_loky_get_reusable_executor
   cotengra.parallel._shutdown_cached_process_pool
   cotengra.parallel._get_process_pool_cf
   cotengra.parallel._shutdown_cached_thread_pool
   cotengra.parallel._get_thread_pool_cf
   cotengra.parallel._get_pool_dask
   cotengra.parallel._maybe_leave_pool_dask
   cotengra.parallel._rejoin_pool_dask
   cotengra.parallel.get_ray
   cotengra.parallel._unpack_futures_tuple
   cotengra.parallel._unpack_futures_list
   cotengra.parallel._unpack_futures_dict
   cotengra.parallel._unpack_futures_identity
   cotengra.parallel._unpack_futures
   cotengra.parallel.get_remote_fn
   cotengra.parallel.get_fn_as_remote_object
   cotengra.parallel.get_deploy
   cotengra.parallel._get_pool_ray


Attributes
~~~~~~~~~~

.. autoapisummary::

   cotengra.parallel._AUTO_BACKEND
   cotengra.parallel._DEFAULT_BACKEND
   cotengra.parallel.ProcessPoolHandler
   cotengra.parallel.ThreadPoolHandler
   cotengra.parallel._unpack_dispatch
   cotengra.parallel._RAY_EXECUTOR


.. py:data:: _AUTO_BACKEND


.. py:data:: _DEFAULT_BACKEND
   :value: 'concurrent.futures'


.. py:function:: choose_default_num_workers()


.. py:function:: get_pool(n_workers=None, maybe_create=False, backend=None)

   Get a parallel pool.


.. py:function:: _infer_backed_cached(pool_class)


.. py:function:: _infer_backend(pool)

   Return the backend type of ``pool`` - cached for speed.


.. py:function:: get_n_workers(pool=None)

   Extract how many workers our pool has (mostly for working out how many
   tasks to pre-dispatch).


.. py:function:: parse_parallel_arg(parallel)


.. py:function:: set_parallel_backend(backend)

   Create a parallel pool of type ``backend`` and register it as the default
   for ``'auto'`` parallelism.


.. py:function:: maybe_leave_pool(pool)

   Logic required for nested parallelism in dask.distributed.


.. py:function:: maybe_rejoin_pool(is_worker, pool)

   Logic required for nested parallelism in dask.distributed.


.. py:function:: submit(pool, fn, *args, **kwargs)

   Interface for submitting ``fn(*args, **kwargs)`` to ``pool``.


.. py:function:: scatter(pool, data)

   Interface for maybe turning ``data`` into a remote object or reference.


.. py:function:: can_scatter(pool)

   Whether ``pool`` can make objects remote.


.. py:function:: should_nest(pool)

   Given argument ``pool``, should we try nested parallelism?
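The pool-agnostic helpers above (``get_pool``, ``submit``, ``get_n_workers``, ...)
can also be driven directly. The following is a minimal, illustrative sketch only,
not part of the generated reference - it assumes the default
``'concurrent.futures'`` backend is usable and that the returned future exposes a
``concurrent.futures``-style ``result()`` method:

.. code-block:: python

   from cotengra.parallel import get_n_workers, get_pool, submit

   # get (or lazily create) a pool using the default backend
   pool = get_pool(n_workers=2, maybe_create=True)
   print(get_n_workers(pool))

   # submit ``sum([1, 2, 3])`` to the pool and block on the result
   future = submit(pool, sum, [1, 2, 3])
   print(future.result())  # -> 6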
.. py:function:: get_loky_get_reusable_executor()


.. py:class:: CachedProcessPoolExecutor


   .. py:method:: __call__(n_workers=None)


   .. py:method:: is_initialized()


   .. py:method:: shutdown()


   .. py:method:: __del__()


.. py:data:: ProcessPoolHandler


.. py:function:: _shutdown_cached_process_pool()


.. py:function:: _get_process_pool_cf(n_workers=None)


.. py:class:: CachedThreadPoolExecutor


   .. py:method:: __call__(n_workers=None)


   .. py:method:: is_initialized()


   .. py:method:: shutdown()


   .. py:method:: __del__()


.. py:data:: ThreadPoolHandler


.. py:function:: _shutdown_cached_thread_pool()


.. py:function:: _get_thread_pool_cf(n_workers=None)


.. py:function:: _get_pool_dask(n_workers=None, maybe_create=False)

   Maybe get an existing or create a new dask.distributed client.

   :param n_workers: The number of workers to request if creating a new client.
   :type n_workers: None or int, optional
   :param maybe_create: Whether to create a new local cluster and client if no
      existing client is found.
   :type maybe_create: bool, optional

   :rtype: None or dask.distributed.Client


.. py:function:: _maybe_leave_pool_dask()


.. py:function:: _rejoin_pool_dask()


.. py:function:: get_ray()


.. py:class:: RayFuture(obj)

   Basic ``concurrent.futures`` like future wrapping a ray ``ObjectRef``.

   .. py:attribute:: __slots__
      :value: ('_obj', '_cancelled')


   .. py:method:: result(timeout=None)


   .. py:method:: done()


   .. py:method:: cancel()


.. py:function:: _unpack_futures_tuple(x)


.. py:function:: _unpack_futures_list(x)


.. py:function:: _unpack_futures_dict(x)


.. py:function:: _unpack_futures_identity(x)


.. py:data:: _unpack_dispatch


.. py:function:: _unpack_futures(x)

   Allows passing futures by reference - takes e.g. args and kwargs and
   replaces all ``RayFuture`` objects with their underlying ``ObjectRef``
   within all nested tuples, lists and dicts.

   [Subclassing ``ObjectRef`` might avoid needing this.]


.. py:function:: get_remote_fn(fn, **remote_opts)

   Cached retrieval of remote function.


.. py:function:: get_fn_as_remote_object(fn)


.. py:function:: get_deploy(**remote_opts)

   Alternative for 'non-function' callables - e.g. partial functions -
   pass the callable object too.


.. py:class:: RayExecutor(*args, default_remote_opts=None, **kwargs)

   Basic ``concurrent.futures`` like interface using ``ray``.

   .. py:method:: _maybe_inject_remote_opts(remote_opts=None)

      Return the default remote options, possibly overriding some with
      those supplied by a ``submit`` call.


   .. py:method:: submit(fn, *args, pure=False, remote_opts=None, **kwargs)

      Remotely run ``fn(*args, **kwargs)``, returning a ``RayFuture``.


   .. py:method:: map(func, *iterables, remote_opts=None)

      Remote map ``func`` over arguments ``iterables``.


   .. py:method:: scatter(data)

      Push ``data`` into the distributed store, returning an ``ObjectRef``
      that can be supplied to ``submit`` calls for example.


   .. py:method:: shutdown()

      Shutdown the parent ray cluster; this ``RayExecutor`` instance itself
      does not need any cleanup.


.. py:data:: _RAY_EXECUTOR


.. py:function:: _get_pool_ray(n_workers=None, maybe_create=False)

   Maybe get an existing or create a new RayExecutor, thus initializing ray.

   :param n_workers: The number of workers to request if creating a new client.
   :type n_workers: None or int, optional
   :param maybe_create: Whether to initialize ray and return a RayExecutor
      if not already initialized.
   :type maybe_create: bool, optional

   :rtype: None or RayExecutor
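As a usage note (not part of the generated reference), here is a minimal,
untested sketch of the ``RayExecutor`` interface described above. It assumes
``ray`` is installed and that constructing the executor with no arguments
starts or attaches to a local cluster:

.. code-block:: python

   from cotengra.parallel import RayExecutor

   ex = RayExecutor()           # assumption: no-arg init starts/attaches to ray
   fut = ex.submit(pow, 2, 10)  # remotely run ``pow(2, 10)``, returns a RayFuture
   print(fut.result())          # -> 1024

   # push data into the object store; the returned ObjectRef can be passed
   # to ``submit`` and is resolved on the worker before ``sum`` is called
   ref = ex.scatter([1, 2, 3])
   fut2 = ex.submit(sum, ref)
   print(fut2.result())         # -> 6

   ex.shutdown()                # shuts down the parent ray cluster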