cotengra.parallel

Interface for parallelism.

Attributes

Classes

CachedProcessPoolExecutor

CachedThreadPoolExecutor

RayFuture

Basic concurrent.futures-like future wrapping a ray ObjectRef.

RayExecutor

Basic concurrent.futures-like interface using ray.

Functions

choose_default_num_workers()

get_pool([n_workers, maybe_create, backend])

Get a parallel pool.

_infer_backed_cached(pool_class)

_infer_backend(pool)

Return the backend type of pool - cached for speed.

get_n_workers([pool])

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).

parse_parallel_arg(parallel)

set_parallel_backend(backend)

Create a parallel pool of type backend and register it as the default for 'auto' parallel.

maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.

submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.

scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

can_scatter(pool)

Whether pool can make objects remote.

should_nest(pool)

Given the argument pool, should we try nested parallelism?

get_loky_get_reusable_executor()

_shutdown_cached_process_pool()

_get_process_pool_cf([n_workers])

_shutdown_cached_thread_pool()

_get_thread_pool_cf([n_workers])

_get_pool_dask([n_workers, maybe_create])

Maybe get an existing or create a new dask.distributed client.

_maybe_leave_pool_dask()

_rejoin_pool_dask()

get_ray()

_unpack_futures_tuple(x)

_unpack_futures_list(x)

_unpack_futures_dict(x)

_unpack_futures_identity(x)

_unpack_futures(x)

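Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.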

get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.

get_fn_as_remote_object(fn)

get_deploy(**remote_opts)

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

_get_pool_ray([n_workers, maybe_create])

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Module Contents

cotengra.parallel._AUTO_BACKEND = None
cotengra.parallel._DEFAULT_BACKEND = 'concurrent.futures'
cotengra.parallel.choose_default_num_workers()
cotengra.parallel.get_pool(n_workers=None, maybe_create=False, backend=None)

Get a parallel pool.

cotengra.parallel._infer_backed_cached(pool_class)
cotengra.parallel._infer_backend(pool)

Return the backend type of pool - cached for speed.
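As an illustration of the "cached for speed" idea, here is a minimal sketch that infers a backend name from a pool's class and memoizes the answer per class. The helper names `infer_backend_cached` and `infer_backend`, and the rule of using the top-level module name, are assumptions made for this example and are not taken from cotengra's source.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


@functools.lru_cache(maxsize=None)
def infer_backend_cached(pool_class):
    # Hypothetical rule: name the backend after the top-level package
    # that defines the pool class.
    top_level = pool_class.__module__.split(".")[0]
    if top_level == "concurrent":
        return "concurrent.futures"
    return top_level


def infer_backend(pool):
    # Cache on the class rather than the instance, so every pool of the
    # same type shares a single lookup.
    return infer_backend_cached(pool.__class__)


with ThreadPoolExecutor(max_workers=1) as pool:
    backend = infer_backend(pool)
```

Keying the cache on the class keeps the lookup cheap even when many short-lived pool objects are passed around.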

cotengra.parallel.get_n_workers(pool=None)

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).
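To show why a worker count is useful for pre-dispatch, the sketch below reads the count from a standard-library executor and submits exactly one task per worker up front. The helper `n_workers_of` is hypothetical, and it relies on `_max_workers`, a private CPython attribute of `concurrent.futures` executors; other pool types would need their own lookup.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def n_workers_of(pool):
    # Assumption: the pool is a concurrent.futures executor, which stores
    # its size in the private attribute `_max_workers`.
    return pool._max_workers


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=3) as pool:
    n = n_workers_of(pool)
    tasks = iter(range(10))
    # Pre-dispatch one task per worker; the rest of `tasks` stays lazy.
    pending = [pool.submit(square, x) for x in itertools.islice(tasks, n)]
    results = [f.result() for f in pending]
```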

cotengra.parallel.parse_parallel_arg(parallel)
cotengra.parallel.set_parallel_backend(backend)

Create a parallel pool of type backend and register it as the default for 'auto' parallel.

cotengra.parallel.maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

cotengra.parallel.maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.

cotengra.parallel.submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.
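The call shape described above can be sketched with a standard-library pool. This is only an illustration of the pattern: the local `submit` function below is a stand-in written for this example, and any backend-specific handling in the real function is not reproduced.

```python
from concurrent.futures import ThreadPoolExecutor


def submit(pool, fn, *args, **kwargs):
    # Stand-in wrapper: forward the call to any executor-like pool that
    # exposes a `submit` method and return the resulting future.
    return pool.submit(fn, *args, **kwargs)


def add(a, b=0):
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    future = submit(pool, add, 1, b=2)
    total = future.result()
```

Routing every submission through one function gives calling code a single place to adapt to different pool types.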

cotengra.parallel.scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

cotengra.parallel.can_scatter(pool)

Whether pool can make objects remote.
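A minimal sketch of the "maybe" in scatter: data is pushed to the pool only when the pool supports it, and is otherwise returned unchanged. The capability check via a `scatter` attribute and the `ToyRemotePool` class are assumptions for illustration; the real functions may decide based on the backend type instead.

```python
from concurrent.futures import ThreadPoolExecutor


def can_scatter(pool):
    # Assumption: a pool can make objects remote if it has a `scatter` method.
    return callable(getattr(pool, "scatter", None))


def scatter(pool, data):
    # Return a reference when the pool supports it, else the data itself.
    if can_scatter(pool):
        return pool.scatter(data)
    return data


class ToyRemotePool:
    """Hypothetical pool that stores data and hands back a string key."""

    def __init__(self):
        self.store = {}

    def scatter(self, data):
        key = f"ref-{len(self.store)}"
        self.store[key] = data
        return key


payload = [1, 2, 3]
with ThreadPoolExecutor(max_workers=1) as local_pool:
    local_ref = scatter(local_pool, payload)

remote_pool = ToyRemotePool()
remote_ref = scatter(remote_pool, payload)
```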

cotengra.parallel.should_nest(pool)

Given the argument pool, should we try nested parallelism?

cotengra.parallel.get_loky_get_reusable_executor()
class cotengra.parallel.CachedProcessPoolExecutor
__call__(n_workers=None)
is_initialized()
shutdown()
__del__()
cotengra.parallel.ProcessPoolHandler
cotengra.parallel._shutdown_cached_process_pool()
cotengra.parallel._get_process_pool_cf(n_workers=None)
class cotengra.parallel.CachedThreadPoolExecutor
__call__(n_workers=None)
is_initialized()
shutdown()
__del__()
cotengra.parallel.ThreadPoolHandler
cotengra.parallel._shutdown_cached_thread_pool()
cotengra.parallel._get_thread_pool_cf(n_workers=None)
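The two cached executor classes above share the same method set (`__call__`, `is_initialized`, `shutdown`). The sketch below shows one way such a handler could behave, reusing a pool until a different worker count is requested. The class `CachedPool` and its rebuild rule are assumptions for illustration, and a thread pool is used so the example stays lightweight.

```python
from concurrent.futures import ThreadPoolExecutor


class CachedPool:
    """Hypothetical handler that lazily creates and reuses one executor."""

    def __init__(self):
        self._pool = None
        self._n_workers = -1

    def __call__(self, n_workers=None):
        # Rebuild only when no pool exists or the requested size changed.
        if self._pool is None or n_workers != self._n_workers:
            self.shutdown()
            self._pool = ThreadPoolExecutor(max_workers=n_workers)
            self._n_workers = n_workers
        return self._pool

    def is_initialized(self):
        return self._pool is not None

    def shutdown(self):
        if self._pool is not None:
            self._pool.shutdown()
            self._pool = None


handler = CachedPool()
first = handler(2)
second = handler(2)   # same size: the cached pool is returned
third = handler(3)    # new size: the old pool is shut down and replaced
handler.shutdown()
```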
cotengra.parallel._get_pool_dask(n_workers=None, maybe_create=False)

Maybe get an existing or create a new dask.distributed client.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to create a new local cluster and client if no existing client is found.

Return type:

None or dask.distributed.Client

cotengra.parallel._maybe_leave_pool_dask()
cotengra.parallel._rejoin_pool_dask()
cotengra.parallel.get_ray()
class cotengra.parallel.RayFuture(obj)

Basic concurrent.futures-like future wrapping a ray ObjectRef.

__slots__ = ('_obj', '_cancelled')
result(timeout=None)
done()
cancel()
cotengra.parallel._unpack_futures_tuple(x)
cotengra.parallel._unpack_futures_list(x)
cotengra.parallel._unpack_futures_dict(x)
cotengra.parallel._unpack_futures_identity(x)
cotengra.parallel._unpack_dispatch
cotengra.parallel._unpack_futures(x)

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

[Subclassing ObjectRef might avoid needing this.]
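The recursive replacement described above can be sketched with a type-keyed dispatch table, in the spirit of the per-type helpers listed before it. Everything here is a stand-in: `ToyFuture` plays the role of a future wrapping a reference, and the helper names and dispatch layout are assumptions rather than the module's actual code.

```python
class ToyFuture:
    """Hypothetical stand-in for a future wrapping a remote reference."""

    __slots__ = ("_obj",)

    def __init__(self, obj):
        self._obj = obj


def _unpack_tuple(x):
    return tuple(map(unpack, x))


def _unpack_list(x):
    return list(map(unpack, x))


def _unpack_dict(x):
    return {k: unpack(v) for k, v in x.items()}


def _unpack_future(x):
    # Swap the wrapper for the reference it holds.
    return x._obj


def _unpack_identity(x):
    return x


# Exact-type dispatch: containers recurse, futures unwrap, the rest pass through.
_DISPATCH = {
    tuple: _unpack_tuple,
    list: _unpack_list,
    dict: _unpack_dict,
    ToyFuture: _unpack_future,
}


def unpack(x):
    return _DISPATCH.get(type(x), _unpack_identity)(x)


args = (ToyFuture("ref-a"), [1, ToyFuture("ref-b")])
kwargs = {"opts": {"data": ToyFuture("ref-c")}, "n": 2}
unpacked_args, unpacked_kwargs = unpack((args, kwargs))
```

Dispatching on the exact type keeps the common case (plain values) to a single dictionary lookup.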

cotengra.parallel.get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.
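One common way to get cached retrieval keyed on a function and its options is `functools.lru_cache`. The sketch below uses a plain wrapper in place of a real remote function; `get_wrapped_fn` and the option name `num_cpus` are illustrative assumptions, and whether the library uses this exact mechanism is not stated in this page.

```python
import functools


@functools.lru_cache(maxsize=None)
def get_wrapped_fn(fn, **opts):
    # Stand-in for an expensive "make this function remote" step; the
    # cache key is the function plus its (hashable) keyword options.
    def wrapped(*args, **kwargs):
        return fn(*args, **kwargs)

    wrapped.opts = dict(opts)
    return wrapped


def double(x):
    return 2 * x


a = get_wrapped_fn(double, num_cpus=1)
b = get_wrapped_fn(double, num_cpus=1)  # cache hit: same object as `a`
c = get_wrapped_fn(double, num_cpus=2)  # different options: new wrapper
```

Note that this approach requires both the function and the option values to be hashable.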

cotengra.parallel.get_fn_as_remote_object(fn)
cotengra.parallel.get_deploy(**remote_opts)

Alternative for ‘non-function’ callables - e.g. partial functions - pass the callable object too.

class cotengra.parallel.RayExecutor(*args, default_remote_opts=None, **kwargs)

Basic concurrent.futures-like interface using ray.

_maybe_inject_remote_opts(remote_opts=None)

Return the default remote options, possibly overriding some with those supplied by a submit call.
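The override behaviour described above amounts to a dictionary merge in which per-call options win over defaults. A minimal sketch, assuming options are plain dicts; the function name `merge_remote_opts` and the example keys are illustrative only.

```python
def merge_remote_opts(default_opts, remote_opts=None):
    # No per-call options: return a copy of the defaults.
    if remote_opts is None:
        return dict(default_opts)
    # Later keys win, so per-call options override the defaults.
    return {**default_opts, **remote_opts}


defaults = {"num_cpus": 1, "max_retries": 3}
merged = merge_remote_opts(defaults, {"num_cpus": 4})
unchanged = merge_remote_opts(defaults)
```

Returning a new dict each time avoids accidentally mutating the executor-wide defaults from a single submit call.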

submit(fn, *args, pure=False, remote_opts=None, **kwargs)

Remotely run fn(*args, **kwargs), returning a RayFuture.

map(func, *iterables, remote_opts=None)

Remotely map func over the argument iterables.

scatter(data)

Push data into the distributed store, returning an ObjectRef that can be supplied to submit calls for example.

shutdown()

Shut down the parent ray cluster; this RayExecutor instance itself does not need any cleanup.

cotengra.parallel._RAY_EXECUTOR = None
cotengra.parallel._get_pool_ray(n_workers=None, maybe_create=False)

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to initialize ray and return a RayExecutor if not initialized already.

Return type:

None or RayExecutor