Skip to content

Best practices for async context managers in Python

A production-focused guide to implementing robust async context managers in Python. Covers the exact lifecycle contract, cancellation-safe teardown patterns, generator-based alternatives, and diagnostic workflows for verifying non-blocking resource management under high-concurrency event loops.

Key Implementation Requirements: - Strict separation of async I/O and blocking operations during setup/teardown - Explicit handling of asyncio.CancelledError to prevent resource leaks - Correct use of contextlib.asynccontextmanager for maintainable single-yield patterns - Event loop diagnostic hooks to measure __aexit__ latency and verify cleanup

Core Protocol Implementation (__aenter__ / __aexit__)

The class-based async context manager protocol requires strict adherence to coroutine semantics and explicit return values. Both __aenter__ and __aexit__ must be declared with async def to ensure they yield control back to the event loop during I/O waits.

Lifecycle Contract: 1. __aenter__ must return the managed resource instance or self. Returning None breaks async with binding. 2. __aexit__ must return False or None to allow normal exception propagation. Returning True unconditionally masks critical failures. 3. Synchronous blocking calls (e.g., socket.close(), os.fsync(), heavy CPU work) must be offloaded to loop.run_in_executor() or replaced with native async equivalents.

For a deeper breakdown of the underlying lifecycle contract, refer to the Async Context Managers & Iterators specification.

Diagnostic Hook: Wrap __aexit__ execution with loop.time() delta logging to detect teardown latency spikes under load.

import asyncio
import logging

logger = logging.getLogger(__name__)

class AsyncConnectionManager:
 def __init__(self, conn_pool):
 self.pool = conn_pool
 self.conn = None
 self._loop = asyncio.get_running_loop()

 async def __aenter__(self) -> "AsyncConnectionManager":
 # Async acquisition only
 self.conn = await self.pool.acquire()
 return self

 async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
 start = self._loop.time()
 try:
 if self.conn:
 await self.pool.release(self.conn)
 finally:
 elapsed_ms = (self._loop.time() - start) * 1000
 if elapsed_ms > 10.0:
 logger.warning(
 "__aexit__ teardown exceeded 10ms SLA: %.2fms | exc_type=%s",
 elapsed_ms, exc_type
 )
 return False # Propagate exceptions

Cancellation Safety & Exception Propagation

Task cancellation is a first-class control flow mechanism in asyncio. Since Python 3.8, asyncio.CancelledError inherits from BaseException, not Exception. Catching only Exception swallows cancellation signals, leaving resources in a half-open state and starving the event loop.

Cancellation Handling Rules: 1. Catch BaseException or explicitly asyncio.CancelledError in __aexit__. 2. Execute non-blocking cleanup immediately. 3. Re-raise the exception to preserve cancellation semantics. 4. Use asyncio.shield() only when teardown must survive parent task cancellation (e.g., flushing critical audit logs).

Diagnostic Hook: Assert asyncio.current_task().cancelled() == True before cleanup to verify cancellation state during testing.

import asyncio

class CancellationSafeResource:
 def __init__(self, fd):
 self.fd = fd
 self._is_open = True

 async def __aenter__(self):
 await self._async_open()
 return self

 async def __aexit__(self, exc_type, exc_val, exc_tb):
 # Explicitly catch BaseException to intercept CancelledError
 try:
 if self._is_open:
 await self._async_close()
 except asyncio.CancelledError:
 # Verify cancellation state for diagnostic logging
 if asyncio.current_task().cancelled():
 # Perform critical, non-blocking cleanup
 self._is_open = False
 # Always re-raise to maintain cancellation propagation
 raise
 except Exception:
 self._is_open = False
 raise
 finally:
 self._is_open = False
 return False

Generator-Based Managers via contextlib.asynccontextmanager

The @contextlib.asynccontextmanager decorator automates protocol generation, reducing boilerplate for single-resource lifecycles. It enforces a strict single-yield boundary: code before yield executes during __aenter__, and code after executes during __aexit__.

Generator Discipline: 1. Use exactly one yield statement. Multiple yields corrupt async generator state and break async with semantics. 2. Wrap the yield in a try/finally block to guarantee teardown execution regardless of exceptions or cancellation. 3. Avoid return statements inside the generator; they trigger StopAsyncIteration prematurely.

Diagnostic Hook: Monitor agen.ag_running state during debugging to detect suspended generator leaks.

import contextlib
import asyncio

@contextlib.asynccontextmanager
async def managed_transaction(db_engine):
 session = await db_engine.start_session()
 try:
 yield session
 except asyncio.CancelledError:
 # Rollback on cancellation
 await session.rollback()
 raise
 except Exception:
 await session.rollback()
 raise
 else:
 await session.commit()
 finally:
 await session.close()

Production Debugging Workflow & Teardown Verification

Identifying resource leaks and event loop starvation requires systematic telemetry. Enable debug flags, enforce teardown SLAs, and isolate failing managers under concurrent load.

Step-by-Step Diagnostic Workflow: 1. Enable PYTHONASYNCIODEBUG=1 and call loop.set_debug(True) to trigger slow callback warnings (>100ms default). 2. Profile __aexit__ execution with time.perf_counter_ns() to enforce <1ms teardown SLAs. 3. Verify connection pool drains and file descriptor closure under concurrent load using asyncio.gather(..., return_exceptions=True). 4. Implement a diagnostic wrapper that logs exception types, elapsed teardown duration, and active task count.

When configuring event loop scheduling and debug flags, align your telemetry with the Asyncio Fundamentals & Event Loop Architecture guidelines.

Diagnostic Hook Implementation: Context manager wrapper with perf_counter profiling and exception isolation.

import asyncio
import time
from functools import wraps
from typing import Type, Any

def profile_async_cm(cls):
 """Diagnostic wrapper for async context manager classes."""
 original_aexit = cls.__aexit__

 @wraps(original_aexit)
 async def instrumented_aexit(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: Any) -> bool:
 start_ns = time.perf_counter_ns()
 try:
 return await original_aexit(self, exc_type, exc_val, exc_tb)
 finally:
 elapsed_ms = (time.perf_counter_ns() - start_ns) / 1_000_000
 active_tasks = len(asyncio.all_tasks())
 if elapsed_ms > 1.0:
 print(
 f"[DIAG] {cls.__name__}.__aexit__ latency: {elapsed_ms:.2f}ms | "
 f"Exception: {exc_type.__name__ if exc_type else 'None'} | "
 f"Active Tasks: {active_tasks}"
 )

 cls.__aexit__ = instrumented_aexit
 return cls

# Usage: Apply to class definition
@profile_async_cm
class HighThroughputClient:
 async def __aenter__(self): ...
 async def __aexit__(self, exc_type, exc_val, exc_tb): ...

Common Mistakes

  • Blocking I/O in lifecycle methods: Performing synchronous socket.close(), file.flush(), or CPU-bound work in __aenter__/__aexit__ stalls the event loop.
  • Swallowing CancelledError: Catching only Exception ignores BaseException inheritance, preventing proper task cancellation and causing resource leaks.
  • Unconditional return True: Returning True from __aexit__ suppresses all exceptions, masking critical failures and breaking error propagation chains.
  • Multiple yield statements: Using @contextlib.asynccontextmanager with multiple yields corrupts async generator state and violates the single-entry/single-exit contract.
  • Ignoring Python 3.8+ semantics: Failing to account for CancelledError inheriting from BaseException leads to incorrect exception filtering.
  • Unawaited cleanup: Calling synchronous cleanup methods instead of awaiting async teardown operations leaves resources in a half-open state.

FAQ

Q: How do I handle asyncio.CancelledError without breaking the event loop or leaking resources?

A: Catch BaseException or explicitly asyncio.CancelledError in __aexit__, perform non-blocking cleanup, then re-raise the exception to preserve cancellation semantics.

Q: When should I prefer contextlib.asynccontextmanager over a class-based implementation?

A: Use it for straightforward, single-resource lifecycles where setup and teardown are tightly coupled. It reduces boilerplate but requires strict single-yield discipline to avoid generator state issues.

Q: Why does __aexit__ sometimes block the event loop, and how can I enforce non-blocking teardown?

A: Synchronous cleanup calls (e.g., socket.close(), file.flush()) or heavy computations stall the loop. Replace them with async equivalents or offload to run_in_executor, and enforce <1ms teardown SLAs via perf_counter profiling.

Q: How can I verify that async context managers clean up correctly under high concurrency?

A: Enable PYTHONASYNCIODEBUG=1, use loop.set_debug(True), and wrap __aexit__ in a diagnostic timer. Run load tests with asyncio.gather and assert that active file descriptors and connection counts return to baseline after teardown.