
Choosing asyncio.timeout vs wait_for

You have two ways to bound an async operation in time, and they are not interchangeable. asyncio.wait_for(aw, t) wraps a single awaitable — it cancels that awaitable on expiry, awaits it to completion, and raises TimeoutError. asyncio.timeout(t) (added in Python 3.11) is a context manager that bounds an arbitrary block of code, gives cleaner cancellation semantics, and composes by nesting. Reaching for wait_for when you really want to bound a multi-step block leads to either deeply nested wrappers or a timeout that silently doesn't cover the whole operation. This guide shows when each fits, how to express the same intent both ways, the absolute-deadline variant, and how to migrate older wait_for code to timeout cleanly.

Prerequisites

  • Python 3.11+. asyncio.timeout() and asyncio.timeout_at() were added in 3.11. On 3.10 and earlier only wait_for exists.
  • Familiarity with the scheduled-cancellation model from the parent Timeouts & Deadlines overview and the broader Resilience, Cancellation & Error Handling execution model.
  • Comfort with try/except around await and with asyncio.run().

Step 1 — Wrap a Single Call With wait_for

When the unit you want to bound is exactly one awaitable, wait_for is the most direct expression. It cancels the awaitable on expiry and, crucially, awaits the cancelled awaitable before raising TimeoutError, so the inner coroutine has finished unwinding by the time you handle the error.

import asyncio

async def slow_call() -> str:
    await asyncio.sleep(5)
    return "done"

async def main() -> None:
    try:
        result = await asyncio.wait_for(slow_call(), timeout=1.0)
        print(result)
    except TimeoutError:
        # slow_call has already been cancelled AND awaited here.
        print("call exceeded 1s")

asyncio.run(main())

What to verify: the TimeoutError is raised about 1 second in, and slow_call is fully cancelled — add a finally inside it and confirm the finally ran before the except block printed.

Step 2 — Express the Same Bound With asyncio.timeout()

The context-manager form does the same job for one call but reads as a scope rather than a wrapper, and it converts the deadline cancellation into TimeoutError for you. For a single call the two are equivalent in effect; the difference shows up the moment you have more than one statement.

import asyncio

async def slow_call() -> str:
    await asyncio.sleep(5)
    return "done"

async def main() -> None:
    try:
        async with asyncio.timeout(1.0):
            result = await slow_call()
            print(result)
    except TimeoutError:
        print("block exceeded 1s")

asyncio.run(main())

What to verify: identical 1-second behaviour to Step 1. Note you did not have to pass the awaitable as an argument — the bound applies to whatever runs inside the async with.

Step 3 — Wrap a Multi-Step Block (Where timeout Shines)

This is the case wait_for handles badly. To bound several sequential calls as one operation with wait_for, you would wrap them in a helper coroutine just to have a single awaitable. asyncio.timeout() bounds the block directly, including any branches and loops.

import asyncio

async def pipeline(req) -> dict:
    # One bound over the WHOLE sequence; no helper coroutine needed.
    async with asyncio.timeout(2.0):
        session = await open_session()
        user = await session.get_user(req["id"])
        if user["premium"]:
            await session.refresh_entitlements(user)
        return await session.render(user)

async def open_session(): ...

The wait_for equivalent forces an awkward inner coroutine:

async def _body(req) -> dict: ...    # every step of the sequence moves here

async def pipeline(req) -> dict:
    return await asyncio.wait_for(_body(req), timeout=2.0)

What to verify: the whole block is cancelled at the 2-second mark regardless of which step is in flight, and you did not need to extract a helper just to get one awaitable.

Step 4 — Absolute Deadline With timeout_at

When the bound is "finish by absolute instant T" rather than "within N seconds from here," use asyncio.timeout_at(). T is measured on the event loop's monotonic clock (loop.time()), not wall-clock time. This is the right tool for spreading one budget across sequential steps, because the deadline does not drift as each step consumes time.

import asyncio

async def handle(req, budget: float = 1.5) -> dict:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + budget          # compute ONCE
    async with asyncio.timeout_at(deadline):
        auth = await authenticate(req)        # consumes part of the budget
        return await load(auth)               # gets only what's left

async def authenticate(req): ...
async def load(auth): ...

What to verify: if authenticate is slow, load gets correspondingly less time — the deadline is fixed, so the second step inherits the remaining budget rather than a fresh full timeout.

Step 5 — Handle TimeoutError and Ensure Cleanup Runs

Both APIs raise TimeoutError (since 3.11, asyncio.TimeoutError is an alias of the builtin TimeoutError). Catch it narrowly and let any other exception propagate. Cleanup in finally runs during the cancellation, so keep it short or shield it.

import asyncio

async def with_cleanup(resource) -> dict:
    handle = await resource.open()
    try:
        async with asyncio.timeout(1.0):
            return await resource.read(handle)
    except TimeoutError:
        return {"degraded": True}
    finally:
        # Runs even on timeout; keep it cheap or shield a critical part.
        await resource.close(handle)

async def main_resource(): ...

What to verify: on timeout you see the degraded result and resource.close ran. Catch only TimeoutError, never a bare except, or you will swallow the injected cancellation.

Verification

Confirm correct behaviour across all three signals:

  • TimeoutError fires at the right time. Instrument the elapsed time around the bound; it should be very close to the configured value, not noticeably longer (longer means a blocking call is delaying delivery) and not shorter (shorter means an inner bound or external cancel fired).
  • The inner task is fully cancelled, with no leak. After a wait_for timeout, the awaitable is guaranteed already awaited. After a timeout() block, assert len(asyncio.all_tasks()) returns to baseline — a lingering task means a child you spawned inside the block escaped the bound.
  • Cleanup completed. Any finally/__aexit__ you rely on should have run; verify with a flag set in the finalizer.

Pitfalls & Edge Cases

  • wait_for can swallow a result on a near-miss. If the awaitable finishes just as the timeout fires, wait_for may still raise TimeoutError and discard the completed result. For operations whose side effects matter, prefer timeout() around the call so the result is observable, or check idempotency.
  • Double timeout / redundant nesting. Wrapping wait_for(x, 1.0) in an asyncio.timeout(2.0) that covers nothing else puts two competing bounds on one call; the tighter one (1.0) fires first and the outer is dead weight. Use one bound per call and reserve nesting for genuinely different scopes (per-call vs whole-operation).
  • A blocking/sync call ignores both. Neither API can interrupt synchronous work — time.sleep, requests.get, a CPU loop — because cancellation is delivered only at await points. Move such work off the loop with asyncio.to_thread() so it sits behind a real await the bound can reach.
  • Catching CancelledError too broadly defeats the timeout. Both wait_for and timeout() work by injecting a CancelledError. Since Python 3.8 CancelledError derives from BaseException, so a plain except Exception does not catch it, but a bare except, an except BaseException, or an except asyncio.CancelledError that does not re-raise inside the bounded coroutine suppresses it, and the deadline then never takes effect. Let CancelledError propagate. See the parent Timeouts & Deadlines overview for the full cancellation-safety treatment.
  • Version changes to wait_for. wait_for has waited for the cancelled awaitable to finish before raising since Python 3.7. The documented 3.11 change is that it raises the builtin TimeoutError instead of asyncio.TimeoutError. In 3.12 it was reimplemented on top of asyncio.timeout(), which addressed edge cases where a cancellation racing the timeout could be mishandled. If you are migrating from 3.10, do not rely on the old timing or exception type; from 3.12 the behaviour matches timeout() because it is built on it.

Frequently Asked Questions

When should I use asyncio.timeout instead of wait_for?

Use asyncio.timeout() when you need to bound an arbitrary block of code rather than a single awaitable, when you want to nest per-call and whole-operation deadlines, or when an absolute deadline via timeout_at is more natural. Use wait_for when the unit is exactly one awaitable and you want it cancelled and awaited before TimeoutError is raised.

Do asyncio.timeout and wait_for raise the same exception?

Yes. Both raise TimeoutError on expiry. Since Python 3.11 asyncio.TimeoutError is an alias of the builtin TimeoutError, so you can catch the builtin in both cases.

What changed about wait_for in Python 3.11?

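The documented 3.11 change is that wait_for raises the builtin TimeoutError instead of asyncio.TimeoutError. Waiting for the cancelled awaitable to finish before raising dates back to Python 3.7. In 3.12, wait_for was reimplemented on top of asyncio.timeout(), which addressed earlier edge cases where a cancellation racing the timeout could be mishandled, so from that version its behaviour matches asyncio.timeout().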

Figure: Choosing wait_for vs timeout vs timeout_at. A decision tree that asks what you are bounding: a single awaitable that should be cancelled and awaited points to wait_for(aw, t); a block with many awaits and branches, or nested bounds, points to asyncio.timeout(t); an absolute deadline or shared budget points to timeout_at(T).