# Async HTTP & signals

The `aio` module has a small set of utilities that make long-running async programs easier to write.
## Fan out HTTP requests
`grab_all_urls(key_url_map, ...)` issues requests concurrently under a bounded semaphore and returns `(status, body)` on success or `(None, error_str)` on failure for each key:
```python
import asyncio
from datetime import timedelta

from vayu.aio import RateLimit, grab_all_urls

urls = {f"user_{i}": f"https://api.example.com/users/{i}" for i in range(500)}

results = asyncio.run(
    grab_all_urls(
        urls,
        concurrency=50,
        timeout=10,
        headers={"Authorization": "Bearer ..."},
    )
)

for key, (status, body) in results.items():
    ...
```
Parameters worth knowing:

- `concurrency`: both the semaphore limit and the `aiohttp` `TCPConnector` limit.
- `timeout`: per-request timeout, in seconds.
- `headers`, `auth`: passed through to `aiohttp.ClientSession`.
- `rate_limit`: a `RateLimit(limit, period)` dataclass (`period` is a `timedelta`).
- `print_progress`: prints `Progress: i/N` on stderr, updated in place via `\r`.
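The concurrency shape these parameters control can be sketched in plain `asyncio` (no HTTP; `fake_fetch` and `grab_all` are hypothetical stand-ins for illustration, not part of `vayu`):

```python
import asyncio


async def fake_fetch(key: str) -> tuple[int, str]:
    await asyncio.sleep(0.001)  # stand-in for the HTTP round trip
    return 200, f"body for {key}"


async def grab_all(keys, concurrency=50):
    sem = asyncio.Semaphore(concurrency)  # bounds how many requests are in flight

    async def one(key):
        async with sem:
            try:
                return key, await fake_fetch(key)
            except Exception as exc:
                # Mirror the documented error convention: store, don't raise.
                return key, (None, repr(exc))

    return dict(await asyncio.gather(*(one(k) for k in keys)))


results = asyncio.run(grab_all([f"user_{i}" for i in range(10)], concurrency=3))
print(results["user_0"])  # (200, 'body for user_0')
```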
Errors are caught and stored as `(None, "<exception repr>")` rather than raised; you iterate the result dict to find failures.
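For instance, splitting a result dict of that shape into successes and failures (the data here is illustrative, not a real call):

```python
# Shaped like a grab_all_urls result: (status, body) on success,
# (None, "<exception repr>") on failure.
results = {
    "user_1": (200, '{"id": 1}'),
    "user_2": (None, "TimeoutError()"),
    "user_3": (404, "not found"),
}

failures = {k: err for k, (status, err) in results.items() if status is None}
ok = {k: (status, body) for k, (status, body) in results.items() if status is not None}

print(failures)  # {'user_2': 'TimeoutError()'}
```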
## Shutdown signals
`attach_shutdown_signals(event)` wires `SIGINT`/`SIGTERM` to set an `asyncio.Event`, so cooperating tasks can shut down cleanly:
```python
import asyncio

from vayu.aio import attach_shutdown_signals, sleep_until_signal


async def main():
    shutdown = asyncio.Event()
    attach_shutdown_signals(shutdown)
    while not shutdown.is_set():
        await do_work()
        # Sleep 30s OR wake early if shutdown fires.
        woken_by_signal = await sleep_until_signal(30, shutdown)
        if woken_by_signal:
            break


asyncio.run(main())
```
`sleep_until_signal(duration, event)` returns `True` if the signal fired, `False` if the sleep completed normally. `duration` can be a number of seconds or a `timedelta`.
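The wake-early behavior can be approximated with stdlib primitives; this is a hypothetical re-implementation matching the documented signature, not the library's actual code:

```python
import asyncio
from datetime import timedelta


async def sleep_until_signal(duration, event: asyncio.Event) -> bool:
    # Accept seconds or a timedelta, per the documented signature.
    seconds = duration.total_seconds() if isinstance(duration, timedelta) else duration
    try:
        # event.wait() returns as soon as the event is set; wait_for caps the sleep.
        await asyncio.wait_for(event.wait(), timeout=seconds)
        return True  # woken by the signal
    except asyncio.TimeoutError:
        return False  # full sleep elapsed


async def demo():
    ev = asyncio.Event()
    # Set the event shortly after starting; the 5s sleep should end early.
    asyncio.get_running_loop().call_later(0.01, ev.set)
    return await sleep_until_signal(5, ev)


print(asyncio.run(demo()))  # True
```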
## Race helper
`get_first_and_cancel_rest(*tasks)` waits for the first completed task and cancels the rest:
```python
from vayu.aio import get_first_and_cancel_rest

first = await get_first_and_cancel_rest(fetch_a(), fetch_b(), fetch_c())
value = first.result()
```
Useful for "whoever answers first wins" patterns.
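The same first-answer-wins shape can be built from plain `asyncio.wait`; `first_and_cancel_rest` below is a hypothetical stand-in for illustration, not the `vayu` implementation:

```python
import asyncio


async def first_and_cancel_rest(*coros):
    # Wrap the coroutines in tasks so they all start running concurrently.
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the losers are cancelled
    # Let the cancellations propagate before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop()


async def slow(delay, value):
    await asyncio.sleep(delay)
    return value


async def demo():
    winner = await first_and_cancel_rest(slow(0.05, "a"), slow(0.01, "b"))
    return winner.result()


print(asyncio.run(demo()))  # b
```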