Skip to content

vayu.parallel

parallel

SafeCounter

Bases: ABC

Concurrency-safe keyed counter.

make(executor_cls) chooses between two implementations: ThreadSafeCounter (a lock-guarded dict) for threads and ProcessSafeCounter (a multiprocessing.Manager dict) for processes.

make staticmethod

make(executor_cls: type) -> SafeCounter

Return a counter matched to the given executor class.

Raises:

ValueError: If executor_cls isn't ThreadPoolExecutor or ProcessPoolExecutor.

Source code in vayu/parallel.py
@staticmethod
def make(executor_cls: type) -> "SafeCounter":
    """Build the counter implementation that fits ``executor_cls``.

    A thread pool gets a ``ThreadSafeCounter``; a process pool gets a
    ``ProcessSafeCounter``.

    Args:
        executor_cls: ``ThreadPoolExecutor`` or ``ProcessPoolExecutor``.

    Returns:
        A freshly constructed counter suited to that executor.

    Raises:
        ValueError: If ``executor_cls`` isn't ``ThreadPoolExecutor`` or
            ``ProcessPoolExecutor``.
    """
    # Pick the class first and instantiate once at the end; only the chosen
    # implementation is ever referenced.
    if executor_cls == ThreadPoolExecutor:
        counter_cls = ThreadSafeCounter
    elif executor_cls == ProcessPoolExecutor:
        counter_cls = ProcessSafeCounter
    else:
        raise ValueError(f"Invalid executor type {executor_cls}")
    return counter_cls()

SafeDict

Bases: ABC

Concurrency-safe keyed dict.

make(executor_cls) chooses between two implementations: ThreadSafeDict (a lock-guarded dict) for threads and ProcessSafeDict (a multiprocessing.Manager dict) for processes.

make staticmethod

make(executor_cls: type) -> SafeDict

Return a dict matched to the given executor class.

Raises:

ValueError: If executor_cls isn't ThreadPoolExecutor or ProcessPoolExecutor.

Source code in vayu/parallel.py
@staticmethod
def make(executor_cls: type) -> "SafeDict":
    """Build the dict implementation that fits ``executor_cls``.

    A thread pool gets a ``ThreadSafeDict``; a process pool gets a
    ``ProcessSafeDict``.

    Args:
        executor_cls: ``ThreadPoolExecutor`` or ``ProcessPoolExecutor``.

    Returns:
        A freshly constructed dict suited to that executor.

    Raises:
        ValueError: If ``executor_cls`` isn't ``ThreadPoolExecutor`` or
            ``ProcessPoolExecutor``.
    """
    # Resolve the concrete class first, then build a single instance.
    if executor_cls == ThreadPoolExecutor:
        dict_cls = ThreadSafeDict
    elif executor_cls == ProcessPoolExecutor:
        dict_cls = ProcessSafeDict
    else:
        raise ValueError(f"Invalid executor type {executor_cls}")
    return dict_cls()

ParallelRunner

ParallelRunner(
    n_workers: int = None,
    executor_cls: type = None,
    initializer: Callable = None,
    init_args: tuple = None,
    return_when: str = ALL_COMPLETED,
    print_progress_every: float = 5,
    report_exceptions: bool = True,
)

Simplified wrapper over ThreadPoolExecutor / ProcessPoolExecutor.

Adds keyed task submission, a background progress line, and captured exceptions (failed tasks store the exception as their result rather than raising through run()).

A single ParallelRunner instance runs once: you call add() any number of times, then exactly one run(). Calling add() after run(), or run() twice, raises RuntimeError.

Construct a runner.

Parameters:

- n_workers (int, default None): Worker count. Defaults to the executor's default.
- executor_cls (type, default None): Either ThreadPoolExecutor or ProcessPoolExecutor. Required: the default None is rejected, and passing any other value raises ValueError.
- initializer (Callable, default None): Optional worker initializer (e.g. for loading process-local state).
- init_args (tuple, default None): Positional args forwarded to initializer.
- return_when (str, default ALL_COMPLETED): Forwarded to concurrent.futures.wait.
- print_progress_every (float, default 5): Progress-line refresh interval, in seconds.
- report_exceptions (bool, default True): If True, log failures via the vayu logger in addition to capturing them in the result dict.
Source code in vayu/parallel.py
def __init__(
    self,
    n_workers: int = None,
    executor_cls: type = None,
    initializer: Callable = None,
    init_args: tuple = None,
    return_when: str = ALL_COMPLETED,
    print_progress_every: float = 5,
    report_exceptions: bool = True,
):
    """Construct a runner.

    Args:
        n_workers: Worker count. Defaults to the executor's default.
        executor_cls: Either ``ThreadPoolExecutor`` or
            ``ProcessPoolExecutor``. Required: the ``None`` default is
            rejected like any other unsupported value.
        initializer: Optional worker initializer (e.g. for loading
            process-local state).
        init_args: Positional args forwarded to ``initializer``.
        return_when: Forwarded to ``concurrent.futures.wait``. Defaults to
            ``ALL_COMPLETED``.
        print_progress_every: Progress-line refresh interval, seconds.
        report_exceptions: If True, log failures via the ``vayu`` logger
            in addition to capturing them in the result dict.

    Raises:
        ValueError: If ``executor_cls`` isn't ``ThreadPoolExecutor`` or
            ``ProcessPoolExecutor`` (including when it is left as ``None``).
    """
    # Validate before building any state. This check also rejects None, so
    # there is no implicit thread-pool fallback below.
    if executor_cls not in (ThreadPoolExecutor, ProcessPoolExecutor):
        raise ValueError(f"Invalid executor type {executor_cls}")

    self._n_workers = n_workers
    self._return_when = return_when
    # key -> (func, args, kwargs); filled by add() and drained by run().
    self._tasks = dict()
    # Counter handed to every submitted task through self._run.
    self._counter = SafeCounter.make(executor_cls)
    self._executor_cls = executor_cls
    # Set by run() once the executor exists; used by shutdown().
    self._executor = None
    self._initializer = initializer
    self._init_args = init_args or ()
    self._print_progress_every = print_progress_every
    self._state = ParallelRunner.State.INIT
    # Set when run() finishes; presumably what the progress thread waits on.
    self._completion_signal: threading.Event = threading.Event()
    self._n_tasks = 0
    self._report_exceptions = report_exceptions

add

add(key: Hashable, func: Callable, *args, **kwargs)

Queue a task to be run by run(). Keys should be unique: adding a task under an existing key replaces the earlier task.

Source code in vayu/parallel.py
def add(self, key: Hashable, func: Callable, *args, **kwargs):
    """Queue a task to be run by ``run()``.

    Keys should be unique: adding a task under an existing key replaces the
    earlier task instead of queueing a second one.

    Args:
        key: Identifier for the task; becomes the key in ``run()``'s result.
        func: Callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Raises:
        RuntimeError: If called after ``run()``.
    """
    if self._state != ParallelRunner.State.INIT:
        raise RuntimeError("Cannot add tasks after calling run()")
    self._tasks[key] = (func, args, kwargs)
    # Derive the count from the dict. Re-adding an existing key replaces a
    # task rather than adding one, so a blind increment would over-count.
    self._n_tasks = len(self._tasks)

run

run() -> Dict[Hashable, object]

Execute all queued tasks and return a dict of {key: result_or_exception}.

Exceptions from individual tasks are captured (not raised). If the calling thread gets a KeyboardInterrupt, the runner cancels pending futures and re-raises.

Source code in vayu/parallel.py
def run(self) -> Dict[Hashable, object]:
    """Execute all queued tasks and return a dict of ``{key: result_or_exception}``.

    Exceptions from individual tasks are captured (not raised). If the
    calling thread gets a ``KeyboardInterrupt``, the runner cancels pending
    futures and re-raises.

    Returns:
        A dict mapping each task key to its future's result. Tasks are
        submitted by popping from the queued-task dict, so keys appear in
        reverse insertion order.

    Raises:
        RuntimeError: If ``run()`` has already been called on this runner.
        KeyboardInterrupt: Re-raised after pending futures are cancelled.
    """
    if self._state != ParallelRunner.State.INIT:
        raise RuntimeError("Cannot call run() more than once")
    self._state = ParallelRunner.State.RUNNING

    progress_thread = threading.Thread(target=self._print_progress)
    progress_thread.start()

    futures = dict()
    # The finally must also cover executor construction and task submission.
    # If either raises, the completion signal still has to be set, or the
    # progress thread started above is never released.
    try:
        with self._executor_cls(
            max_workers=self._n_workers,
            initializer=self._initializer,
            initargs=self._init_args,
        ) as ex:
            self._executor = ex
            # popitem() empties the queue as each task is submitted.
            while self._tasks:
                key, (func, args, kwargs) = self._tasks.popitem()
                futures[key] = ex.submit(
                    self._run, self._counter, key, self._report_exceptions,
                    func, *args, **kwargs
                )
            try:
                wait(list(futures.values()), return_when=self._return_when)
                print("Values fetched")
            except KeyboardInterrupt:
                print("Shutting down...")
                self.shutdown()
                raise
    finally:
        # Runs after the executor context has exited, i.e. once the pool
        # has shut down (waiting for running tasks).
        self._state = ParallelRunner.State.DONE
        self._completion_signal.set()
    return {key: future.result() for key, future in futures.items()}

shutdown

shutdown(wait=True, *, cancel_futures=True)

Stop the underlying executor. Passes through to Executor.shutdown.

Source code in vayu/parallel.py
def shutdown(self, wait=True, *, cancel_futures=True):
    """Stop the underlying executor. Passes through to ``Executor.shutdown``.

    Does nothing if ``run()`` has not yet created an executor.

    Args:
        wait: If True, block until already-running tasks finish.
        cancel_futures: If True, cancel tasks that have not started yet.
    """
    # Before run() there is no executor to stop. Treat that as a no-op
    # instead of failing with AttributeError on None.
    if self._executor is None:
        return
    self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)