Parallel execution
`ParallelRunner` is a thin wrapper over `concurrent.futures.ThreadPoolExecutor` / `ProcessPoolExecutor` that adds:

- Keyed task submission (`add(key, fn, *args, **kwargs)`) and a keyed result dict from `run()`.
- A background progress line (success / failure / completion, exec/s, elapsed).
- Exception capture: failed tasks store the exception instance as their result instead of raising.
- Initializer/initargs pass-through (handy for worker-local globals).
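To make keyed submission and exception capture concrete, here is a minimal stdlib-only sketch of the idea. It is not vayu's implementation: `run_keyed` and its `(fn, args, kwargs)` task tuples are hypothetical names chosen for illustration, and the sketch has no progress line or initializer support.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed
from typing import Any, Callable, Hashable

# Hypothetical task spec: key -> (function, positional args, keyword args).
Task = tuple[Callable[..., Any], tuple, dict]


def run_keyed(
    tasks: dict[Hashable, Task],
    executor_cls: Callable[..., Executor] = ThreadPoolExecutor,
    n_workers: int = 4,
) -> dict[Hashable, Any]:
    """Run every task and return {key: result or exception instance}."""
    results: dict[Hashable, Any] = {}
    with executor_cls(max_workers=n_workers) as pool:
        # Remember which key each future was submitted under.
        futures = {
            pool.submit(fn, *args, **kwargs): key
            for key, (fn, args, kwargs) in tasks.items()
        }
        for fut in as_completed(futures):
            key = futures[fut]
            try:
                results[key] = fut.result()
            except Exception as exc:
                # Capture the failure as this key's result instead of raising.
                results[key] = exc
    return results


def divide(a: float, b: float) -> float:
    return a / b


results = run_keyed({"ok": (divide, (6, 3), {}), "boom": (divide, (1, 0), {})})
print(results["ok"])          # 2.0
print(type(results["boom"]))  # <class 'ZeroDivisionError'>
```

The future-to-key dict is what lets results come back keyed even though `as_completed` yields futures in completion order, not submission order.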
Thread pool example
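```python
import urllib.request
from concurrent.futures import ThreadPoolExecutor

from vayu.parallel import ParallelRunner


def fetch(url: str) -> bytes:
    # Any I/O-bound function works here; this one just downloads the body.
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read()


runner = ParallelRunner(
    n_workers=20,
    executor_cls=ThreadPoolExecutor,
    print_progress_every=2,  # seconds
)

urls = [...]
for u in urls:
    runner.add(u, fetch, url=u)  # key, function, then the function's arguments

results = runner.run()  # {url: bytes | Exception}
```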
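The example above turns on the progress line with `print_progress_every=2`. A line like that can be approximated with a watcher thread that wakes on a fixed interval and counts finished futures. This is a sketch under assumptions: `progress_line` and `report_progress` are hypothetical helpers, and the output format is illustrative, not the exact line `ParallelRunner` prints.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait


def progress_line(ok: int, failed: int, total: int, elapsed: float) -> str:
    """Format one status line (illustrative format)."""
    done = ok + failed
    rate = done / elapsed if elapsed > 0 else 0.0
    return f"ok={ok} failed={failed} done={done}/{total} {rate:.1f} exec/s elapsed={elapsed:.1f}s"


def report_progress(futures: list[Future], every: float, stop: threading.Event) -> None:
    """Print a status line every `every` seconds until `stop` is set."""
    start = time.monotonic()
    # Event.wait returns False on timeout and True once stop.set() is called.
    while not stop.wait(every):
        done = [f for f in futures if f.done() and not f.cancelled()]
        failed = sum(f.exception() is not None for f in done)
        print(progress_line(len(done) - failed, failed, len(futures),
                            time.monotonic() - start), flush=True)


if __name__ == "__main__":
    stop = threading.Event()
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(time.sleep, 0.1) for _ in range(20)]
        reporter = threading.Thread(target=report_progress, args=(futures, 0.2, stop), daemon=True)
        reporter.start()
        wait(futures)
    stop.set()
    reporter.join()
```

Using `Event.wait` as the sleep means the reporter stops promptly when signalled instead of finishing a full interval first.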
Process pool with an initializer
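```python
from concurrent.futures import ProcessPoolExecutor

from vayu.parallel import ParallelRunner

MODEL = None  # populated once per worker process by init_worker


def init_worker():
    global MODEL
    MODEL = load_model_from_disk()  # your own (expensive) loader


def score(record):
    return MODEL.predict(record)


if __name__ == "__main__":  # required where workers are spawned (e.g. Windows, macOS)
    runner = ParallelRunner(
        n_workers=8,
        executor_cls=ProcessPoolExecutor,
        initializer=init_worker,
    )
    for i, rec in enumerate(records):  # records: your input data
        runner.add(i, score, rec)
    results = runner.run()
```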
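The `initializer`/`initargs` hooks come from the standard executors, and `ThreadPoolExecutor` accepts them too. With threads, though, a module-level global like `MODEL` is shared by every worker, so for state that really is per worker (say, one connection per thread) a `threading.local` is the usual tool. A stdlib-only sketch; `init_worker`, `work`, and the `"conn"` argument are hypothetical placeholders:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Attributes set on this object are visible only to the thread that set them.
_local = threading.local()


def init_worker(prefix: str) -> None:
    # Runs once in each worker thread before it executes any task.
    # Stand-in for opening a per-thread connection or loading a resource.
    _local.resource = f"{prefix}-{threading.get_ident()}"


def work(x: int) -> tuple[str, int]:
    # Each task sees the resource created by its own thread's initializer.
    return _local.resource, x * x


with ThreadPoolExecutor(max_workers=4, initializer=init_worker, initargs=("conn",)) as pool:
    results = list(pool.map(work, range(8)))
```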
Semantics to know
- One shot. `run()` can be called only once: calling it a second time raises, and so does calling `add()` after `run()`.
- Exceptions are captured. If `report_exceptions=True` (the default), the exception is logged via the `vayu` logger and then stored in the result dict under that task's key. If you want failures to propagate, check the results yourself, for example by looking for values that are exception instances.
- Keyboard interrupt. `run()` catches `KeyboardInterrupt`, calls `shutdown(cancel_futures=True)`, then re-raises.
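One way to make failures propagate after `run()` returns is to scan the result dict for exception instances and raise if any are found. A minimal sketch; `raise_on_failures` is a hypothetical helper, not part of `vayu.parallel`, and it assumes no task legitimately returns an exception object as its value, since that would be indistinguishable from a captured failure.

```python
from typing import Any, Hashable


def raise_on_failures(results: dict[Hashable, Any]) -> dict[Hashable, Any]:
    """Return `results` unchanged, or raise if any task stored an exception."""
    failures = {k: v for k, v in results.items() if isinstance(v, BaseException)}
    if failures:
        first_key, first_exc = next(iter(failures.items()))
        # Chain the first captured exception so it is shown as the cause.
        raise RuntimeError(
            f"{len(failures)} of {len(results)} tasks failed; "
            f"first failing key: {first_key!r} ({first_exc!r})"
        ) from first_exc
    return results
```

Typical use would be `results = raise_on_failures(runner.run())`.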
Thread-safe / process-safe collections
`SafeCounter` and `SafeDict` pick the right implementation based on the executor class:
```python
from concurrent.futures import ProcessPoolExecutor

from vayu.parallel import SafeCounter, SafeDict

counter = SafeCounter.make(ProcessPoolExecutor)  # uses multiprocessing.Manager
cache = SafeDict.make(ProcessPoolExecutor)

counter.increment("hits")
cache.set("answer", 42)
```
For in-process threads, `SafeCounter.make(ThreadPoolExecutor)` / `SafeDict.make(ThreadPoolExecutor)` return lock-guarded dict implementations, which are much cheaper than the Manager-backed versions because no call has to go through a separate manager process.
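For intuition about the thread-side variant, a lock-guarded counter can be as small as a dict plus a `threading.Lock`. This sketch only mirrors the `increment` call shown above; `LockedCounter`, its `get` method, and the optional `n` argument are hypothetical and may not match `SafeCounter`'s actual interface.

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


class LockedCounter:
    """Hypothetical thread-safe counter: a dict guarded by a single lock."""

    def __init__(self) -> None:
        self._counts: defaultdict[str, int] = defaultdict(int)
        self._lock = threading.Lock()

    def increment(self, key: str, n: int = 1) -> None:
        # `+=` is a read-modify-write, not atomic across threads, so hold the lock.
        with self._lock:
            self._counts[key] += n

    def get(self, key: str) -> int:
        with self._lock:
            return self._counts.get(key, 0)


counter = LockedCounter()
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(1000):
        pool.submit(counter.increment, "hits")
print(counter.get("hits"))  # 1000
```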