Skip to content

vayu.aio

aio

RateLimit dataclass

RateLimit(limit: int, period: timedelta)

A cap of limit requests per period.

Attributes:

Name Type Description
limit int

Maximum number of requests allowed in each period.

period timedelta

The rolling window, as a timedelta.

get_first_and_cancel_rest async

get_first_and_cancel_rest(
    *tasks: Union[Task, Coroutine]
) -> Task

Await the first task to complete and cancel the rest.

Useful for "whoever answers first wins" patterns (e.g. racing a sleep against a shutdown event, or racing multiple identical HTTP calls).

Parameters:

Name Type Description Default
*tasks Union[Task, Coroutine]

Tasks or coroutines to race.

()

Returns:

Type Description
Task

The completed task. Call .result() on it to get the value.

Source code in vayu/aio.py
async def get_first_and_cancel_rest(*tasks: Union[asyncio.Task, Coroutine]) -> asyncio.Task:
    """Await the first task to complete and cancel the rest.

    Useful for "whoever answers first wins" patterns (e.g. racing a sleep
    against a shutdown event, or racing multiple identical HTTP calls).

    Args:
        *tasks: Tasks or coroutines to race.

    Returns:
        The completed task. Call ``.result()`` on it to get the value.
    """
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()

    return done.pop()

sleep_until_signal async

sleep_until_signal(
    duration: Union[timedelta, Number], signal: Event
) -> bool

Sleep for duration or until signal fires, whichever is first.

Parameters:

Name Type Description Default
duration Union[timedelta, Number]

A timedelta or a number of seconds.

required
signal Event

An event that, when set, wakes the sleeper early.

required

Returns:

Type Description
bool

True if the signal fired before the sleep elapsed, False otherwise.

Source code in vayu/aio.py
async def sleep_until_signal(duration: Union[timedelta, Number], signal: asyncio.Event) -> bool:
    """Sleep for ``duration`` or until ``signal`` fires, whichever is first.

    Args:
        duration: A ``timedelta`` or a number of seconds.
        signal: An event that, when set, wakes the sleeper early.

    Returns:
        ``True`` if the signal fired before the sleep elapsed, ``False`` otherwise.
    """
    if isinstance(duration, (int, float)):
        duration = timedelta(seconds=duration)

    sleep_task = asyncio.create_task(asyncio.sleep(duration.total_seconds()))
    signal_task = asyncio.create_task(signal.wait())
    done = await get_first_and_cancel_rest(sleep_task, signal_task)
    return done is signal_task

attach_shutdown_signals

attach_shutdown_signals(
    shutdown: Event, signals: Optional[List] = None
)

Wire OS signals to an asyncio.Event so cooperating tasks can shut down cleanly.

Defaults to SIGINT and SIGTERM. Must be called from within a running event loop.

Parameters:

Name Type Description Default
shutdown Event

The event to set when any of the signals is received.

required
signals Optional[List]

Signals to attach. Defaults to [SIGINT, SIGTERM].

None
Source code in vayu/aio.py
def attach_shutdown_signals(shutdown: asyncio.Event, signals: Optional[List] = None):
    """Wire OS signals to an ``asyncio.Event`` so cooperating tasks can shut down cleanly.

    Defaults to ``SIGINT`` and ``SIGTERM``. Must be called from within a running
    event loop.

    Args:
        shutdown: The event to set when any of the signals is received.
        signals: Signals to attach. Defaults to ``[SIGINT, SIGTERM]``.
    """
    signals = signals or [SIGINT, SIGTERM]
    loop = asyncio.get_running_loop()

    def set_signal():
        shutdown.set()

    for signal in signals:
        loop.add_signal_handler(signal, set_signal)

grab_all_urls async

grab_all_urls(
    key_url_map: Dict[Any, str],
    concurrency: int = 100,
    timeout: int = 15,
    print_progress: bool = True,
    headers: Optional[Dict[str, str]] = None,
    auth: Optional[BasicAuth] = None,
    rate_limit: Optional[RateLimit] = None,
) -> Dict[Any, Any]

Fetch many URLs concurrently and return the responses keyed the same way.

Errors are captured, not raised: failed keys land in the result dict as (None, "<exception repr>") so the caller can inspect or retry them.

Parameters:

Name Type Description Default
key_url_map Dict[Any, str]

Mapping from caller-chosen key to URL.

required
concurrency int

Upper bound on in-flight requests (also the aiohttp TCPConnector limit).

100
timeout int

Per-request timeout in seconds.

15
print_progress bool

If True, prints Progress: i/N in-place via \r.

True
headers Optional[Dict[str, str]]

Optional headers forwarded to every request.

None
auth Optional[BasicAuth]

Optional aiohttp.BasicAuth forwarded to every request.

None
rate_limit Optional[RateLimit]

Reserved for future use.

None

Returns:

Type Description
Dict[Any, Any]

A dict keyed the same as key_url_map. Each value is

Dict[Any, Any]

(status_code, body_bytes) on success or (None, error_str)

Dict[Any, Any]

on failure.

Source code in vayu/aio.py
async def grab_all_urls(
    key_url_map: Dict[Any, str],
    concurrency: int = 100,
    timeout: int = 15,
    print_progress: bool = True,
    headers: Optional[Dict[str, str]] = None,
    auth: Optional[aiohttp.BasicAuth] = None,
    rate_limit: Optional[RateLimit] = None,
) -> Dict[Any, Any]:
    """Fetch many URLs concurrently and return the responses keyed the same way.

    Errors are captured, not raised: failed keys land in the result dict as
    ``(None, "<exception repr>")`` so the caller can inspect or retry them.

    Args:
        key_url_map: Mapping from caller-chosen key to URL.
        concurrency: Upper bound on in-flight requests (also the ``aiohttp``
            ``TCPConnector`` limit).
        timeout: Per-request timeout in seconds.
        print_progress: If True, prints ``Progress: i/N`` in-place via ``\\r``.
        headers: Optional headers forwarded to every request.
        auth: Optional ``aiohttp.BasicAuth`` forwarded to every request.
        rate_limit: Reserved for future use.

    Returns:
        A dict keyed the same as ``key_url_map``. Each value is
        ``(status_code, body_bytes)`` on success or ``(None, error_str)``
        on failure.
    """
    results = {}
    tasks = []

    semaphore = asyncio.Semaphore(concurrency)

    async def fetch_url(session, url, key, results: Dict[Any, Any], timeout):
        async with semaphore:
            try:
                async with session.get(url, timeout=timeout) as response:
                    results[key] = (response.status, await response.read())
            except Exception as e:
                results[key] = None, str(e)

    async with aiohttp.ClientSession(
        connector=TCPConnector(limit=concurrency), headers=headers, auth=auth
    ) as session:
        for key, url in key_url_map.items():
            task = fetch_url(session, url, key, results, timeout)
            tasks.append(task)

        for i, task in enumerate(asyncio.as_completed(tasks)):
            await task
            if print_progress:
                print(f"Progress: {i+1}/{len(tasks)}", end="\r")

    return results