cotengra.parallel#

Module Contents#

Classes#

CachedProcessPoolExecutor

RayFuture

Basic concurrent.futures-like future wrapping a ray ObjectRef.

RayExecutor

Basic concurrent.futures-like interface using ray.

Functions#

get_pool([n_workers, maybe_create, backend])

Get a parallel pool.

_infer_backed_cached(pool_class)

_infer_backend(pool)

Return the backend type of pool - cached for speed.

get_n_workers([pool])

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).

parse_parallel_arg(parallel)

set_parallel_backend(backend)

Create a parallel pool of type backend which registers it as the default for 'auto' parallel.

maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.

submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.

scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

can_scatter(pool)

Whether pool can make objects remote.

should_nest(pool)

Given argument pool, decide whether we should try nested parallelism.

get_loky_get_reusable_executor()

_shutdown_cached_process_pool()

_get_pool_cf([n_workers])

_get_pool_dask([n_workers, maybe_create])

Maybe get an existing or create a new dask.distributed client.

_maybe_leave_pool_dask()

_rejoin_pool_dask()

get_ray()

_unpack_futures_tuple(x)

_unpack_futures_list(x)

_unpack_futures_dict(x)

_unpack_futures_identity(x)

_unpack_futures(x)

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.

get_fn_as_remote_object(fn)

get_deploy(**remote_opts)

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

_get_pool_ray([n_workers, maybe_create])

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Attributes#

cotengra.parallel._AUTO_BACKEND#
cotengra.parallel._DEFAULT_BACKEND = 'concurrent.futures'#
cotengra.parallel.get_pool(n_workers=None, maybe_create=False, backend=None)#

Get a parallel pool.
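
A minimal usage sketch, assuming the default concurrent.futures backend (so the returned pool is taken to behave like a standard executor):

    from cotengra.parallel import get_pool

    # Fetch an existing pool of 4 workers, or create one since
    # maybe_create=True; with the default backend this is assumed
    # to expose the usual concurrent.futures executor interface.
    pool = get_pool(n_workers=4, maybe_create=True)
    fut = pool.submit(sum, range(10))
    print(fut.result())  # 45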

cotengra.parallel._infer_backed_cached(pool_class)#
cotengra.parallel._infer_backend(pool)#

Return the backend type of pool - cached for speed.

cotengra.parallel.get_n_workers(pool=None)#

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).
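
For example, one might size a batch of pre-dispatched tasks from the worker count (a hypothetical usage sketch):

    from cotengra.parallel import get_pool, get_n_workers

    pool = get_pool(maybe_create=True)
    # Pre-dispatch roughly two tasks per worker to keep the pool busy.
    n_tasks = 2 * get_n_workers(pool)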

cotengra.parallel.parse_parallel_arg(parallel)#
cotengra.parallel.set_parallel_backend(backend)#

Create a parallel pool of type backend, which registers it as the default for 'auto' parallel.
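
A short sketch; the exact set of accepted backend strings is an assumption here, with 'loky' chosen since the module wraps loky's reusable executor (see get_loky_get_reusable_executor):

    from cotengra.parallel import set_parallel_backend, get_pool

    # Make 'loky' the default backend for parallel='auto'
    # (the backend string is assumed valid).
    set_parallel_backend("loky")
    pool = get_pool(maybe_create=True)  # now a loky reusable executor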

cotengra.parallel.maybe_leave_pool(pool)#

Logic required for nested parallelism in dask.distributed.

cotengra.parallel.maybe_rejoin_pool(is_worker, pool)#

Logic required for nested parallelism in dask.distributed.
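
Based on the two signatures above, the intended pattern is presumably to bracket nested submissions with a leave/rejoin pair, assuming maybe_leave_pool returns the is_worker flag that maybe_rejoin_pool expects:

    from cotengra.parallel import maybe_leave_pool, maybe_rejoin_pool, submit

    def run_nested(pool, fn, batches):
        # Step out of a dask.distributed worker before blocking on
        # nested tasks, so we don't deadlock the scheduler.
        is_worker = maybe_leave_pool(pool)
        try:
            futures = [submit(pool, fn, batch) for batch in batches]
            return [f.result() for f in futures]
        finally:
            # Re-enter the worker once the nested work has finished.
            maybe_rejoin_pool(is_worker, pool)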

cotengra.parallel.submit(pool, fn, *args, **kwargs)#

Interface for submitting fn(*args, **kwargs) to pool.
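
A backend-agnostic sketch; whichever pool get_pool returns, the future is assumed to expose a result() method:

    from cotengra.parallel import get_pool, submit

    pool = get_pool(n_workers=2, maybe_create=True)
    # submit dispatches on the pool type, so the same call works for
    # concurrent.futures, dask and ray pools alike.
    fut = submit(pool, pow, 2, 10)
    print(fut.result())  # 1024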

cotengra.parallel.scatter(pool, data)#

Interface for maybe turning data into a remote object or reference.

cotengra.parallel.can_scatter(pool)#

Whether pool can make objects remote.
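
These two interact naturally: guard scatter with can_scatter so the same code runs on both local and distributed pools. A sketch:

    from cotengra.parallel import get_pool, can_scatter, scatter, submit

    pool = get_pool(maybe_create=True)
    data = list(range(100_000))
    # Only distributed backends (e.g. dask, ray) can hold remote
    # objects; local pools just receive the data directly.
    if can_scatter(pool):
        data = scatter(pool, data)
    print(submit(pool, len, data).result())  # 100000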

cotengra.parallel.should_nest(pool)#

Given argument pool, decide whether we should try nested parallelism.

cotengra.parallel.get_loky_get_reusable_executor()#
class cotengra.parallel.CachedProcessPoolExecutor#
__call__(n_workers=None)#
is_initialized()#
shutdown()#
__del__()#
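
A usage sketch, assuming __call__ returns the wrapped process pool (the methods above suggest the instance caches a single pool and exposes explicit lifecycle control):

    from cotengra.parallel import CachedProcessPoolExecutor

    cached = CachedProcessPoolExecutor()
    pool = cached(n_workers=2)        # create and cache the pool
    assert cached.is_initialized()
    print(pool.submit(max, [3, 1, 2]).result())  # 3
    cached.shutdown()                 # explicitly release the workers
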
cotengra.parallel.PoolHandler#
cotengra.parallel._shutdown_cached_process_pool()#
cotengra.parallel._get_pool_cf(n_workers=None)#
cotengra.parallel._get_pool_dask(n_workers=None, maybe_create=False)#

Maybe get an existing or create a new dask.distributed client.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to create a new local cluster and client if no existing client is found.

Return type:

None or dask.distributed.Client
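
A sketch of both modes (requires dask.distributed to be installed):

    from cotengra.parallel import _get_pool_dask

    client = _get_pool_dask()  # None unless a client already exists
    if client is None:
        # Explicitly allow spinning up a local cluster.
        client = _get_pool_dask(n_workers=2, maybe_create=True)
    print(client.submit(sum, range(10)).result())  # 45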

cotengra.parallel._maybe_leave_pool_dask()#
cotengra.parallel._rejoin_pool_dask()#
cotengra.parallel.get_ray()#
class cotengra.parallel.RayFuture(obj)#

Basic concurrent.futures-like future wrapping a ray ObjectRef.

__slots__ = ['_obj', '_cancelled']#
result(timeout=None)#
done()#
cancel()#
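
For illustration, wrapping a raw ObjectRef by hand (requires ray; the add task here is hypothetical):

    import ray
    from cotengra.parallel import RayFuture

    ray.init(num_cpus=2)

    @ray.remote
    def add(x, y):
        return x + y

    fut = RayFuture(add.remote(1, 2))
    print(fut.done())    # False while the task is still running
    print(fut.result())  # 3, blocks until completion
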
cotengra.parallel._unpack_futures_tuple(x)#
cotengra.parallel._unpack_futures_list(x)#
cotengra.parallel._unpack_futures_dict(x)#
cotengra.parallel._unpack_futures_identity(x)#
cotengra.parallel._unpack_dispatch#
cotengra.parallel._unpack_futures(x)#

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

[Subclassing ObjectRef might avoid needing this.]

cotengra.parallel.get_remote_fn(fn, **remote_opts)#

Cached retrieval of remote function.

cotengra.parallel.get_fn_as_remote_object(fn)#
cotengra.parallel.get_deploy(**remote_opts)#

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

class cotengra.parallel.RayExecutor(*args, default_remote_opts=None, **kwargs)#

Basic concurrent.futures-like interface using ray.

_maybe_inject_remote_opts(remote_opts=None)#

Return the default remote options, possibly overriding some with those supplied by a submit call.

submit(fn, *args, pure=False, remote_opts=None, **kwargs)#

Remotely run fn(*args, **kwargs), returning a RayFuture.

map(func, *iterables, remote_opts=None)#

Remotely map func over the argument iterables.

scatter(data)#

Push data into the distributed store, returning an ObjectRef that can be supplied to submit calls for example.

shutdown()#

Shut down the parent ray cluster; this RayExecutor instance itself does not need any cleanup.
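
Putting the pieces together, assuming the constructor forwards its arguments to ray.init and that remote_opts accepts standard ray options such as num_cpus:

    from cotengra.parallel import RayExecutor

    ex = RayExecutor(num_cpus=4)  # assumed to forward to ray.init

    # Per-call remote options override the executor defaults.
    fut = ex.submit(pow, 2, 10, remote_opts={"num_cpus": 1})
    print(fut.result())  # 1024

    # Scatter once, reference cheaply in many submissions.
    ref = ex.scatter(list(range(100_000)))
    print(ex.submit(len, ref).result())  # 100000

    ex.shutdown()  # tears down the parent ray cluster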

cotengra.parallel._RAY_EXECUTOR#
cotengra.parallel._get_pool_ray(n_workers=None, maybe_create=False)#

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to initialize ray and return a RayExecutor if not already initialized.

Return type:

None or RayExecutor
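
Mirroring the dask helper above, a sketch (requires ray):

    from cotengra.parallel import _get_pool_ray

    ex = _get_pool_ray()  # None unless ray is already initialized
    if ex is None:
        # Explicitly allow initializing ray here.
        ex = _get_pool_ray(n_workers=2, maybe_create=True)
    print(ex.submit(sum, range(10)).result())  # 45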