Async HTTP & signals

The vayu.aio module provides a small set of utilities that make long-running async programs easier to write.

Fan out HTTP requests

grab_all_urls(key_url_map, ...) issues requests concurrently, bounded by a semaphore, and returns a dict that maps each key to (status, body) on success or (None, error_str) on failure:

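import asyncio
from datetime import timedelta

from vayu.aio import grab_all_urls, RateLimit

# Map an arbitrary key to each URL; results come back under the same keys.
urls = {f"user_{i}": f"https://api.example.com/users/{i}" for i in range(500)}

results = asyncio.run(
    grab_all_urls(
        urls,
        concurrency=50,
        timeout=10,
        headers={"Authorization": "Bearer ..."},
        rate_limit=RateLimit(100, timedelta(seconds=1)),  # limit of 100 per 1-second period
    )
)

for key, (status, body) in results.items():
    if status is None:
        # The request failed; body holds the error string.
        print(f"{key} failed: {body}")
    else:
        ...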

Parameters worth knowing:

  • concurrency — both the semaphore limit and the aiohttp TCPConnector limit.
  • timeout — per-request, seconds.
  • headers, auth — passed through to aiohttp.ClientSession.
  • rate_limit — a RateLimit(limit, period) dataclass (period is a timedelta).
  • print_progress — prints Progress: i/N on stderr, in-place via \r.

Errors are caught and stored as (None, "<exception repr>") rather than raised. To find failures, iterate the result dict and check for a None status.
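
The bounded-semaphore pattern and the error-capture convention described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the vayu implementation: fake_fetch and fan_out are hypothetical names, and fake_fetch stands in for the real aiohttp request. The last two lines show one way to split the result dict into failures and successes.

```python
import asyncio


async def fake_fetch(url: str) -> tuple[int, str]:
    """Hypothetical stand-in for an HTTP GET; fails for URLs containing 'bad'."""
    await asyncio.sleep(0)
    if "bad" in url:
        raise ConnectionError(f"cannot reach {url}")
    return 200, f"body of {url}"


async def fan_out(key_url_map: dict[str, str], concurrency: int = 50) -> dict:
    """Fetch every URL with at most `concurrency` requests in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def fetch_one(key: str, url: str):
        async with semaphore:  # blocks while `concurrency` fetches are running
            try:
                return key, await fake_fetch(url)
            except Exception as exc:
                # Store the failure instead of raising, keyed like a success.
                return key, (None, repr(exc))

    pairs = await asyncio.gather(
        *(fetch_one(key, url) for key, url in key_url_map.items())
    )
    return dict(pairs)


results = asyncio.run(
    fan_out(
        {
            "a": "https://api.example.com/a",
            "b": "https://api.example.com/bad",
        },
        concurrency=2,
    )
)

# A None status marks a failed request; the second element is the error string.
failures = {key: err for key, (status, err) in results.items() if status is None}
successes = {key: body for key, (status, body) in results.items() if status is not None}
```

Because failures are stored rather than raised, one bad URL does not abort the other requests; the caller decides afterwards whether to retry or report them.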

Shutdown signals

attach_shutdown_signals(event) wires SIGINT / SIGTERM to set an asyncio.Event, so cooperating tasks can shut down cleanly:

import asyncio
from vayu.aio import attach_shutdown_signals, sleep_until_signal

async def main():
    shutdown = asyncio.Event()
    attach_shutdown_signals(shutdown)

    while not shutdown.is_set():
        await do_work()
        # sleep 30s OR wake early if shutdown fires
        woken_by_signal = await sleep_until_signal(30, shutdown)
        if woken_by_signal:
            break

asyncio.run(main())

sleep_until_signal(duration, event) returns True if the signal fired, False if the sleep completed normally. duration can be a number of seconds or a timedelta.
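
For intuition, helpers with this behavior could be built on the standard library roughly as follows. This is a sketch under assumptions, not the vayu source: the _sketch names are hypothetical, and loop.add_signal_handler is only available on Unix event loops.

```python
import asyncio
import signal
from datetime import timedelta


def attach_shutdown_signals_sketch(event: asyncio.Event) -> None:
    """Set `event` when SIGINT or SIGTERM arrives. Call from inside a running loop."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, event.set)


async def sleep_until_signal_sketch(duration, event: asyncio.Event) -> bool:
    """Return True if `event` was set before `duration` elapsed, else False."""
    # Accept either a number of seconds or a timedelta.
    if isinstance(duration, timedelta):
        seconds = duration.total_seconds()
    else:
        seconds = duration
    try:
        await asyncio.wait_for(event.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False
```

Waiting on the event with a timeout, rather than calling asyncio.sleep, is what lets the loop wake immediately when a signal arrives instead of finishing the full sleep first.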

Race helper

get_first_and_cancel_rest(*tasks) waits for the first task to complete, cancels the rest, and returns the finished task:

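from vayu.aio import get_first_and_cancel_rest

async def main():
    # fetch_a, fetch_b, fetch_c stand in for your own coroutines
    first = await get_first_and_cancel_rest(fetch_a(), fetch_b(), fetch_c())
    value = first.result()  # re-raises if the winning task failed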

Useful for "whoever answers first wins" patterns.
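
The underlying pattern can be sketched with asyncio.wait and FIRST_COMPLETED. This is an illustration under assumptions rather than the library's code: first_and_cancel_rest_sketch and answer_after are hypothetical names.

```python
import asyncio


async def first_and_cancel_rest_sketch(*aws):
    """Return the first task to finish and cancel all the others."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to take effect so no task is left running.
    await asyncio.gather(*pending, return_exceptions=True)
    # If several tasks finished together, pick the earliest one in argument order.
    return next(task for task in tasks if task in done)


async def answer_after(delay: float, value: str) -> str:
    """Hypothetical worker that returns `value` after `delay` seconds."""
    await asyncio.sleep(delay)
    return value


async def main() -> str:
    first = await first_and_cancel_rest_sketch(
        answer_after(0.2, "slow"),
        answer_after(0.01, "fast"),
    )
    return first.result()


winner = asyncio.run(main())
```

Awaiting the cancelled tasks before returning is a design choice: it gives each loser a chance to run its cleanup code, at the cost of a slightly later return.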

See also