
Async Context Managers & Iterators

Async context managers and iterators extend Python's synchronous resource-management and iteration protocols into the non-blocking domain. They are the load-bearing primitives behind every well-behaved async client: a connection pool that releases sockets on cancellation, a paginated API reader that closes its HTTP session deterministically, a streaming consumer that applies backpressure instead of buffering unboundedly. This reference details the underlying async protocols, the scheduling boundaries they create, and the cleanup discipline required to build high-throughput components that never leak file descriptors, sockets, or generator frames.

The hard part is not the syntax. async with and async for are mechanically simple. The difficulty is correctness under cancellation: an __aexit__ that swallows CancelledError, an async generator abandoned mid-iteration so its finally block never runs, or a loop.run_in_executor teardown that outlives the loop it was scheduled on. Get those wrong and the failure is silent — a slow file-descriptor leak that only manifests as EMFILE after a week of uptime.

These protocols also draw a sharp line that the synchronous versions blur. A synchronous with block runs __enter__/__exit__ atomically with respect to other threads only if you hold a lock; an async with block is cooperatively interleaved with every other task on the loop, and the points at which that interleaving happens are exactly the awaits inside the hooks. That makes the async versions strictly more powerful — you can acquire a connection, negotiate TLS, and run a handshake inside __aenter__ without blocking anything else — and strictly more dangerous, because a teardown can now be pre-empted by cancellation at any of those internal awaits. The patterns in this reference exist to make that pre-emption safe and the cleanup deterministic.

Key implementation boundaries:

  • Protocol mapping: __aenter__/__aexit__ and __aiter__/__anext__ lifecycle hooks, plus their decorator-generated equivalents.
  • Event loop integration: how each await inside a hook is a cooperative yield point that can suspend setup, teardown, or a yield.
  • Cancellation safety: handling asyncio.CancelledError (a BaseException, not an Exception) so teardown completes and semantics are preserved.
  • Deterministic finalization: contextlib.aclosing() and AsyncExitStack instead of relying on garbage collection for cleanup.

Architectural principles

  • Every protocol hook is a scheduling point. __aenter__, __aexit__, and __anext__ each contain await boundaries where the loop can run other tasks. They are not atomic. Treat them as ordinary coroutines that happen to run inside async with/async for machinery — including the fact that they can be cancelled mid-execution.
  • Cancellation is a BaseException. Since Python 3.8, asyncio.CancelledError derives from BaseException, so an except Exception clause will not catch it (only a truly bare except: or an except BaseException does). Teardown code must either let it propagate or catch it explicitly, do minimal cleanup, and re-raise.
  • Finalization must be deterministic, never GC-driven. An async generator that is dropped without aclose() may have its finally block run far later (or on the wrong loop) by the asyncgen finalizer hook. Wrap generators in aclosing() and stack resources in AsyncExitStack so cleanup is tied to lexical scope.
  • Cleanup work must stay non-blocking — or be explicitly off-loaded. Synchronous socket.close(), os.fsync(), or CPU work inside a hook stalls the loop for every other task. Use native async teardown, or push the blocking call to loop.run_in_executor().
  • Backpressure belongs at the iteration boundary. Async generators and iterators throttle naturally: the producer is suspended at yield/await until the consumer asks for the next item. Lean on that instead of unbounded internal buffers.
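The second principle is easy to verify. The following is a minimal sketch (the names worker, main, and events are illustrative only) showing that cancellation bypasses an except Exception handler while the finally block still runs:

```python
import asyncio

events: list[str] = []


async def worker() -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        # Not reached on cancellation: CancelledError is a BaseException.
        events.append("except Exception")
    finally:
        events.append("finally")


async def main() -> list[str]:
    events.clear()
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let worker() reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return list(events)
```

Running asyncio.run(main()) records the finally entry and the cancellation, but never the except Exception entry.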

Execution model: protocol hooks on the loop scheduler

An async with statement desugars to: evaluate the manager, await manager.__aenter__(), run the body, then await manager.__aexit__(*exc_info) in a finally. The await on each hook means the loop is free to schedule other tasks while setup or teardown is in flight — and, critically, that the enclosing task can be cancelled while __aenter__ or __aexit__ is suspended at one of its own await points.
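Written out by hand, the desugaring looks roughly like the sketch below. It follows the expansion given in the Python language reference; manual_async_with and Recorder are illustrative names, and the body is passed in as a coroutine function only so the example can be run.

```python
import asyncio
import sys


class Recorder:
    """Tiny manager that records which hooks ran and with what exception."""

    def __init__(self) -> None:
        self.log: list[str] = []

    async def __aenter__(self) -> "Recorder":
        self.log.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.log.append(f"exit:{exc_type.__name__ if exc_type else None}")
        return False


async def manual_async_with(manager, body):
    """Approximate expansion of: async with manager as value: await body(value)."""
    aenter = type(manager).__aenter__
    aexit = type(manager).__aexit__
    value = await aenter(manager)  # if this raises, __aexit__ is never called
    hit_except = False
    try:
        return await body(value)
    except BaseException:
        hit_except = True
        # A truthy return value from __aexit__ suppresses the exception.
        if not await aexit(manager, *sys.exc_info()):
            raise
    finally:
        if not hit_except:
            await aexit(manager, None, None, None)
```

Note that __aexit__ is reached through two different branches: with the live exception triple when the body raises, and with three None values otherwise.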

An async for desugars to a loop that repeatedly awaits __anext__() until it raises StopAsyncIteration. For an async generator, each yield is a suspension point: control returns to the consumer's await, and the generator frame is parked until __anext__() is driven again. If the consumer stops driving (a break, an exception, or task cancellation), the generator is left suspended at the yield with its try/finally not yet executed — the case aclosing() exists to fix.
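The same loop can be spelled out manually. This is a simplified sketch of the expansion (countdown and manual_async_for are illustrative names), not the exact bytecode the interpreter emits:

```python
import asyncio


async def countdown(n: int):
    while n > 0:
        yield n  # suspension point: the frame parks here between pulls
        n -= 1


async def manual_async_for(aiterable) -> list[int]:
    """Approximate expansion of: async for item in aiterable: out.append(item)."""
    out = []
    iterator = aiterable.__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break
        out.append(item)
    return out
```

Nothing in this expansion calls aclose(); if the loop exits early, the iterator is simply no longer driven.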

Because these hooks run on the same single-threaded loop as everything else, the rules from Asyncio Fundamentals & Event Loop Architecture apply directly: a hook that blocks without yielding monopolizes the loop thread and starves all concurrent work.

There is one subtlety worth internalising about cancellation timing. When a task is cancelled, the loop arranges for CancelledError to be raised at the task's current suspension point — the await it is parked on. If that point happens to be inside __aenter__, the body never runs, but __aexit__ is not called either, because async with only registers the finally-style exit after __aenter__ returns successfully; any resource partially acquired in __aenter__ must be cleaned up by __aenter__ itself. If the suspension point is inside the body, __aexit__ runs with exc_type set to CancelledError. And if it is inside __aexit__, teardown is interrupted mid-flight — the case the cancellation-safe __aexit__ pattern below addresses. Knowing which of the three you are in tells you where the cleanup obligation lives. The diagram below traces the async with lifecycle, including the cancellation path that makes __aexit__ correctness non-negotiable.

Figure: async with lifecycle and the cancellation path. The sequence is await __aenter__() (acquire resource, may suspend), then the block body (uses resource, awaits I/O), then await __aexit__() (release, as in a finally). A CancelledError raised in the body, at any await, still runs __aexit__.
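The first two cancellation cases can be observed directly. A minimal sketch, assuming a hypothetical Traced manager whose __aenter__ can be made slow on demand:

```python
import asyncio


class Traced:
    def __init__(self, enter_delay: float = 0.0) -> None:
        self.enter_delay = enter_delay
        self.log: list[str] = []

    async def __aenter__(self) -> "Traced":
        self.log.append("aenter:start")
        await asyncio.sleep(self.enter_delay)  # cancellation can land here
        self.log.append("aenter:done")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.log.append(f"aexit:{exc_type.__name__ if exc_type else None}")
        return False


async def use(manager: Traced) -> None:
    async with manager:
        await asyncio.sleep(10)  # cancellation can land here too


async def cancel_after(manager: Traced, delay: float) -> list[str]:
    task = asyncio.create_task(use(manager))
    await asyncio.sleep(delay)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return manager.log
```

With Traced(enter_delay=1.0) and a cancel after 0.01 s, the log stops at "aenter:start" and __aexit__ never appears. With the default delay, cancellation lands in the body and the log ends with "aexit:CancelledError".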

Pattern catalogue

@asynccontextmanager for single-resource lifecycles

When a resource has one acquire and one release, the contextlib.asynccontextmanager decorator is the lowest-ceremony correct option. Code before the yield is __aenter__; code after it is __aexit__. The non-negotiable discipline is a try/finally around the yield so teardown runs on every exit path — normal return, exception, or cancellation.

import asyncio
import contextlib
from typing import AsyncIterator


@contextlib.asynccontextmanager
async def db_session(engine) -> AsyncIterator[object]:
    session = await engine.start_session()
    try:
        yield session
        await session.commit()
    except BaseException:
        # Covers Exception AND CancelledError; rollback then re-raise.
        await session.rollback()
        raise
    finally:
        await session.close()

Use it for transactions, leased connections, temp directories — anything with a tightly coupled setup/teardown pair. Reach for a class instead when teardown needs state that outlives a single yield, or when you must inspect the propagating exception type and conditionally suppress it.
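For the conditional-suppression case, a class is usually clearer than a decorated generator. A minimal sketch, assuming a hypothetical IgnoreNotFound manager that suppresses only LookupError and records what it swallowed:

```python
import asyncio
from typing import Optional


class IgnoreNotFound:
    """Suppress LookupError from the body; let everything else propagate."""

    def __init__(self) -> None:
        self.suppressed: Optional[BaseException] = None

    async def __aenter__(self) -> "IgnoreNotFound":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None and issubclass(exc_type, LookupError):
            self.suppressed = exc  # state that outlives the with block
            return True  # suppress this one specific, intended type
        return False  # CancelledError and all other errors propagate


async def lookup_missing() -> Optional[BaseException]:
    manager = IgnoreNotFound()
    async with manager:
        raise KeyError("missing")
    return manager.suppressed
```

Because the check is an explicit issubclass test, cancellation can never be suppressed by accident.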

AsyncExitStack for dynamic and variable-count resources

When the number of resources is not known until runtime — opening N shards, entering a list of managers whose length varies per request — nesting async with does not compose. contextlib.AsyncExitStack lets you enter managers imperatively and guarantees they are exited in reverse order, even if a later acquisition fails partway through.

import contextlib


async def open_shards(dsns: list[str]):
    # make_engine() and run_query_across() stand in for application code.
    async with contextlib.AsyncExitStack() as stack:
        conns = [
            await stack.enter_async_context(db_session(make_engine(dsn)))
            for dsn in dsns
        ]
        # If enter_async_context fails on shard 3, shards 0-2 are already
        # registered and will be unwound in LIFO order on exit.
        return await run_query_across(conns)

stack.push_async_callback() registers a coroutine to run on exit without a full manager, and stack.pop_all() transfers ownership of the registered cleanups to a new stack — useful when a factory acquires resources but the caller is responsible for releasing them later.
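The two calls combine into an "acquire everything or nothing" factory. The following is a sketch under stated assumptions: close_resource, open_all, and the "bad" resource name are hypothetical stand-ins, and a module-level list records the order of cleanups.

```python
import asyncio
import contextlib

closed: list[str] = []


async def close_resource(name: str) -> None:
    closed.append(name)


async def open_all(names: list[str]) -> contextlib.AsyncExitStack:
    """On success the caller owns the cleanups; on failure they run here."""
    async with contextlib.AsyncExitStack() as stack:
        for name in names:
            if name == "bad":
                raise RuntimeError(f"cannot open {name}")
            stack.push_async_callback(close_resource, name)
        # Move the callbacks to a fresh stack so this block does NOT run them.
        return stack.pop_all()


async def main() -> tuple:
    closed.clear()
    owned = await open_all(["a", "b"])
    before_close = list(closed)  # nothing released yet
    await owned.aclose()  # caller releases, in LIFO order
    after_close = list(closed)
    closed.clear()
    try:
        await open_all(["c", "bad"])
    except RuntimeError:
        pass
    return before_close, after_close, list(closed)
```

On the failure path pop_all() is never reached, so the original stack unwinds whatever was registered before the error.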

Async generator as a stream

An async generator is the idiomatic way to expose a paginated or streamed source. Each yield is a backpressure point: the upstream fetch only advances when the consumer pulls the next item, so a slow consumer naturally throttles the producer without an explicit semaphore. Insert an await asyncio.sleep(0) only when a tight CPU loop between yields would otherwise starve the loop.

import asyncio
from typing import AsyncIterator


async def paginated_stream(base_url: str) -> AsyncIterator[dict]:
    """Yields rows page by page; the consumer's pull rate is the backpressure."""
    page = 1
    while True:
        rows = await _fetch_page(base_url, page)
        if not rows:
            return
        for row in rows:
            yield row
        page += 1


async def _fetch_page(url: str, page: int) -> list[dict]:
    await asyncio.sleep(0.05)  # simulate network latency
    return [{"id": i, "page": page} for i in range(10)] if page <= 3 else []

aclosing() for deterministic generator cleanup

A streaming generator usually carries a finally-style obligation: once it owns a real resource, such as the HTTP session behind a paginated reader like the one above, that resource must close even if the consumer breaks early. A plain async for does not call aclose() on early exit; the generator is left suspended and its finally runs whenever the asyncgen finalizer hook (or GC) gets around to it, possibly on a different loop. contextlib.aclosing() (Python 3.10+) ties aclose() to lexical scope.

import asyncio
import contextlib
from typing import AsyncGenerator


async def first_match(stream: AsyncGenerator[dict, None], target: int) -> dict | None:
    # aclosing() needs an object with aclose(), i.e. an async generator; it
    # guarantees stream.aclose() runs even though we return early.
    async with contextlib.aclosing(stream) as safe:
        async for row in safe:
            if row["id"] == target:
                return row  # early return still triggers aclose()
    return None

This is the single most common source of "ghost" connections in async services: a generator-backed stream consumed under a timeout or a break, never explicitly closed. Make aclosing() the default wrapper for any generator that owns a resource.

When you need iteration but a generator is the wrong tool — because teardown requires state beyond a single frame, or because you want explicit control over __aiter__/__anext__ — implement a class-based async iterator instead. The choice is a genuine trade-off, not a style preference:

| Dimension | Class-based async iterator | Async generator |
| --- | --- | --- |
| State management | Explicit, mutable attributes; suited to pooling and retry logic | Implicit via the suspended frame; linear and stack-shaped |
| Teardown control | Full: a real aclose()/__aexit__ you author | Relies on aclosing() or the asyncgen finalizer hook |
| Per-item overhead | Typically higher; every __anext__ call creates a new coroutine object | Typically lower; each item resumes the already-allocated frame (PEP 525 reports roughly 2x faster iteration) |
| Backpressure | Manual: wire in a queue or semaphore | Natural: the consumer's pull rate gates the producer |
| Best fit | Resource pools, multiplexed sockets, custom retry | Streaming APIs, paginated reads, linear pipelines |

A class-based iterator's __anext__ is just a coroutine that returns the next value or raises StopAsyncIteration; the same cancellation rules apply, so any resource it holds needs the same deterministic-cleanup discipline as a context manager.
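A minimal sketch of that shape, assuming a hypothetical PageReader over an in-memory list of pages (a real implementation would await a network fetch where the sleep is):

```python
import asyncio
from typing import Optional


class PageReader:
    """Class-based async iterator that is also its own context manager."""

    def __init__(self, pages: list[list[int]]) -> None:
        self._pages = list(pages)
        self._buffer: list[int] = []
        self.closed = False

    def __aiter__(self) -> "PageReader":
        return self

    async def __anext__(self) -> int:
        if self.closed:
            raise StopAsyncIteration
        while not self._buffer:
            if not self._pages:
                raise StopAsyncIteration
            await asyncio.sleep(0)  # stand-in for an awaited page fetch
            self._buffer = self._pages.pop(0)
        return self._buffer.pop(0)

    async def aclose(self) -> None:
        self.closed = True
        self._pages.clear()
        self._buffer.clear()

    async def __aenter__(self) -> "PageReader":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        await self.aclose()  # deterministic teardown on every exit path
        return False


async def first_even(reader: PageReader) -> Optional[int]:
    async with reader:
        async for value in reader:
            if value % 2 == 0:
                return value
    return None
```

Because the iterator authors its own aclose() and __aexit__, early exit from the loop still releases its state without relying on aclosing().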

Cancellation-safe __aexit__

A class-based __aexit__ must survive being cancelled while it awaits its own teardown. The contract: catch CancelledError explicitly, do the minimum to leave the resource consistent, and re-raise. If teardown itself must complete (flushing a critical audit record), wrap that specific call in asyncio.shield() — but shield sparingly, since it lets work outlive the cancellation that requested it.

import asyncio
import logging

logger = logging.getLogger(__name__)


class LeasedConnection:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self) -> "LeasedConnection":
        self.conn = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        try:
            if self.conn is not None:
                await self.pool.release(self.conn)
        except asyncio.CancelledError:
            # Minimal, non-blocking cleanup; never swallow the cancellation.
            self.pool.discard(self.conn)
            raise
        finally:
            self.conn = None
        return False  # never suppress the body's exception

These hooks compose directly with the broader coroutine design patterns used to structure long-lived tasks, and the cancellation discipline here is the same one covered in depth under cancellation patterns.
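For the shielded case mentioned above, one possible shape is sketched below. AuditedResource and _flush_audit are hypothetical; the sleep stands in for a critical write. The flush runs as its own task, so a cancellation that arrives during __aexit__ interrupts only the wait, not the write.

```python
import asyncio


class AuditedResource:
    def __init__(self) -> None:
        self.flushed = False

    async def _flush_audit(self) -> None:
        await asyncio.sleep(0.05)  # stand-in for a critical audit write
        self.flushed = True

    async def __aenter__(self) -> "AuditedResource":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        flush = asyncio.create_task(self._flush_audit())
        try:
            await asyncio.shield(flush)
        except asyncio.CancelledError:
            # Cancelled mid-teardown: the shielded task is still running.
            # Wait for it to finish, then preserve the cancellation.
            await flush
            raise
        return False


async def main() -> tuple:
    resource = AuditedResource()

    async def use() -> None:
        async with resource:
            pass  # body finishes at once, so the cancel lands in __aexit__

    task = asyncio.create_task(use())
    await asyncio.sleep(0.02)
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return resource.flushed, cancelled
```

The cost is visible in the design: the task takes the full flush time to honour the cancellation, which is why shielding should stay limited to work that genuinely must complete.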

Resource boundaries

Async managers and iterators are where resource ceilings get enforced, so size them against the real limits:

  • File-descriptor ceiling. Every leased socket or open file consumes an FD. A leak in __aexit__ or an un-aclose()d generator burns FDs until OSError: [Errno 24] Too many open files. Track open-FD count as a gauge; it should return to baseline between request bursts.
  • Pool capacity vs concurrency. If __aenter__ acquires from a fixed-size pool, the effective concurrency of any async with-guarded section is bounded by the pool, not by how many tasks you spawn. Excess tasks queue inside acquire(). This is the right place to apply backpressure — see connection pooling and keepalive for sizing pools to throughput.
  • Generator buffering. A generator that prefetches pages into an internal asyncio.Queue(maxsize=N) trades latency for throughput but caps memory at N items. Without a bound, a fast producer feeding a slow consumer grows the queue without limit — the classic unbounded-buffer OOM.
  • Teardown fan-out. Closing many resources concurrently with a TaskGroup is faster than serial __aexit__s but multiplies peak FD/connection churn at the instant of shutdown; size the burst so it does not trip downstream rate limits.

Integrated production example

The following service consumes a paginated upstream through a cancellation-safe generator, leases a connection per batch via an @asynccontextmanager, and processes batches concurrently under a TaskGroup with a global asyncio.timeout (both require Python 3.11+). It wires most of the patterns above together the way a real ingestion worker would.

import asyncio
import contextlib
import logging
from typing import AsyncIterator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingest")


class Pool:
    def __init__(self, size: int):
        self._sem = asyncio.Semaphore(size)

    async def acquire(self) -> "Conn":
        await self._sem.acquire()
        return Conn(self._sem)


class Conn:
    def __init__(self, sem: asyncio.Semaphore):
        self._sem = sem

    async def write(self, rows: list[dict]) -> None:
        await asyncio.sleep(0.01)  # simulate DB round-trip

    async def aclose(self) -> None:
        self._sem.release()


@contextlib.asynccontextmanager
async def leased(pool: Pool) -> AsyncIterator[Conn]:
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await conn.aclose()  # runs on success, error, and cancellation


async def page_stream(pages: int) -> AsyncIterator[list[dict]]:
    page = 1
    try:
        while page <= pages:
            await asyncio.sleep(0.02)  # simulate upstream fetch
            yield [{"id": page * 100 + i} for i in range(50)]
            page += 1
    finally:
        logger.info("page_stream finalized at page %d", page)


async def handle_batch(pool: Pool, batch: list[dict]) -> int:
    async with leased(pool) as conn:
        await conn.write(batch)
    return len(batch)


async def ingest(pages: int = 6, pool_size: int = 4) -> int:
    pool = Pool(pool_size)
    # Defined outside the timeout scope so the deadline handler can inspect it.
    tasks: list[asyncio.Task[int]] = []
    try:
        async with asyncio.timeout(5.0):
            # aclosing guarantees page_stream.aclose() even if the
            # TaskGroup raises or the outer timeout fires.
            async with contextlib.aclosing(page_stream(pages)) as stream:
                try:
                    async with asyncio.TaskGroup() as tg:
                        async for batch in stream:
                            tasks.append(tg.create_task(handle_batch(pool, batch)))
                except* Exception as eg:
                    logger.error("batch failures: %s",
                                 [str(e) for e in eg.exceptions])
                    raise
    except TimeoutError:
        # Count only batches that finished cleanly before the deadline;
        # cancelled or failed tasks contribute nothing.
        written = sum(
            t.result() for t in tasks
            if t.done() and not t.cancelled() and t.exception() is None
        )
        logger.error("ingest exceeded deadline; %d rows committed", written)
        raise
    written = sum(t.result() for t in tasks)
    logger.info("ingest complete: %d rows", written)
    return written


if __name__ == "__main__":
    asyncio.run(ingest())

Diagnostic Hook: Run this under PYTHONASYNCIODEBUG=1 and confirm the page_stream finalized log line appears on every exit path — including the timeout and exception-group branches. If it is missing on the timeout path, the aclosing() wrapper is in the wrong scope and the generator is leaking. Track the pool semaphore's _value (or instrument acquire/aclose) as a gauge; it must return to pool_size after ingest completes. A persistent deficit means a leased() block exited without its finally running.

Diagnostic Hook: deterministic-cleanup audit

Three signals catch the overwhelming majority of async-manager defects in production. (1) Open-FD gauge (len(os.listdir('/proc/self/fd')) on Linux, or psutil.Process().num_fds()) sampled per minute — a monotonic climb is a teardown leak. (2) Slow-callback warnings from loop.set_debug(True) / PYTHONASYNCIODEBUG=1 firing inside __aenter__/__aexit__ mean blocking work is running in a hook. (3) asyncio.all_tasks() count at idle — tasks parked on a generator's __anext__ that never get closed show up here as a slowly growing baseline. Alert on the trend, not the absolute value.
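The first signal needs very little code. A minimal sketch, assuming a Linux-style /proc/self/fd or a BSD/macOS-style /dev/fd directory is available; open_fd_count is an illustrative helper name, and the absolute value includes the descriptor that listing the directory briefly opens, so only deltas are meaningful:

```python
import os


def open_fd_count() -> int:
    """Return the number of open file descriptors for this process."""
    for fd_dir in ("/proc/self/fd", "/dev/fd"):
        try:
            return len(os.listdir(fd_dir))
        except OSError:
            continue  # directory not available on this platform
    raise NotImplementedError("no fd directory available on this platform")


def fd_delta(baseline: int) -> int:
    """Difference from a previously sampled baseline; should trend to zero."""
    return open_fd_count() - baseline
```

Sample a baseline at startup and export fd_delta(baseline) as the gauge; a value that keeps climbing between request bursts points at a teardown leak.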

Failure modes

| Failure mode | Root cause | Detection | Fix |
| --- | --- | --- | --- |
| File descriptors climb until EMFILE | Generator consumed with break/timeout and never aclose()d; finally deferred to GC | Open-FD gauge rises monotonically; ResourceWarning under -W error | Wrap the generator in contextlib.aclosing() so aclose() is tied to scope |
| Cancellation hangs or resource left half-open | __aexit__ handles only except Exception, missing CancelledError (a BaseException) | Task stays in shutdown; loop.set_debug(True) shows pending teardown | Catch CancelledError explicitly, do minimal cleanup, re-raise |
| Event loop stalls during setup/teardown | Synchronous I/O (socket.close(), requests) or CPU work inside a hook | Slow-callback warnings citing __aenter__/__aexit__ | Use async teardown or off-load the blocking call to loop.run_in_executor() |
| __aexit__ silently masks errors | Hook returns True (or a truthy value) unconditionally | Exceptions vanish; downstream sees success on failed work | Return False/None; suppress only a specific, intended exception type |
| Partial resources leak on multi-acquire failure | Resources entered by hand in a loop (manual __aenter__ or acquire() calls) abort mid-acquisition, orphaning the earlier ones | FD/connection count off by the number acquired before the failure | Use AsyncExitStack; entered managers unwind in LIFO order automatically |

Frequently asked questions

How do I safely handle asyncio.CancelledError in __aexit__?

Catch asyncio.CancelledError explicitly in __aexit__ (it derives from BaseException, so an except Exception clause will not catch it), perform minimal non-blocking cleanup to leave the resource consistent, then re-raise to preserve task cancellation semantics. Use asyncio.shield() only when a specific teardown call must complete despite cancellation.

Why does my async generator leak file descriptors when I break out of the loop early?

A plain async for does not call aclose() when you break or an exception propagates; the generator is left suspended at its yield and its finally block only runs when the asyncgen finalizer or garbage collector reaches it, possibly on a different loop. Wrap the generator in contextlib.aclosing() so aclose() is tied to lexical scope and runs on every exit path.

When should I use AsyncExitStack instead of nested async with?

Use AsyncExitStack when the number of resources is dynamic or only known at runtime, or when acquisition happens in a loop. It enters managers imperatively via enter_async_context and unwinds them in reverse (LIFO) order on exit, including when a later acquisition fails partway through, so already-acquired resources are still released.

Why is my async context manager stalling the event loop?

Stalls occur when __aenter__ or __aexit__ runs synchronous blocking work such as socket.close(), requests calls, time.sleep, or CPU-bound computation. Each hook runs on the single-threaded loop, so blocking work starves every other task. Replace it with native async teardown or off-load the blocking call to loop.run_in_executor(), and confirm with PYTHONASYNCIODEBUG=1 that no slow-callback warnings cite the hooks.
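A minimal sketch of the off-loading fix, assuming a hypothetical BlockingHandle whose close() is synchronous and slow. The heartbeat task exists only to show that the loop keeps running while the close happens on the default thread pool.

```python
import asyncio
import time


class BlockingHandle:
    """Stand-in for a resource with a slow, synchronous close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        time.sleep(0.05)  # would stall the loop if called directly in a hook
        self.closed = True


class OffloadedClose:
    def __init__(self, handle: BlockingHandle) -> None:
        self.handle = handle

    async def __aenter__(self) -> BlockingHandle:
        return self.handle

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        loop = asyncio.get_running_loop()
        # Run the blocking close on the default executor, not the loop thread.
        await loop.run_in_executor(None, self.handle.close)
        return False


async def main() -> tuple:
    handle = BlockingHandle()
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    async with OffloadedClose(handle):
        pass
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    return handle.closed, ticks
```

One caveat worth stating: cancelling the task while it awaits run_in_executor() does not stop the thread, so the blocking call still runs to completion in the background.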