cotengra.parallel#

Interface for parallelism.

Module Contents#

Classes#

CachedProcessPoolExecutor

RayFuture

Basic concurrent.futures-like future wrapping a ray ObjectRef.

RayExecutor

Basic concurrent.futures-like interface using ray.

Functions#

get_pool([n_workers, maybe_create, backend])

Get a parallel pool.

_infer_backed_cached(pool_class)

_infer_backend(pool)

Return the backend type of pool - cached for speed.

get_n_workers([pool])

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).

parse_parallel_arg(parallel)

set_parallel_backend(backend)

Create a parallel pool of type backend and register it as the default for 'auto' parallel.

maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.

submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.

scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

can_scatter(pool)

Whether pool can make objects remote.

should_nest(pool)

Given argument pool, should we try nested parallelism?

get_loky_get_reusable_executor()

_shutdown_cached_process_pool()

_get_pool_cf([n_workers])

_get_pool_dask([n_workers, maybe_create])

Maybe get an existing or create a new dask.distributed client.

_maybe_leave_pool_dask()

_rejoin_pool_dask()

get_ray()

_unpack_futures_tuple(x)

_unpack_futures_list(x)

_unpack_futures_dict(x)

_unpack_futures_identity(x)

_unpack_futures(x)

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.

get_fn_as_remote_object(fn)

get_deploy(**remote_opts)

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

_get_pool_ray([n_workers, maybe_create])

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Attributes#

cotengra.parallel._AUTO_BACKEND#
cotengra.parallel._DEFAULT_BACKEND = 'concurrent.futures'#
cotengra.parallel.get_pool(n_workers=None, maybe_create=False, backend=None)[source]#

Get a parallel pool.
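
A usage sketch (the exact pool object returned depends on the registered backend, and None may be returned if no pool exists and maybe_create is False):

    from cotengra.parallel import get_pool

    # get an existing pool for the default backend,
    # creating a new one with 4 workers if none exists
    pool = get_pool(n_workers=4, maybe_create=True)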

cotengra.parallel._infer_backed_cached(pool_class)[source]#
cotengra.parallel._infer_backend(pool)[source]#

Return the backend type of pool - cached for speed.

cotengra.parallel.get_n_workers(pool=None)[source]#

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).
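
A sketch of the pre-dispatch pattern the docstring alludes to (the factor of two here is an arbitrary illustration, not a library default):

    from cotengra.parallel import get_n_workers, get_pool, submit

    pool = get_pool(maybe_create=True)
    # dispatch roughly twice as many tasks as there are workers
    n_pre_dispatch = 2 * get_n_workers(pool)
    futures = [submit(pool, pow, 2, i) for i in range(n_pre_dispatch)]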

cotengra.parallel.parse_parallel_arg(parallel)[source]#
cotengra.parallel.set_parallel_backend(backend)[source]#

Create a parallel pool of type backend and register it as the default for 'auto' parallel.
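
For example, assuming 'dask' is an accepted backend string here (by analogy with _DEFAULT_BACKEND = 'concurrent.futures'):

    from cotengra.parallel import set_parallel_backend

    # subsequent parallel='auto' usage will now use this backend's pool
    set_parallel_backend('dask')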

cotengra.parallel.maybe_leave_pool(pool)[source]#

Logic required for nested parallelism in dask.distributed.

cotengra.parallel.maybe_rejoin_pool(is_worker, pool)[source]#

Logic required for nested parallelism in dask.distributed.
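
Together, maybe_leave_pool and maybe_rejoin_pool support the secede/rejoin pattern needed when a task already running on a worker wants to submit its own tasks. A minimal sketch, assuming maybe_leave_pool returns the is_worker flag that maybe_rejoin_pool expects:

    from cotengra.parallel import maybe_leave_pool, maybe_rejoin_pool, submit

    def nested_task(pool, xs):
        # secede from the pool so our own submissions can be scheduled
        is_worker = maybe_leave_pool(pool)
        try:
            futures = [submit(pool, pow, x, 2) for x in xs]
            return [f.result() for f in futures]
        finally:
            # rejoin only if we were actually running as a worker
            maybe_rejoin_pool(is_worker, pool)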

cotengra.parallel.submit(pool, fn, *args, **kwargs)[source]#

Interface for submitting fn(*args, **kwargs) to pool.
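
A minimal sketch:

    from cotengra.parallel import get_pool, submit

    pool = get_pool(maybe_create=True)
    future = submit(pool, sum, [1, 2, 3])
    print(future.result())  # -> 6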

cotengra.parallel.scatter(pool, data)[source]#

Interface for maybe turning data into a remote object or reference.

cotengra.parallel.can_scatter(pool)[source]#

Whether pool can make objects remote.
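
A sketch combining scatter and can_scatter - upload data once and reuse the reference, but only when the pool supports it:

    from cotengra.parallel import can_scatter, get_pool, scatter, submit

    pool = get_pool(maybe_create=True)
    data = list(range(1000))

    if can_scatter(pool):
        # replace the data with a remote reference to it
        data = scatter(pool, data)

    # the (possibly remote) data can now be passed to many tasks cheaply
    futures = [submit(pool, sum, data) for _ in range(4)]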

cotengra.parallel.should_nest(pool)[source]#

Given argument pool, should we try nested parallelism?

cotengra.parallel.get_loky_get_reusable_executor()[source]#
class cotengra.parallel.CachedProcessPoolExecutor[source]#
__call__(n_workers=None)[source]#
is_initialized()[source]#
shutdown()[source]#
__del__()[source]#
cotengra.parallel.PoolHandler#
cotengra.parallel._shutdown_cached_process_pool()[source]#
cotengra.parallel._get_pool_cf(n_workers=None)[source]#
cotengra.parallel._get_pool_dask(n_workers=None, maybe_create=False)[source]#

Maybe get an existing or create a new dask.distributed client.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to create a new local cluster and client if no existing client is found.

Return type:

None or dask.distributed.Client
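
A direct-use sketch (going through get_pool is presumably the usual route):

    from cotengra.parallel import _get_pool_dask

    # reuse an active dask.distributed client if there is one,
    # otherwise start a local cluster since maybe_create=True
    client = _get_pool_dask(n_workers=2, maybe_create=True)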

cotengra.parallel._maybe_leave_pool_dask()[source]#
cotengra.parallel._rejoin_pool_dask()[source]#
cotengra.parallel.get_ray()[source]#
class cotengra.parallel.RayFuture(obj)[source]#

Basic concurrent.futures-like future wrapping a ray ObjectRef.

__slots__ = ('_obj', '_cancelled')#
result(timeout=None)[source]#
done()[source]#
cancel()[source]#
cotengra.parallel._unpack_futures_tuple(x)[source]#
cotengra.parallel._unpack_futures_list(x)[source]#
cotengra.parallel._unpack_futures_dict(x)[source]#
cotengra.parallel._unpack_futures_identity(x)[source]#
cotengra.parallel._unpack_dispatch#
cotengra.parallel._unpack_futures(x)[source]#

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

[Subclassing ObjectRef might avoid needing this.]
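
A sketch of the intended behavior, assuming ray is installed:

    from cotengra.parallel import RayExecutor, _unpack_futures

    ex = RayExecutor()
    fut = ex.submit(pow, 2, 8)

    # every RayFuture in the nested structure becomes its ObjectRef
    unpacked = _unpack_futures(([fut, 1], {'x': (fut, 2)}))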

cotengra.parallel.get_remote_fn(fn, **remote_opts)[source]#

Cached retrieval of remote function.
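
A sketch, assuming the returned object is a standard ray remote function (so it is invoked via .remote):

    from cotengra.parallel import get_remote_fn

    remote_pow = get_remote_fn(pow, num_cpus=1)
    # repeated calls with the same fn retrieve the cached remote function
    ref = remote_pow.remote(2, 8)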

cotengra.parallel.get_fn_as_remote_object(fn)[source]#
cotengra.parallel.get_deploy(**remote_opts)[source]#

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

class cotengra.parallel.RayExecutor(*args, default_remote_opts=None, **kwargs)[source]#

Basic concurrent.futures-like interface using ray.

_maybe_inject_remote_opts(remote_opts=None)[source]#

Return the default remote options, possibly overriding some with those supplied by a submit call.

submit(fn, *args, pure=False, remote_opts=None, **kwargs)[source]#

Remotely run fn(*args, **kwargs), returning a RayFuture.

map(func, *iterables, remote_opts=None)[source]#

Remotely map func over the argument iterables.

scatter(data)[source]#

Push data into the distributed store, returning an ObjectRef that can be supplied to submit calls for example.

shutdown()[source]#

Shut down the parent ray cluster; this RayExecutor instance itself does not need any cleanup.
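
A usage sketch, assuming ray is installed:

    from cotengra.parallel import RayExecutor

    ex = RayExecutor()

    # per-submission remote options override the defaults
    f = ex.submit(pow, 2, 10, remote_opts={'num_cpus': 1})
    print(f.result())  # -> 1024

    # scatter large data once, then share the ObjectRef across tasks
    ref = ex.scatter(list(range(10_000)))
    g = ex.submit(sum, ref)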

cotengra.parallel._RAY_EXECUTOR#
cotengra.parallel._get_pool_ray(n_workers=None, maybe_create=False)[source]#

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to initialize ray and return a RayExecutor if not already initialized.

Return type:

None or RayExecutor
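
Direct use mirrors _get_pool_dask - a sketch:

    from cotengra.parallel import _get_pool_ray

    # returns a cached RayExecutor, initializing ray if needed
    ex = _get_pool_ray(n_workers=4, maybe_create=True)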