
Parallel execution

ParallelRunner is a thin wrapper over concurrent.futures.ThreadPoolExecutor / ProcessPoolExecutor that adds:

  • Keyed task submission via add(key, fn, *args, **kwargs), plus a dict of results keyed the same way, returned by run().
  • A background progress line showing success, failure, and completion counts, throughput (exec/s), and elapsed time.
  • Exception capture — failed tasks store the exception instance as their result instead of raising.
  • Initializer/initargs pass-through (handy for worker-local globals).
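To make the semantics above concrete, here is a minimal sketch of a keyed runner built directly on concurrent.futures. It is not vayu's implementation: the class name KeyedRunner and its internals are hypothetical, and it only mirrors the documented behaviour (keyed add(), one-shot run(), exceptions stored as results, KeyboardInterrupt cancelling queued work). The cancel_futures argument needs Python 3.9+.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional, Type


class KeyedRunner:
    """Hypothetical, minimal analogue of ParallelRunner (not vayu's code)."""

    def __init__(self, n_workers: int = 4,
                 executor_cls: Type[Executor] = ThreadPoolExecutor,
                 initializer: Optional[Callable[..., None]] = None,
                 initargs: tuple = ()) -> None:
        # initializer/initargs are passed straight through to the executor.
        self._executor_cls = executor_cls
        self._executor_kwargs = dict(max_workers=n_workers,
                                     initializer=initializer, initargs=initargs)
        self._tasks: Dict[Any, tuple] = {}
        self._ran = False

    def add(self, key: Any, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        if self._ran:
            raise RuntimeError("add() called after run()")
        # In this sketch a repeated key simply replaces the earlier task.
        self._tasks[key] = (fn, args, kwargs)

    def run(self) -> Dict[Any, Any]:
        if self._ran:
            raise RuntimeError("run() may only be called once")
        self._ran = True
        executor = self._executor_cls(**self._executor_kwargs)
        try:
            futures: Dict[Any, Future] = {
                key: executor.submit(fn, *args, **kwargs)
                for key, (fn, args, kwargs) in self._tasks.items()
            }
            results: Dict[Any, Any] = {}
            for key, fut in futures.items():
                try:
                    results[key] = fut.result()
                except Exception as exc:
                    # Capture the failure as the task's result instead of raising.
                    results[key] = exc
        except KeyboardInterrupt:
            # Cancel tasks that have not started yet, then let the interrupt through.
            executor.shutdown(wait=False, cancel_futures=True)
            raise
        executor.shutdown(wait=True)
        return results


runner = KeyedRunner(n_workers=2)
runner.add("ok", pow, 2, 10)
runner.add("bad", int, "not a number")
results = runner.run()  # {"ok": 1024, "bad": ValueError(...)}
```

Storing exceptions as values keeps one failed task from discarding the results of all the others; the trade-off is that callers must inspect the dict to notice failures.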

Thread pool example

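from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

from vayu.parallel import ParallelRunner

def fetch(url: str) -> bytes:
    # I/O-bound work: threads overlap the time spent waiting on the network.
    with urlopen(url, timeout=10) as resp:
        return resp.read()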

runner = ParallelRunner(
    n_workers=20,
    executor_cls=ThreadPoolExecutor,
    print_progress_every=2,   # seconds
)

urls = [...]
for u in urls:
    runner.add(u, fetch, url=u)   # key, callable, then the callable's arguments

results = runner.run()        # {url: bytes | Exception}
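The exact format of ParallelRunner's progress line is not documented here, so the following is only a hypothetical sketch of how such a reporter can work: a background thread wakes every `interval` seconds (the role print_progress_every plays above), counts finished, failed, and successful futures, and redraws one terminal line. The names format_progress and report_progress are illustrative, not part of vayu.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import List


def format_progress(ok: int, failed: int, total: int, elapsed: float) -> str:
    # Hypothetical line format; ParallelRunner's real output may differ.
    done = ok + failed
    rate = done / elapsed if elapsed > 0 else 0.0
    return (f"ok={ok} failed={failed} done={done}/{total} "
            f"{rate:.1f} exec/s elapsed={elapsed:.1f}s")


def report_progress(futures: List[Future], interval: float,
                    stop: threading.Event) -> None:
    """Print a progress line every `interval` seconds until `stop` is set."""
    start = time.monotonic()
    while True:
        stopped = stop.wait(interval)  # True once stop.set() has been called
        finished = [f for f in futures if f.done() and not f.cancelled()]
        failed = sum(1 for f in finished if f.exception() is not None)
        line = format_progress(len(finished) - failed, failed, len(futures),
                               time.monotonic() - start)
        # "\r" redraws the same terminal line; the final update ends with a newline.
        print("\r" + line, end="\n" if stopped else "", flush=True)
        if stopped:
            return


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(time.sleep, 0.05) for _ in range(20)]
    stop = threading.Event()
    reporter = threading.Thread(target=report_progress,
                                args=(futures, 0.1, stop), daemon=True)
    reporter.start()
    wait(futures)
    stop.set()
    reporter.join()
```

Using Event.wait(interval) as the sleep lets the reporter exit promptly once the work is done, instead of sleeping out a full interval.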

Process pool with an initializer

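from concurrent.futures import ProcessPoolExecutor
from vayu.parallel import ParallelRunner

MODEL = None  # set once per worker process by init_worker

def init_worker():
    # Runs once in each worker process before it picks up any tasks.
    global MODEL
    MODEL = load_model_from_disk()   # your own (expensive) loader

def score(record):
    # Must be defined at module level so it can be pickled to the workers.
    return MODEL.predict(record)

if __name__ == "__main__":   # required where workers are spawned (e.g. Windows, macOS)
    records = [...]
    runner = ParallelRunner(
        n_workers=8,
        executor_cls=ProcessPoolExecutor,
        initializer=init_worker,
    )
    for i, rec in enumerate(records):
        runner.add(i, score, rec)
    results = runner.run()        # {index: prediction | Exception}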
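The same initializer pattern works with ProcessPoolExecutor on its own, which may help when checking what ParallelRunner forwards. This stdlib-only sketch replaces the model with a hypothetical stand-in (_load_model returns a simple multiplier) and uses initargs to pass a parameter to the initializer; each worker process builds its own copy exactly once.

```python
from concurrent.futures import ProcessPoolExecutor

# Per-process global: each worker gets its own copy, populated once by the
# initializer instead of once per task.
_MODEL = None


def _load_model(scale: int):
    # Hypothetical stand-in for an expensive load such as reading weights from disk.
    return lambda x: x * scale


def init_worker(scale: int) -> None:
    global _MODEL
    _MODEL = _load_model(scale)


def score(record: int) -> int:
    # Runs inside a worker process and relies on init_worker having run there.
    return _MODEL(record)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker,
                             initargs=(10,)) as pool:
        print(list(pool.map(score, [1, 2, 3])))  # [10, 20, 30]
```

Only the return values cross the process boundary here; the loaded object stays inside each worker, which matters when it is large or not picklable.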

Semantics to know

  • One shot. A runner can only be run once: calling run() a second time raises, and so does calling add() after run().
  • Exceptions are captured. If report_exceptions=True (the default), the exception is logged via the vayu logger and then stored in the result dict under that task's key. To make failures propagate, check the results yourself:
    errors = {k: v for k, v in results.items() if isinstance(v, Exception)}
    
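  • Keyboard interrupt. run() catches KeyboardInterrupt, calls shutdown(cancel_futures=True) on the underlying executor so that queued tasks which have not started are cancelled, then re-raises. (The cancel_futures argument requires Python 3.9+.)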
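Building on the errors filter above, a small helper can turn captured failures back into a raised exception. This is a hypothetical sketch (raise_on_failures is not part of vayu): it returns the results unchanged when nothing failed, and otherwise raises a RuntimeError chained to the first captured exception so its traceback is preserved.

```python
from typing import Any, Dict, Mapping


def raise_on_failures(results: Mapping[Any, Any]) -> Dict[Any, Any]:
    """Hypothetical helper: return the results, or raise if any task failed."""
    errors = {k: v for k, v in results.items() if isinstance(v, Exception)}
    if errors:
        first_key, first_exc = next(iter(errors.items()))
        # "from" chains the original exception so its traceback is not lost.
        raise RuntimeError(
            f"{len(errors)} of {len(results)} tasks failed; "
            f"first failure at key {first_key!r}"
        ) from first_exc
    return dict(results)
```

One caveat follows from the capture semantics: a task that legitimately returns an Exception instance is indistinguishable from one that failed. On Python 3.11+, raising an ExceptionGroup of all captured errors is an alternative to surfacing only the first.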

Thread-safe / process-safe collections

SafeCounter.make() and SafeDict.make() pick an implementation based on the executor class you pass in:

from concurrent.futures import ProcessPoolExecutor
from vayu.parallel import SafeCounter, SafeDict

counter = SafeCounter.make(ProcessPoolExecutor)   # uses multiprocessing.Manager
cache = SafeDict.make(ProcessPoolExecutor)

counter.increment("hits")
cache.set("answer", 42)

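For threads within a single process, SafeCounter.make(ThreadPoolExecutor) and SafeDict.make(ThreadPoolExecutor) return lock-guarded dict implementations. These are much cheaper than the Manager-backed versions, where every operation is a round trip to the manager's separate server process.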
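The factory idea can be sketched with the standard library alone. SketchCounter below is hypothetical and is not vayu's SafeCounter; it only illustrates the split described above: a plain dict plus threading.Lock for threads, or a multiprocessing.Manager dict and lock (shared proxies, each call an IPC round trip) for processes. Only the thread path is exercised in the usage lines, since the process path starts a manager process.

```python
import threading
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager
from typing import Any, MutableMapping, Type


class SketchCounter:
    """Hypothetical analogue of SafeCounter; not vayu's implementation."""

    def __init__(self, data: MutableMapping[str, int], lock: Any) -> None:
        self._data = data
        self._lock = lock

    @classmethod
    def make(cls, executor_cls: Type[Executor]) -> "SketchCounter":
        if issubclass(executor_cls, ProcessPoolExecutor):
            # Proxies to objects living in the manager's server process;
            # the proxies keep the manager alive while they are referenced.
            manager = Manager()
            return cls(manager.dict(), manager.Lock())
        # Threads share memory, so a plain dict guarded by a Lock is enough.
        return cls({}, threading.Lock())

    def increment(self, key: str, by: int = 1) -> int:
        # The read-modify-write must happen under the lock: without it, two
        # workers can read the same old value and one update is lost.
        with self._lock:
            value = self._data.get(key, 0) + by
            self._data[key] = value
            return value

    def get(self, key: str) -> int:
        with self._lock:
            return self._data.get(key, 0)


counter = SketchCounter.make(ThreadPoolExecutor)
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(1000):
        pool.submit(counter.increment, "hits")
print(counter.get("hits"))  # 1000
```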

See also