cotengra.parallel

Interface for parallelism.

This module centralizes cotengra’s lightweight parallel execution logic. The main user entry point is parse_parallel_arg, which turns parallel arguments such as False, True, "auto", "threads", "loky", "concurrent.futures", "dask", and "ray" into a pool-like executor or None. Local process pools are cached and reused in the process that created them, while distributed backends try to discover an existing scheduler or runtime before creating one explicitly.
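A minimal sketch of this kind of dispatch, limited to the two backends the standard library provides. The names `parse_parallel_sketch` and `_local_pool` are hypothetical, and this is not cotengra's implementation. "auto", "loky", "dask" and "ray" are deliberately left out. Including the process ID in the cache key imitates reusing a pool only within the process that created it.

```python
import functools
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


@functools.lru_cache(maxsize=None)
def _local_pool(kind, pid):
    # The PID is part of the cache key, so a pool is only ever reused by the
    # process that created it; a forked child gets a fresh pool instead.
    if kind == "threads":
        return ThreadPoolExecutor()
    return ProcessPoolExecutor()


def parse_parallel_sketch(parallel, default="concurrent.futures"):
    """Turn a ``parallel`` argument into a pool-like executor or None."""
    if parallel is None or parallel is False:
        return None  # serial execution
    if parallel is True:
        parallel = default
    if parallel in ("threads", "concurrent.futures"):
        return _local_pool(parallel, os.getpid())
    # "auto", "loky", "dask" and "ray" are not handled in this sketch.
    raise NotImplementedError(f"backend {parallel!r} not handled in this sketch")
```

Repeated calls with the same argument in the same process return the same pool object, so the cost of starting workers is paid once.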

The "auto" mode remembers the last persistent non-thread backend and uses it for future automatic parallel work. To avoid recursive process pools, worker subprocesses are marked when tasks are submitted through submit; inside such workers parallel="auto" resolves to serial execution. A PID guard provides the same protection for forked children that inherit module state. Threads remain explicit-only and are not remembered as the automatic backend.

The helper functions submit, scatter, can_scatter, and should_nest provide a small common interface over local, dask, and ray executors while preserving backend-specific behavior where needed.
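Below is a sketch of a common interface over a local pool and a pool that can hold remote data. In this sketch, `can_scatter_sketch` uses duck typing on a `scatter` method. This is an assumed stand-in for the module's backend inference. `InMemoryStore` is a hypothetical pool whose "remote store" is just a dictionary.

```python
from concurrent.futures import ThreadPoolExecutor


def can_scatter_sketch(pool):
    # Heuristic: pools that can hold remote data expose a ``scatter`` method,
    # plain concurrent.futures pools do not.
    return callable(getattr(pool, "scatter", None))


def scatter_sketch(pool, data):
    # Remote-capable pools return a reference; local pools pass data through.
    return pool.scatter(data) if can_scatter_sketch(pool) else data


def submit_sketch(pool, fn, *args, **kwargs):
    # Every pool considered here offers a concurrent.futures-style submit.
    return pool.submit(fn, *args, **kwargs)


class InMemoryStore(ThreadPoolExecutor):
    """Hypothetical pool whose ``scatter`` returns a key into a local dict."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.store = {}

    def scatter(self, data):
        key = len(self.store)
        self.store[key] = data
        return key
```

With this approach, calling code can always call `scatter_sketch` and `submit_sketch` without needing to know which kind of pool it has been handed.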

Attributes

Classes

CachedProcessPoolExecutor

CachedThreadPoolExecutor

RayFuture

Basic concurrent.futures like future wrapping a ray ObjectRef.

RayExecutor

Basic concurrent.futures like interface using ray.

Functions

_remember_auto_backend(backend)

choose_default_num_workers()

get_pool([n_workers, maybe_create, backend])

Get a parallel pool.

_infer_backed_cached(pool_class)

_infer_backend(pool)

Return the backend type of pool - cached for speed.

get_n_workers([pool])

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).

parse_parallel_arg(parallel)

set_parallel_backend(backend)

Create a parallel pool of type backend and register persistent backends as the default for 'auto' parallel.

maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.

_worker_call(fn, args, kwargs)

Run fn inside a worker, marking the process as a worker first.

submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.

scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

can_scatter(pool)

Whether pool can make objects remote.

should_nest(pool)

Given argument pool should we try nested parallelism.

get_loky_get_reusable_executor()

_get_process_pool_cf([n_workers])

_get_thread_pool_cf([n_workers])

_get_pool_dask([n_workers, maybe_create])

Maybe get an existing or create a new dask.distributed client.

_maybe_leave_pool_dask()

_rejoin_pool_dask()

get_ray()

_unpack_futures_tuple(x)

_unpack_futures_list(x)

_unpack_futures_dict(x)

_unpack_futures_identity(x)

_unpack_futures(x)

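Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.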

get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.

get_fn_as_remote_object(fn)

get_deploy(**remote_opts)

Alternative for 'non-function' callables - e.g. partial functions - pass the callable object too.

_get_pool_ray([n_workers, maybe_create])

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Module Contents

cotengra.parallel._AUTO_BACKEND = None
cotengra.parallel._AUTO_BACKEND_PID = None
cotengra.parallel._IS_WORKER = False
cotengra.parallel._remember_auto_backend(backend)
cotengra.parallel.have_loky
cotengra.parallel.have_joblib
cotengra.parallel._DEFAULT_BACKEND = 'loky'
cotengra.parallel.choose_default_num_workers()
cotengra.parallel.get_pool(n_workers=None, maybe_create=False, backend=None)

Get a parallel pool.

cotengra.parallel._infer_backed_cached(pool_class)
cotengra.parallel._infer_backend(pool)

Return the backend type of pool - cached for speed.

cotengra.parallel.get_n_workers(pool=None)

Extract how many workers our pool has (mostly for working out how many tasks to pre-dispatch).
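If the worker count is known, the caller can limit how many tasks are queued at once rather than submitting all of them upfront. Below is a minimal sketch of such pre-dispatch using `concurrent.futures.wait`. The function name and the default oversubscription `factor=2` are illustrative assumptions.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def predispatch_map(pool, fn, items, n_workers, factor=2):
    """Apply ``fn`` to ``items`` with at most ``n_workers * factor`` in flight.

    Results are returned in completion order, not input order.
    """
    max_in_flight = max(1, n_workers * factor)
    pending, results = set(), []
    for item in items:
        if len(pending) >= max_in_flight:
            # Block until at least one task finishes before submitting more.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        pending.add(pool.submit(fn, item))
    done, _ = wait(pending)  # drain whatever is still running
    results.extend(f.result() for f in done)
    return results
```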

cotengra.parallel.parse_parallel_arg(parallel)
cotengra.parallel.set_parallel_backend(backend)

Create a parallel pool of type backend and register persistent backends as the default for 'auto' parallel.

The 'threads' backend remains explicit-only and is never remembered as the default for 'auto'.

cotengra.parallel.maybe_leave_pool(pool)

Logic required for nested parallelism in dask.distributed.

cotengra.parallel.maybe_rejoin_pool(is_worker, pool)

Logic required for nested parallelism in dask.distributed.
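In dask.distributed, a task that blocks on other tasks can leave the worker's thread pool so that it does not occupy a slot while waiting. Dask provides `secede` and `rejoin` for this purpose. The leave/rejoin pair can be expressed naturally as a context manager. `left_pool` and its injected `leave` and `rejoin` callables are hypothetical stand-ins, loosely modelled on `maybe_leave_pool` and `maybe_rejoin_pool`.

```python
import contextlib


@contextlib.contextmanager
def left_pool(leave, rejoin):
    """Temporarily leave a worker pool while waiting on nested tasks.

    ``leave`` returns True only if the caller is a worker that actually left
    its pool; ``rejoin`` is called in that case alone, even on error.
    """
    is_worker = leave()
    try:
        yield is_worker
    finally:
        if is_worker:
            rejoin()
```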

cotengra.parallel._worker_call(fn, args, kwargs)

Run fn inside a worker, marking the process as a worker first.

The marker is intentionally sticky for the lifetime of the worker process: any later parallel="auto" work in the same subprocess should remain serial rather than creating nested process pools.

cotengra.parallel.submit(pool, fn, *args, **kwargs)

Interface for submitting fn(*args, **kwargs) to pool.

cotengra.parallel.scatter(pool, data)

Interface for maybe turning data into a remote object or reference.

cotengra.parallel.can_scatter(pool)

Whether pool can make objects remote.

cotengra.parallel.should_nest(pool)

Given argument pool should we try nested parallelism.

cotengra.parallel.get_loky_get_reusable_executor()
class cotengra.parallel.CachedProcessPoolExecutor
_pool = None
_n_workers = -1
_pid = None
__call__(n_workers=None)
is_initialized()
shutdown()
__del__()
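The `_pool`, `_n_workers` and `_pid` attributes suggest a lazily created pool that is rebuilt if a different size is requested or if it is accessed from a different process. Below is a sketch of that behavior under these assumptions. `CachedPool` is hypothetical. It takes the executor class as a parameter and omits `__del__`.

```python
import os
from concurrent.futures import ThreadPoolExecutor


class CachedPool:
    """Lazily create a pool and reuse it within the creating process."""

    def __init__(self, pool_cls):
        self._pool_cls = pool_cls
        self._pool = None
        self._n_workers = -1
        self._pid = None

    def __call__(self, n_workers=None):
        same_process = self._pid == os.getpid()
        if self._pool is not None and same_process and n_workers == self._n_workers:
            return self._pool  # cache hit
        if self._pool is not None and same_process:
            self._pool.shutdown()  # a different size was requested
        # Either no pool exists yet, or the cached one was inherited from a
        # parent process and must be neither used nor shut down here.
        self._pool = self._pool_cls(max_workers=n_workers)
        self._n_workers = n_workers
        self._pid = os.getpid()
        return self._pool

    def is_initialized(self):
        return self._pool is not None

    def shutdown(self):
        if self._pool is not None and self._pid == os.getpid():
            self._pool.shutdown()
        self._pool = None
```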
cotengra.parallel.ProcessPoolHandler
cotengra.parallel._get_process_pool_cf(n_workers=None)
class cotengra.parallel.CachedThreadPoolExecutor
_pool = None
_n_workers = -1
_pid = None
__call__(n_workers=None)
is_initialized()
shutdown()
__del__()
cotengra.parallel.ThreadPoolHandler
cotengra.parallel._get_thread_pool_cf(n_workers=None)
cotengra.parallel._get_pool_dask(n_workers=None, maybe_create=False)

Maybe get an existing or create a new dask.distributed client.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to create a new local cluster and client if no existing client is found.

Return type:

None or dask.distributed.Client
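The "maybe get an existing or create a new" logic reduces to a small pattern. Below is a sketch in which the lookup and creation steps are injected as callables. It assumes the lookup raises `ValueError` when nothing is running, as `dask.distributed.get_client` does when no client can be found. `get_or_maybe_create` is a hypothetical name.

```python
def get_or_maybe_create(find_existing, create, maybe_create=False):
    """Return an existing client, optionally create one, otherwise None.

    ``find_existing`` is expected to raise ValueError if nothing is running.
    """
    try:
        return find_existing()
    except ValueError:
        return create() if maybe_create else None


# With dask.distributed installed, one plausible wiring (not run here) is:
#   from dask.distributed import Client, get_client
#   client = get_or_maybe_create(
#       get_client, lambda: Client(n_workers=4), maybe_create=True
#   )
```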

cotengra.parallel._maybe_leave_pool_dask()
cotengra.parallel._rejoin_pool_dask()
cotengra.parallel.get_ray()
class cotengra.parallel.RayFuture(obj)

Basic concurrent.futures like future wrapping a ray ObjectRef.

__slots__ = ('_obj', '_cancelled')
_obj
_cancelled = False
result(timeout=None)
done()
cancel()
cotengra.parallel._unpack_futures_tuple(x)
cotengra.parallel._unpack_futures_list(x)
cotengra.parallel._unpack_futures_dict(x)
cotengra.parallel._unpack_futures_identity(x)
cotengra.parallel._unpack_dispatch
cotengra.parallel._unpack_futures(x)

Allows passing futures by reference - takes e.g. args and kwargs and replaces all RayFuture objects with their underlying ObjectRef within all nested tuples, lists and dicts.

[Subclassing ObjectRef might avoid needing this.]
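Below is a sketch of the recursive replacement using a type-keyed dispatch table, in the spirit of `_unpack_dispatch`. `FutureStandIn` is a hypothetical substitute for `RayFuture`, allowing the example to run without ray.

```python
class FutureStandIn:
    """Hypothetical stand-in for RayFuture: wraps an underlying reference."""

    __slots__ = ("_obj",)

    def __init__(self, obj):
        self._obj = obj


def _unpack_tuple(x):
    return tuple(unpack_futures(v) for v in x)


def _unpack_list(x):
    return [unpack_futures(v) for v in x]


def _unpack_dict(x):
    return {k: unpack_futures(v) for k, v in x.items()}


def _unpack_identity(x):
    return x


# Dispatch on the exact type: subclasses such as namedtuples fall through to
# the identity case and are passed along unchanged.
_DISPATCH = {
    FutureStandIn: lambda f: f._obj,
    tuple: _unpack_tuple,
    list: _unpack_list,
    dict: _unpack_dict,
}


def unpack_futures(x):
    return _DISPATCH.get(type(x), _unpack_identity)(x)
```

Using a dictionary lookup on `type(x)` avoids a chain of `isinstance` checks for the common case of plain scalars, which go straight to the identity function.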

cotengra.parallel.get_remote_fn(fn, **remote_opts)

Cached retrieval of remote function.

cotengra.parallel.get_fn_as_remote_object(fn)
cotengra.parallel.get_deploy(**remote_opts)

Alternative for ‘non-function’ callables - e.g. partial functions - pass the callable object too.

class cotengra.parallel.RayExecutor(*args, default_remote_opts=None, **kwargs)

Basic concurrent.futures like interface using ray.

default_remote_opts
_maybe_inject_remote_opts(remote_opts=None)

Return the default remote options, possibly overriding some with those supplied by a submit call.
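Below is a sketch of that merge. It is written as a free function that takes the defaults explicitly, which is an assumption: the method itself presumably reads them from the executor's `default_remote_opts`. The option names in the usage are only example keys.

```python
def maybe_inject_remote_opts(default_remote_opts, remote_opts=None):
    """Merge per-call ``remote_opts`` over the executor-wide defaults."""
    if not remote_opts:
        return default_remote_opts  # nothing to override: reuse the defaults
    merged = dict(default_remote_opts)  # copy so the defaults are never mutated
    merged.update(remote_opts)  # per-call values take precedence
    return merged
```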

submit(fn, *args, pure=False, remote_opts=None, **kwargs)

Remotely run fn(*args, **kwargs), returning a RayFuture.

map(func, *iterables, remote_opts=None)

Remote map func over arguments iterables.

scatter(data)

Push data into the distributed store, returning an ObjectRef that can be supplied to submit calls for example.

shutdown()

Shut down the parent ray cluster; this RayExecutor instance itself does not need any cleanup.

cotengra.parallel._RAY_EXECUTOR = None
cotengra.parallel._get_pool_ray(n_workers=None, maybe_create=False)

Maybe get an existing or create a new RayExecutor, thus initializing ray.

Parameters:
  • n_workers (None or int, optional) – The number of workers to request if creating a new client.

  • maybe_create (bool, optional) – Whether to initialize ray and return a RayExecutor if ray is not initialized already.

Return type:

None or RayExecutor
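Below is a sketch of the lazy, module-level caching suggested by `_RAY_EXECUTOR = None`. The runtime checks are injected as callables, so the sketch runs without ray. `get_pool_lazy` is hypothetical. The commented wiring uses `ray.is_initialized` and `ray.init(num_cpus=...)`.

```python
_EXECUTOR = None


def get_pool_lazy(is_initialized, init, make_executor,
                  n_workers=None, maybe_create=False):
    """Return a cached executor, starting the runtime only if allowed."""
    global _EXECUTOR
    if not is_initialized():
        if not maybe_create:
            return None  # nothing running and not allowed to start it
        init(n_workers)
    if _EXECUTOR is None:
        _EXECUTOR = make_executor()  # created once, then reused
    return _EXECUTOR


# With ray installed, one plausible wiring (not run here) is:
#   import ray
#   from cotengra.parallel import RayExecutor
#   pool = get_pool_lazy(ray.is_initialized, lambda n: ray.init(num_cpus=n),
#                        RayExecutor, n_workers=4, maybe_create=True)
```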