Best practices for async context managers in Python

You wrote an async context manager that leases a connection in __aenter__ and releases it in __aexit__, it passes every test, and a week into production you start seeing OSError: [Errno 24] Too many open files. The manager is leaking — not on the happy path, but on cancellation and on exceptions, where __aexit__ either never finishes or swallows the signal that should have driven cleanup. This guide builds a correct async context manager from the protocol up: the exact lifecycle contract, cancellation-safe teardown, the @asynccontextmanager and AsyncExitStack shortcuts, and a verification step that proves cleanup actually happened under load.

The reason unit tests miss this class of bug is that they almost always exercise the happy path: enter, do work, exit cleanly. Production exercises the unhappy paths constantly — a request times out, a client disconnects, a TaskGroup sibling raises and cancels everything else. Each of those routes through __aexit__ with either an exception set or a pending cancellation, and that is precisely where a naïve implementation breaks. The fix is not more defensive code; it is understanding the exact contract the protocol guarantees and writing teardown that holds under cancellation. Every step below ends with a concrete check so you can confirm the behaviour rather than assume it.

Prerequisites

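  • Python 3.11+. The examples use asyncio.timeout(), asyncio.TaskGroup, and exception groups (except*). On 3.8–3.10, asyncio.wait_for can stand in for asyncio.timeout(), but asyncio.TaskGroup and except* have no standard-library equivalent there.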
  • Fluency with the protocol. This is a practical build-out; for the conceptual model of how protocol hooks schedule on the loop, read the parent overview, Async Context Managers & Iterators.
  • Loop scheduling background. Cancellation and slow-callback behaviour reference the Asyncio Fundamentals & Event Loop Architecture overview.
  • Standard library only — no third-party packages required to run any snippet.
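The wait_for substitution mentioned above can be sketched as a small helper. This is a minimal illustration, not a recommended abstraction: run_with_deadline, slow, and main are hypothetical names introduced here, and the version check simply picks whichever timeout API the interpreter provides.

```python
import asyncio
import sys


async def run_with_deadline(make_coro, seconds: float):
    """Await make_coro() but give up after `seconds` (hypothetical helper)."""
    if sys.version_info >= (3, 11):
        async with asyncio.timeout(seconds):
            return await make_coro()
    # 3.8-3.10 fallback: wait_for cancels the inner coroutine on timeout.
    return await asyncio.wait_for(make_coro(), timeout=seconds)


async def slow() -> str:
    await asyncio.sleep(1)
    return "done"


async def main() -> str:
    try:
        return await run_with_deadline(slow, 0.01)
    except asyncio.TimeoutError:  # alias of the builtin TimeoutError on 3.11+
        return "timed out"


outcome = asyncio.run(main())
```

Either branch cancels the inner coroutine when the deadline passes, so any async with block inside it sees a CancelledError in __aexit__.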

1. Honour the lifecycle contract in __aenter__/__aexit__

Three rules make async with behave. __aenter__ must return the object bound by as (usually self); returning None silently binds None, and the resulting AttributeError deep in the block is maddening to trace back to a missing return self. __aexit__ must return False or None so the body's exception propagates — returning a truthy value suppresses it, which is almost never what you want and turns a failed operation into a silent success. And any blocking work in either hook stalls the whole loop, because both hooks run as ordinary coroutines on the single-threaded scheduler; keep them to async operations only, or off-load the blocking call to an executor.

import asyncio
import logging

logger = logging.getLogger(__name__)


class LeasedConnection:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self) -> "LeasedConnection":
        self.conn = await self.pool.acquire()  # async acquisition only
        return self  # MUST return the bound resource

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        if self.conn is not None:
            await self.pool.release(self.conn)
            self.conn = None
        return False  # propagate the body's exception

Verify: async with LeasedConnection(pool) as c: binds a non-None c, and a raise inside the block escapes the async with rather than being swallowed.
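The check above can be run without a real database. The following is a minimal sketch that assumes a hypothetical in-memory FakePool which only counts outstanding leases; LeasedConnection is repeated so the snippet runs on its own, and check_contract is an illustrative name.

```python
import asyncio


class FakePool:
    """Hypothetical in-memory pool; it only tracks how many leases are out."""

    def __init__(self) -> None:
        self.in_use = 0

    async def acquire(self) -> object:
        await asyncio.sleep(0)  # yield once, like real async I/O would
        self.in_use += 1
        return object()

    async def release(self, conn: object) -> None:
        await asyncio.sleep(0)
        self.in_use -= 1


class LeasedConnection:  # same shape as the class above
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn is not None:
            await self.pool.release(self.conn)
            self.conn = None
        return False


async def check_contract() -> dict:
    pool = FakePool()
    seen = {}
    try:
        async with LeasedConnection(pool) as c:
            seen["bound"] = c is not None and c.conn is not None
            seen["in_use_inside"] = pool.in_use
            raise ValueError("boom")
    except ValueError:
        seen["propagated"] = True  # the body's exception escaped the block
    seen["in_use_after"] = pool.in_use
    return seen


report = asyncio.run(check_contract())
```

If __aenter__ forgot to return self, the first assignment inside the block would fail with AttributeError on None, which is the symptom described above.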

2. Make __aexit__ cancellation-safe

asyncio.CancelledError derives from BaseException, not Exception. A try/except Exception in __aexit__ therefore does not catch cancellation — the teardown is interrupted at its first await and the resource is left half-open. Catch CancelledError explicitly (or use finally), run minimal non-blocking cleanup, and re-raise so the cancellation propagates.

There is a second trap here that is easy to miss: the cleanup you run in the except asyncio.CancelledError branch must itself be non-blocking and must not await anything that can suspend, because the task is already being torn down and a fresh suspension can be cancelled again or race the loop shutdown. That is why the example below calls a synchronous self.pool.discard(...) rather than await self.pool.release(...) on the cancellation path — it returns the connection's accounting to a consistent state immediately, without yielding. Reserve await-based release for the normal path where the loop is still healthy.

import asyncio


class CancellationSafeResource:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self) -> "CancellationSafeResource":
        self.conn = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        try:
            if self.conn is not None:
                await self.pool.release(self.conn)
        except asyncio.CancelledError:
            self.pool.discard(self.conn)  # synchronous, non-blocking
            raise  # never swallow cancellation
        finally:
            self.conn = None
        return False

Verify: cancel the task mid-block (task.cancel()), await it, and assert the pool's in-use count returned to zero. The CancelledError should still surface from await task.
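A runnable version of this check might look like the sketch below. FakePool is a hypothetical stand-in exposing the acquire/release/discard/in_use surface the example assumes, and check_cancel is an illustrative name; the manager is repeated so the snippet is self-contained.

```python
import asyncio


class FakePool:
    """Hypothetical pool with the surface the example above assumes."""

    def __init__(self) -> None:
        self.in_use = 0

    async def acquire(self) -> object:
        await asyncio.sleep(0)
        self.in_use += 1
        return object()

    async def release(self, conn: object) -> None:
        await asyncio.sleep(0)
        self.in_use -= 1

    def discard(self, conn: object) -> None:
        self.in_use -= 1  # synchronous bookkeeping only


class CancellationSafeResource:  # repeated from the example above
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if self.conn is not None:
                await self.pool.release(self.conn)
        except asyncio.CancelledError:
            self.pool.discard(self.conn)
            raise
        finally:
            self.conn = None
        return False


async def check_cancel() -> tuple:
    pool = FakePool()
    entered = asyncio.Event()

    async def use() -> None:
        async with CancellationSafeResource(pool):
            entered.set()
            await asyncio.sleep(10)  # parked mid-block until cancelled

    task = asyncio.create_task(use())
    await entered.wait()
    in_use_inside = pool.in_use
    task.cancel()
    try:
        await task
        surfaced = False
    except asyncio.CancelledError:
        surfaced = True
    return in_use_inside, pool.in_use, surfaced


result = asyncio.run(check_cancel())
```

Here the cancellation lands in the body, so __aexit__ receives CancelledError as exc_type and the awaited release completes normally; the except branch only fires when the cancellation arrives during release itself.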

3. Prefer @asynccontextmanager for single-resource lifecycles

When setup and teardown are one acquire and one release, the decorator removes the class boilerplate. The rule: exactly one yield, wrapped in try/finally so teardown runs on every exit path. Catch BaseException (not just Exception) if you need explicit rollback-on-cancellation. The mental model is that the decorator turns your generator into a context manager whose __aenter__ runs the generator up to the yield and whose __aexit__ resumes it past the yield, throwing the body's exception (including CancelledError) back in at the yield point if one occurred. That is why an except/finally placed around the yield sees exactly the exception the body raised, and why a yield left outside the try block skips teardown whenever the body raises: the exception surfaces at the yield and the generator ends before any cleanup code runs.

import contextlib
from typing import AsyncIterator


@contextlib.asynccontextmanager
async def managed_transaction(engine) -> AsyncIterator[object]:
    session = await engine.start_session()
    try:
        yield session
        await session.commit()
    except BaseException:  # includes CancelledError
        await session.rollback()
        raise
    finally:
        await session.close()

Verify: a clean block commits and closes; an exception or cancellation inside the block rolls back and still closes. Confirm the generator has exactly one yield — a second one raises RuntimeError: generator didn't stop.
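The three exit paths can be exercised with recording fakes. This is a sketch under stated assumptions: FakeSession and FakeEngine are hypothetical stand-ins that only log which methods were called, check_paths is an illustrative name, and managed_transaction is repeated so the snippet runs alone.

```python
import asyncio
import contextlib


class FakeSession:
    """Hypothetical session that records the calls made on it."""

    def __init__(self) -> None:
        self.calls = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


class FakeEngine:
    def __init__(self) -> None:
        self.session = FakeSession()

    async def start_session(self) -> FakeSession:
        return self.session


@contextlib.asynccontextmanager
async def managed_transaction(engine):  # repeated from the example above
    session = await engine.start_session()
    try:
        yield session
        await session.commit()
    except BaseException:
        await session.rollback()
        raise
    finally:
        await session.close()


async def check_paths() -> tuple:
    clean = FakeEngine()
    async with managed_transaction(clean):
        pass  # clean exit

    failing = FakeEngine()
    try:
        async with managed_transaction(failing):
            raise RuntimeError("boom")
    except RuntimeError:
        pass

    cancelled = FakeEngine()
    started = asyncio.Event()

    async def body() -> None:
        async with managed_transaction(cancelled):
            started.set()
            await asyncio.sleep(10)

    task = asyncio.create_task(body())
    await started.wait()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task

    return clean.session.calls, failing.session.calls, cancelled.session.calls


clean_calls, failing_calls, cancelled_calls = asyncio.run(check_paths())
```

The fakes never suspend, so this only proves the ordering of commit, rollback, and close; it says nothing about how a real rollback behaves if it is itself cancelled.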

4. Compose dynamic resources with AsyncExitStack

When the resource count is variable or resources are acquired in a loop, statically nested async with blocks cannot express it, and calling __aenter__ by hand orphans everything acquired so far if a later acquisition fails. contextlib.AsyncExitStack enters managers imperatively and guarantees reverse-order teardown even on partial failure.

import contextlib


async def open_all(engines: list) -> None:
    async with contextlib.AsyncExitStack() as stack:
        sessions = [
            await stack.enter_async_context(managed_transaction(e))
            for e in engines
        ]
        # If acquisition fails on engines[k], sessions[0..k-1] are already
        # registered and unwind in LIFO order when the stack exits.
        await run_across(sessions)
    # All sessions committed/closed here, newest first.


async def run_across(sessions: list) -> None:
    for s in sessions:
        await s.execute("SELECT 1")

Verify: inject a failure on the third acquisition and assert the first two sessions were rolled back and closed — none left open.
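The partial-failure behaviour can be observed with a toy manager that logs its own open and close. A minimal sketch: tracked and open_with_failure are hypothetical names introduced for illustration, and the third resource is made to fail on entry.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def tracked(name: str, log: list, fail: bool = False):
    """Hypothetical resource that records when it opens and closes."""
    if fail:
        raise ConnectionError(f"cannot open {name}")
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")


async def open_with_failure() -> list:
    log = []
    try:
        async with contextlib.AsyncExitStack() as stack:
            for name, fail in [("a", False), ("b", False), ("c", True)]:
                await stack.enter_async_context(tracked(name, log, fail))
            log.append("body ran")  # not reached: the third entry raises
    except ConnectionError:
        log.append("acquisition failed")
    return log


log = asyncio.run(open_with_failure())
```

The two resources that did open are closed newest first before the ConnectionError reaches the caller, which is the guarantee the step relies on.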

The decision tree below summarises which of the three implementation styles to reach for, given the shape of the resource you are managing.

Figure: Choosing an async context manager implementation. A decision tree mapping resource count and teardown complexity to a class manager, @asynccontextmanager, or AsyncExitStack.

  • Resource count known at write time? No: AsyncExitStack (variable / looped acquisition).
  • Yes, simple: @asynccontextmanager (one acquire + release, no exit-time state).
  • Yes, complex: class manager (teardown needs state or exc inspection).

All three carry the same cancellation contract: catch CancelledError, clean up, re-raise.

5. Instrument teardown to enforce an SLA

Teardown latency is invisible until you measure it. Wrap __aexit__ with a timer so a slow release (a stuck release(), a synchronous call sneaking in) is logged rather than silently degrading throughput.

import asyncio
import logging

logger = logging.getLogger(__name__)


class ProfiledResource:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self) -> "ProfiledResource":
        self.conn = await self.pool.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        loop = asyncio.get_running_loop()
        start = loop.time()
        try:
            if self.conn is not None:
                await self.pool.release(self.conn)
        finally:
            elapsed_ms = (loop.time() - start) * 1000
            if elapsed_ms > 10.0:
                logger.warning(
                    "__aexit__ exceeded 10ms SLA: %.2fms (exc=%s)",
                    elapsed_ms, exc_type,
                )
            self.conn = None
        return False

Diagnostic Hook: ship the elapsed_ms as a histogram and alert on the p99. A teardown p99 that drifts upward under load is the earliest signal of pool exhaustion or a blocking call creeping into a hook — long before FD counts climb. Pair it with PYTHONASYNCIODEBUG=1 so the loop also flags any hook that blocks past slow_callback_duration.
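The slow-callback warning can be seen in isolation with the sketch below. It assumes a deliberately blocking teardown (BlockingTeardown, a hypothetical class) and a hypothetical ListHandler that captures messages from the "asyncio" logger; passing debug=True to asyncio.run enables the same debug mode as the environment variable.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted messages in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


class BlockingTeardown:
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        time.sleep(0.2)  # deliberate blocking call, stands in for a sync close()
        return False


async def main() -> None:
    # Lower the threshold (default 0.1 s) so the stall is reported.
    asyncio.get_running_loop().slow_callback_duration = 0.05
    async with BlockingTeardown():
        pass


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio.run(main(), debug=True)
asyncio_logger.removeHandler(handler)

slow_warnings = [m for m in handler.messages if "took" in m]
```

The warning names the task that held the loop, not the hook, so it complements the per-hook timer in ProfiledResource without replacing it.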

Verification

Prove cleanup under the conditions that actually break it — concurrency and cancellation — not just the happy path.

import asyncio


async def verify(pool, n: int = 200) -> None:
    async def use():
        async with CancellationSafeResource(pool):
            await asyncio.sleep(0.01)

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(use()) for _ in range(n)]
        await asyncio.sleep(0.005)
        for t in tasks[: n // 2]:
            t.cancel()  # cancel half while they are still in flight

    assert pool.in_use == 0, f"leaked {pool.in_use} connections"
    print("OK: pool drained to baseline after mixed completion + cancellation")

Expected result: the assertion holds and pool.in_use is 0. In a real service, the equivalent check is an open-FD gauge (psutil.Process().num_fds() or len(os.listdir('/proc/self/fd'))) returning to its idle baseline after a traffic burst. A non-zero residue is a leaked __aexit__ path — almost always a swallowed CancelledError (Step 2).

To turn this into a regression guard, run the same harness with the deliberately broken variant — an __aexit__ that catches except Exception instead of BaseException — and confirm the assertion fails. If both the correct and broken versions pass, your test is not actually cancelling tasks while teardown is in flight; tighten the asyncio.sleep timing so cancellation lands during pool.release, not before __aenter__ or after __aexit__. A leak test that cannot catch the canonical bug is worse than no test, because it manufactures false confidence.
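One way to make the cancellation land inside release deterministically is to have the pool signal when release has started. The sketch below assumes a hypothetical SlowReleasePool with that signal, plus two illustrative managers, Broken and Fixed, that differ only in the exception class they catch.

```python
import asyncio


class SlowReleasePool:
    """Hypothetical pool whose release() suspends long enough to be cancelled."""

    def __init__(self) -> None:
        self.in_use = 0
        self.releasing = asyncio.Event()

    async def acquire(self) -> object:
        self.in_use += 1
        return object()

    async def release(self, conn: object) -> None:
        self.releasing.set()
        await asyncio.sleep(0.05)  # the cancellation lands here
        self.in_use -= 1

    def discard(self, conn: object) -> None:
        self.in_use -= 1


class _Base:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self


class Broken(_Base):
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await self.pool.release(self.conn)
        except Exception:  # CancelledError is a BaseException: not caught
            self.pool.discard(self.conn)
            raise
        return False


class Fixed(_Base):
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await self.pool.release(self.conn)
        except BaseException:  # includes CancelledError
            self.pool.discard(self.conn)
            raise
        return False


async def leaked_after_cancel(manager_cls) -> int:
    pool = SlowReleasePool()

    async def use() -> None:
        async with manager_cls(pool):
            pass  # exit at once so the task parks inside release()

    task = asyncio.create_task(use())
    await pool.releasing.wait()  # teardown is now in flight
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return pool.in_use


async def main() -> tuple:
    return await leaked_after_cancel(Broken), await leaked_after_cancel(Fixed)


broken_leak, fixed_leak = asyncio.run(main())
```

Waiting on an event instead of tuning sleep durations removes the timing guesswork: the broken variant leaks one lease on every run and the fixed variant leaks none.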

Pitfalls & edge cases

  • except Exception in teardown. Misses CancelledError; the resource is left half-open on cancellation. Always catch BaseException/CancelledError explicitly or rely on finally.
  • Unconditional return True. Suppresses every exception from the body, turning real failures into silent successes. Return False/None; suppress only one specific, intended exception type.
  • Blocking calls in hooks. socket.close(), file.flush(), os.fsync(), or CPU work in __aenter__/__aexit__ stalls the loop. Use async equivalents or loop.run_in_executor().
  • Over-using asyncio.shield(). Shielding teardown lets it outlive the cancellation that requested it and can hang shutdown. Shield only a single critical call (e.g. an audit flush), never the whole __aexit__.
  • Multiple yields with @asynccontextmanager. A second yield raises RuntimeError: generator didn't stop. Keep exactly one, inside try/finally.
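For the blocking-calls pitfall, off-loading to the default executor might look like the following sketch. DurableLog is a hypothetical file-backed resource invented for illustration; the body's write is left synchronous for brevity because it only fills a buffer.

```python
import asyncio
import os
import tempfile


class DurableLog:
    """Hypothetical resource whose close path needs flush + fsync."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.file = None

    async def __aenter__(self):
        loop = asyncio.get_running_loop()
        # open() can block on a slow filesystem, so it runs in the executor too.
        self.file = await loop.run_in_executor(None, open, self.path, "w")
        return self

    def _close_blocking(self) -> None:
        # Runs in a worker thread, so the event loop keeps serving other tasks.
        self.file.flush()
        os.fsync(self.file.fileno())
        self.file.close()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, self._close_blocking)
        finally:
            self.file = None
        return False


async def main() -> str:
    path = os.path.join(tempfile.mkdtemp(), "audit.log")
    async with DurableLog(path) as log:
        log.file.write("entry\n")  # buffered write, kept synchronous for brevity
    with open(path) as fh:
        return fh.read()


written = asyncio.run(main())
```

One caveat: cancelling the task while it awaits run_in_executor does not stop the worker thread, so the blocking close still runs to completion in the background.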

FAQ

How do I handle asyncio.CancelledError without breaking the loop or leaking resources?

Catch asyncio.CancelledError explicitly in __aexit__ (or rely on a finally block, since it derives from BaseException and a bare except Exception will miss it), perform non-blocking cleanup, then re-raise so cancellation semantics are preserved and the resource is not left half-open.

When should I prefer contextlib.asynccontextmanager over a class-based implementation?

Use the decorator for single-resource lifecycles where setup and teardown are tightly coupled. It removes boilerplate but requires exactly one yield inside a try/finally. Use a class when teardown needs state beyond a single yield or when you must inspect and conditionally suppress the propagating exception.
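The conditional-suppression case can be sketched with a class that inspects exc_type. IgnoreMissingKey is a hypothetical manager introduced for illustration; it swallows KeyError only and lets everything else, including CancelledError, propagate.

```python
import asyncio


class IgnoreMissingKey:
    """Hypothetical manager that suppresses KeyError and counts how often."""

    def __init__(self) -> None:
        self.suppressed = 0

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None and issubclass(exc_type, KeyError):
            self.suppressed += 1
            return True  # swallow this one intended type
        return False  # everything else propagates


async def main() -> tuple:
    guard = IgnoreMissingKey()

    async with guard:
        {}["absent"]  # raises KeyError, which the manager suppresses

    try:
        async with guard:
            raise ValueError("not ours")
    except ValueError:
        escaped = True
    else:
        escaped = False

    return guard.suppressed, escaped


suppressed_count, value_error_escaped = asyncio.run(main())
```

Keeping the counter on the instance is the kind of exit-time state that is awkward to express with a single yield, which is the reason to reach for a class here.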

Why does __aexit__ sometimes block the event loop and how do I enforce non-blocking teardown?

Synchronous cleanup such as socket.close(), file.flush(), or CPU work stalls the single-threaded loop because the hook runs on it. Replace these with async equivalents or off-load to loop.run_in_executor(), enforce a teardown SLA by timing __aexit__ with loop.time(), and run with PYTHONASYNCIODEBUG=1 to catch slow-callback warnings.

How can I verify async context managers clean up correctly under high concurrency?

Run many concurrent uses under a TaskGroup, cancel a fraction mid-teardown, and assert the pool or in-use count returns to baseline. In production, watch an open-FD gauge such as psutil.Process().num_fds() and confirm it returns to its idle baseline after a traffic burst; a residue indicates a leaked __aexit__ path, usually a swallowed CancelledError.