
Async Database Drivers: Querying Without Stalling the Loop

The database is where most "async" services quietly become synchronous. A coroutine that calls psycopg2's cursor.execute() or runs a sqlite3 query looks fine in code review, but at runtime it freezes the entire event loop for the full round-trip to the database: every other connection, request, and timer on that loop waits behind it. Native async drivers (asyncpg for PostgreSQL, aiomysql for MySQL, motor for MongoDB) solve this by speaking the wire protocol over a non-blocking socket registered with the loop selector, so a query that waits on the network yields control instead of holding it. This guide covers how those drivers integrate with the scheduler, when a blocking driver must be pushed off the loop, how to build and share a connection pool correctly, and how to size that pool against the server's hard connection ceiling. The deep-dive on avoiding event loop blocking with asyncpg turns these principles into a debugging workflow.

asyncpg, aiomysql, and motor are third-party libraries. Install the one you need, e.g. pip install asyncpg.

Architectural Principles

  • A synchronous DB call inside a coroutine stalls every task on the loop. There is no magic that makes psycopg2 cooperative just because it sits in an async def. The C extension blocks the OS thread, and that thread is running the event loop. Until the query returns, no other coroutine advances.
  • One connection is not concurrency-safe: acquire from the pool per operation. A database connection is a single ordered stream of request/response frames, so it can serve only one operation at a time. If two tasks issued queries on one connection, their bytes would interleave on the wire and corrupt the protocol; asyncpg guards against this by raising InterfaceError ("another operation is in progress") instead. Concurrency comes from multiple connections, which is exactly what a pool hands out.
  • Pool size is bounded by the server's max_connections, not your ambition. Every pooled connection consumes a backend process or thread and memory on the database. Size the pool so that all your application instances combined stay comfortably under the server limit, or the database starts refusing connections.
  • A transaction pins a connection for its entire lifetime. BEGIN through COMMIT must run on the same connection. While a transaction is open that connection is unavailable to anyone else, so a transaction held across a slow await removes a connection from the usable pool and can deadlock under load.
  • Prepared statements bind to a connection. Drivers like asyncpg cache prepared statements per connection. This makes pooled, reused connections fast — but it is another reason a connection is stateful and must not be shared across tasks mid-flight.
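
The first principle can be observed without a database. The sketch below uses only the standard library and assumes time.sleep(0.2) is a fair stand-in for a synchronous driver call; heartbeat, fake_blocking_query, and largest_gap are hypothetical names for this illustration. It measures how long a 10 ms heartbeat task is starved when the call runs inline versus through asyncio.to_thread.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Records a timestamp every 10 ms for as long as the loop lets it run.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


def fake_blocking_query() -> str:
    # Stand-in for a synchronous driver call: blocks the calling thread.
    time.sleep(0.2)
    return "row"


async def inline() -> str:
    return fake_blocking_query()            # runs on the loop thread


async def offloaded() -> str:
    return await asyncio.to_thread(fake_blocking_query)


async def largest_gap(run_query) -> float:
    """Run one query next to a heartbeat; return the heartbeat's worst stall."""
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0.05)               # let the heartbeat settle
    await run_query()
    await asyncio.sleep(0.05)
    stop.set()
    await beat
    return max(b - a for a, b in zip(ticks, ticks[1:]))


async def main() -> tuple:
    return await largest_gap(inline), await largest_gap(offloaded)


if __name__ == "__main__":
    blocked, free = asyncio.run(main())
    print(f"inline stall: {blocked * 1000:.0f} ms, to_thread stall: {free * 1000:.0f} ms")
```

The inline variant stalls the heartbeat for roughly the full duration of the call, while the offloaded variant leaves it ticking at close to its normal interval.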

Execution Model: How a Driver Reaches the Loop

A native async driver opens its socket in non-blocking mode and registers the file descriptor with the loop's selector (epoll on Linux, kqueue on the BSDs). When you await conn.fetch(...), the driver writes the query frame, then suspends the coroutine and tells the loop "wake me when this socket is readable." Control returns to the loop, which runs other ready coroutines. When the database replies, the selector marks the FD readable, the loop resumes your coroutine, and the driver parses the response. This is the same I/O-multiplexing model that powers every other primitive in Network I/O & Protocol Handling — the database is just another socket the loop watches.
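
The "wake me when this socket is readable" step can be sketched with the loop's own add_reader API. This is not how asyncpg is implemented internally; it is a minimal illustration, assuming a selector-based loop (the default on Linux and macOS, not the Windows proactor loop) and using socket.socketpair() as a stand-in for the application-to-database connection. wait_readable, fake_database, and other_task are hypothetical names.

```python
import asyncio
import socket


async def wait_readable(sock: socket.socket) -> bytes:
    """Suspend until the selector reports the socket readable, then read it."""
    loop = asyncio.get_running_loop()
    readable = loop.create_future()

    def on_readable() -> None:
        # Called by the loop when the selector flags the FD as readable.
        loop.remove_reader(sock)
        if not readable.done():
            readable.set_result(None)

    loop.add_reader(sock, on_readable)      # register the FD with the selector
    await readable                          # yield to the loop until data arrives
    return sock.recv(4096)                  # data is already waiting


async def main() -> tuple:
    client, server = socket.socketpair()    # stands in for app <-> database
    client.setblocking(False)
    server.setblocking(False)
    other_work = 0

    async def fake_database() -> None:
        await asyncio.sleep(0.05)           # pretend query execution time
        server.send(b"result-row")

    async def other_task() -> None:
        nonlocal other_work
        for _ in range(5):                  # runs while the "query" is pending
            other_work += 1
            await asyncio.sleep(0.005)

    try:
        client.send(b"SELECT 1")            # write the "query frame"
        reply, _, _ = await asyncio.gather(
            wait_readable(client), fake_database(), other_task()
        )
    finally:
        client.close()
        server.close()
    return reply, other_work


if __name__ == "__main__":
    print(asyncio.run(main()))
```

While wait_readable is suspended, other_task keeps making progress, which is the behavior a native async driver relies on.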

A blocking driver breaks this contract. psycopg2's C layer calls recv() synchronously and does not return to Python until the row data arrives, so the loop never regains control. The only safe way to use such a driver from async code is to run it on a worker thread via asyncio.to_thread, which moves the blocking recv() off the loop thread and onto a thread pool. The same boundary applies to CPU-heavy work after the query: deserializing tens of thousands of rows or running a Pandas transform on the result is CPU-bound work that must be offloaded, not run inline on the loop. How aggressively you tune the loop (its selector, the slow_callback_duration threshold) belongs to event loop configuration.


Pattern Catalogue

Create and Share One Pool

Create the pool once at startup and pass it everywhere. A pool is itself a concurrency primitive; creating one per request defeats the point and reopens TCP/auth handshakes constantly.

# pip install asyncpg
import asyncpg

class Database:
    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._pool: asyncpg.Pool | None = None

    async def connect(self) -> None:
        # min_size warms connections at startup; max_size caps fan-out.
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=5,
            max_size=20,
            command_timeout=10.0,      # per-statement deadline
            max_inactive_connection_lifetime=300.0,  # recycle idle conns
        )

    async def close(self) -> None:
        if self._pool is not None:
            await self._pool.close()  # drains in-flight queries

    @property
    def pool(self) -> asyncpg.Pool:
        assert self._pool is not None, "call connect() first"
        return self._pool

Diagnostic Hook: Log pool.get_size() and pool.get_idle_size() on an interval. A persistently zero idle count means the pool is saturated and callers are queuing for connections.
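
One way to wire up this hook is a small background task. The sketch below assumes a pool object exposing get_size() and get_idle_size(), as asyncpg.Pool does; monitor_pool, the 5-second interval, and the saturated_after threshold are hypothetical choices for illustration, not asyncpg features.

```python
import asyncio
import logging

logger = logging.getLogger("pool-monitor")


async def monitor_pool(pool, interval: float = 5.0, saturated_after: int = 3) -> None:
    """Log pool occupancy every `interval` seconds until cancelled."""
    zero_streak = 0                         # consecutive samples with no idle conns
    while True:
        size = pool.get_size()
        idle = pool.get_idle_size()
        zero_streak = zero_streak + 1 if idle == 0 else 0
        if zero_streak >= saturated_after:
            # "Persistently zero" idle: callers are probably queuing.
            logger.warning("pool saturated: size=%d idle=0 for %d samples",
                           size, zero_streak)
        else:
            logger.info("pool size=%d idle=%d in_use=%d", size, idle, size - idle)
        await asyncio.sleep(interval)
```

Start it once after connect(), for example with asyncio.create_task(monitor_pool(db.pool)), and cancel the task before closing the pool.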

Acquire Per Query From the Pool

Each logical operation borrows a connection, uses it, and returns it. async with pool.acquire() guarantees the connection goes back even on exception — never store the acquired connection beyond the block.

async def get_user(db: Database, user_id: int) -> asyncpg.Record | None:
    async with db.pool.acquire() as conn:          # borrow
        return await conn.fetchrow(
            "SELECT id, email, created_at FROM users WHERE id = $1",
            user_id,                               # $1 -> bound parameter
        )
    # connection is returned to the pool here, automatically

async def list_active(db: Database, limit: int) -> list[asyncpg.Record]:
    # A short query: acquire, run, release. Do NOT hold the conn
    # across unrelated awaits — that starves the pool.
    async with db.pool.acquire() as conn:
        return await conn.fetch(
            "SELECT id, email FROM users WHERE active ORDER BY id LIMIT $1",
            limit,
        )

Diagnostic Hook: Time the acquire() itself. Wrap it in asyncio.timeout() and record the wait; rising acquire latency is the leading indicator that the pool is too small for the offered load.

Transaction Context

async with conn.transaction() issues BEGIN, commits on clean exit, and rolls back if the block raises. The connection is pinned for the whole block, so keep transactions short and never await slow, unrelated I/O inside one.

async def transfer(db: Database, src: int, dst: int, cents: int) -> None:
    async with db.pool.acquire() as conn:
        async with conn.transaction():             # BEGIN ... COMMIT
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                cents, src,
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                cents, dst,
            )
            # any exception here -> automatic ROLLBACK on block exit

Diagnostic Hook: Track transaction duration and emit a warning past a threshold (e.g. 250 ms). Long transactions hold a connection, hold row locks, and keep VACUUM from reclaiming dead tuples, which bloats tables and indexes.

Bulk Insert via executemany / copy

For high-volume writes, a per-row execute in a loop pays a round-trip per row. executemany batches the parameter sets; copy_records_to_table uses PostgreSQL's binary COPY protocol and is dramatically faster for large loads.

async def bulk_load(db: Database, rows: list[tuple]) -> None:
    async with db.pool.acquire() as conn:
        # Fast path: COPY protocol, single stream, no per-row round-trip.
        await conn.copy_records_to_table(
            "events",
            records=rows,
            columns=["user_id", "kind", "payload"],
        )

async def upsert_many(db: Database, rows: list[tuple]) -> None:
    async with db.pool.acquire() as conn:
        async with conn.transaction():
            await conn.executemany(
                "INSERT INTO tags (name, weight) VALUES ($1, $2) "
                "ON CONFLICT (name) DO UPDATE SET weight = EXCLUDED.weight",
                rows,                              # list of param tuples
            )

Diagnostic Hook: Compare rows/second between the executemany and copy paths in staging. If COPY is not an order of magnitude faster, your bottleneck is elsewhere (network, indexes, or loop contention).

Wrap a Sync Driver With asyncio.to_thread

When no native async driver exists (or you must reuse a sqlite3/psycopg2 data layer), isolate every blocking call behind asyncio.to_thread so the loop stays free. Note that a thread-wrapped sync connection is still a single connection — protect it with a lock or give each worker its own.

import asyncio
import sqlite3

def _query_sync(db_path: str, user_id: int) -> tuple | None:
    # Runs on a worker thread; blocking sqlite3 calls are fine here.
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute("SELECT id, email FROM users WHERE id = ?", (user_id,))
        return cur.fetchone()
    finally:
        conn.close()

async def get_user_sqlite(db_path: str, user_id: int) -> tuple | None:
    # to_thread moves the blocking work off the loop thread entirely.
    return await asyncio.to_thread(_query_sync, db_path, user_id)

Diagnostic Hook: The default thread pool has a bounded size (min(32, os.cpu_count() + 4)). If every worker thread is parked in a blocking query, to_thread calls start queuing — monitor the executor's backlog the same way you monitor the connection pool.
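
The example above opens a fresh connection per call, which sidesteps sharing. When a single sqlite3 connection has to be reused, the lock mentioned in this pattern might look like the following sketch. SharedSqlite is a hypothetical wrapper, not a library class; it should be constructed inside a running event loop, and it serializes every statement, so it trades throughput for safety.

```python
import asyncio
import sqlite3


class SharedSqlite:
    """One sqlite3 connection, serialized with a lock, run off the loop."""

    def __init__(self, db_path: str) -> None:
        # check_same_thread=False lets worker threads use the connection;
        # the asyncio.Lock below is what actually keeps that safe.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = asyncio.Lock()

    def _run_sync(self, sql: str, params: tuple) -> list:
        # Executes on a worker thread; blocking here does not stall the loop.
        cur = self._conn.execute(sql, params)
        rows = cur.fetchall()
        self._conn.commit()
        return rows

    async def run(self, sql: str, params: tuple = ()) -> list:
        async with self._lock:              # one statement at a time
            return await asyncio.to_thread(self._run_sync, sql, params)

    async def close(self) -> None:
        async with self._lock:
            await asyncio.to_thread(self._conn.close)
```

Twenty concurrent run() calls queue on the lock rather than racing on the connection, and each one still executes on a worker thread.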


Resource Boundaries: Pool Size vs Server Limits

Pool sizing is an arithmetic constraint, not a preference. The hard ceiling is the server's max_connections (PostgreSQL defaults to 100, of which superuser_reserved_connections, 3 by default, are held back for superusers; on PostgreSQL 11 and earlier, replication connections count against the limit as well). Your budget is shared across every application instance and every background worker that talks to the same database:

(instances × max_size) + admin/replication headroom  <  server max_connections

Run 8 application pods with max_size=20 and the fleet can open up to 160 connections against a 100-connection server; under load PostgreSQL rejects the excess with FATAL: sorry, too many clients already. Conversely, a pool far larger than your concurrency just holds idle backends hostage. Derive max_size from the same throughput math used for HTTP fan-out in connection pooling and keep-alive: in-flight queries needed ≈ target query rate × mean query latency. Add modest headroom and round to a value that, multiplied by instance count, stays under the server budget. For very wide deployments, put a server-side pooler (PgBouncer in transaction mode) between the app and the database and size the app pools against the pooler, not the raw backend.
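
The budget check and the throughput estimate can be written down as plain arithmetic. The helper names below are hypothetical, and the inputs in the usage note (10 reserved connections, 25% headroom, 400 queries per second at 20 ms) are illustrative assumptions, not recommendations.

```python
import math


def required_pool_size(queries_per_second: float, mean_latency_s: float,
                       headroom: float = 0.25) -> int:
    """In-flight queries (rate x latency) plus fractional headroom, rounded up."""
    in_flight = queries_per_second * mean_latency_s
    return math.ceil(in_flight * (1 + headroom))


def fits_budget(instances: int, max_size: int, reserved: int,
                server_max_connections: int) -> bool:
    """True when (instances x max_size) + reserved < server max_connections."""
    return instances * max_size + reserved < server_max_connections


def max_size_ceiling(server_max_connections: int, reserved: int,
                     instances: int) -> int:
    """Largest per-instance max_size that still satisfies fits_budget."""
    usable = server_max_connections - reserved - 1   # strict inequality
    return max(usable // instances, 0)
```

With these inputs, fits_budget(8, 20, 10, 100) is False (160 + 10 exceeds 100), max_size_ceiling(100, 10, 8) gives 11, and required_pool_size(400, 0.02) gives 10, so a max_size of 10 per pod satisfies both the demand estimate and the server budget.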


Integrated Example: A Bounded asyncpg Repository

This repository creates one pool, runs reads and a transactional write, processes results with bounded concurrency, and instruments acquire latency — the shape you want in production.

# pip install asyncpg
import asyncio
import logging
import time
import asyncpg

logger = logging.getLogger("repo")


class UserRepository:
    def __init__(self, dsn: str, *, max_size: int = 20) -> None:
        self._dsn = dsn
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None
        # Gate callers to the pool size so excess load waits at ONE place.
        self._gate = asyncio.Semaphore(max_size)

    async def open(self) -> None:
        self._pool = await asyncpg.create_pool(
            self._dsn, min_size=2, max_size=self._max_size,
            command_timeout=10.0,
        )

    async def close(self) -> None:
        if self._pool:
            await self._pool.close()

    async def _acquire(self) -> asyncpg.Connection:
        # Hold the gate for the whole checkout, not just the acquire call,
        # so the Semaphore really bounds in-flight queries.
        if self._pool is None:
            raise RuntimeError("call open() first")
        start = time.perf_counter()
        await self._gate.acquire()
        try:
            conn = await self._pool.acquire()
        except BaseException:
            self._gate.release()                   # do not leak the permit
            raise
        wait_ms = (time.perf_counter() - start) * 1000
        if wait_ms > 50:
            logger.warning("pool acquire waited %.1fms", wait_ms)
        return conn

    async def _release(self, conn: asyncpg.Connection) -> None:
        try:
            await self._pool.release(conn)         # type: ignore[union-attr]
        finally:
            self._gate.release()                   # free the permit last

    async def fetch_user(self, user_id: int) -> asyncpg.Record | None:
        conn = await self._acquire()
        try:
            return await conn.fetchrow(
                "SELECT id, email FROM users WHERE id = $1", user_id
            )
        finally:
            await self._release(conn)              # always release

    async def record_login(self, user_id: int, ip: str) -> None:
        conn = await self._acquire()
        try:
            async with conn.transaction():         # pinned, short
                await conn.execute(
                    "UPDATE users SET last_login = now() WHERE id = $1",
                    user_id,
                )
                await conn.execute(
                    "INSERT INTO login_events (user_id, ip) VALUES ($1, $2)",
                    user_id, ip,
                )
        finally:
            await self._release(conn)


async def main() -> None:
    repo = UserRepository("postgresql://app@localhost/app", max_size=10)
    await repo.open()
    try:
        ids = list(range(1, 101))
        # Bounded concurrency: the Semaphore caps in-flight queries to
        # the pool size, so 100 tasks never overwhelm 10 connections.
        async with asyncio.TaskGroup() as tg:
            for uid in ids:
                tg.create_task(repo.fetch_user(uid))
        await repo.record_login(1, "203.0.113.7")
    finally:
        await repo.close()


if __name__ == "__main__":
    asyncio.run(main())

Diagnostic Hook: Export three numbers and alert on them: pool acquire wait (p99), active connections vs the server's max_connections, and the database's slow-query log count. In staging, enable asyncio debug mode and lower loop.slow_callback_duration (the threshold is only checked in debug mode). If a query path ever trips it, a blocking call has slipped onto the loop and the fix is in the asyncpg blocking guide.


Diagnostic Hook: What to Watch in Production

Instrument the boundary between your application and the database with these signals and thresholds:

  • Pool acquire wait time (p50/p99). Rising p99 means the pool is too small or transactions are held too long. Alert above ~50 ms p99.
  • Active connections vs server max_connections. Plot pool size summed across instances against the server ceiling; alert at 80% utilization, before the database starts rejecting.
  • Slow-query log. Enable log_min_duration_statement on PostgreSQL; a spike correlates query-side latency with your acquire-wait spikes.
  • Loop blocking. Set loop.set_debug(True) and lower slow_callback_duration in staging; any warning naming your DB code path means a synchronous call is on the loop.
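
The loop-blocking signal can be reproduced in isolation. The sketch below assumes time.sleep(0.2) stands in for a synchronous DB call that slipped onto the loop; handle_request and run_with_detection are hypothetical names. It turns on debug mode, lowers slow_callback_duration to 50 ms, and collects what the asyncio logger reports.

```python
import asyncio
import logging
import time


class _Collect(logging.Handler):
    """Keeps formatted log messages in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def handle_request() -> None:
    time.sleep(0.2)                          # stand-in for a blocking DB call


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05       # seconds; the default is 0.1
    await asyncio.create_task(handle_request())


def run_with_detection() -> list:
    handler = _Collect()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    try:
        asyncio.run(main(), debug=True)      # debug mode enables the check
    finally:
        asyncio_logger.removeHandler(handler)
    # Slow-callback warnings start with "Executing" and report the duration.
    return [m for m in handler.messages if m.startswith("Executing")]


if __name__ == "__main__":
    for message in run_with_detection():
        print(message)
```

Each collected message includes the repr of the offending task, which is how the warning points back at a specific code path.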

Figure: Async DB driver data flow. Tasks acquire connections from an asyncpg pool that registers sockets with the loop selector and reaches the database; a blocking sync driver is routed through a thread executor to keep the loop free.

Failure Modes

| Failure mode | Root cause | Detection | Fix |
| --- | --- | --- | --- |
| Whole service freezes during queries | A synchronous driver (psycopg2, sqlite3) called directly in a coroutine blocks the loop thread | loop.slow_callback_duration warnings naming the DB call; all latencies spike together | Switch to a native async driver, or wrap the sync call in asyncio.to_thread |
| Intermittent protocol / decode errors | One connection shared across concurrent tasks, so their operations overlap on a single protocol stream | asyncpg InterfaceError ("another operation is in progress") or protocol errors under concurrency; errors vanish at concurrency 1 | Acquire a fresh connection per task via pool.acquire(); never store and reuse a borrowed connection |
| FATAL: sorry, too many clients already | Sum of pool max_size across instances exceeds the server max_connections | DB connection-count metric near the server limit; connect errors under scale-out | Lower max_size, count all instances in the budget, or front the DB with PgBouncer in transaction mode |
| Pool acquire stalls / timeouts | Pool far smaller than offered concurrency, or transactions held across slow awaits pin connections | Rising acquire-wait p99; idle connections at zero while tasks queue | Size the pool to in-flight query demand; keep transactions short and free of unrelated I/O |
| Slow CPU spikes after queries return | Heavy row deserialization or transforms run inline on the loop | CPU-bound slow_callback_duration warnings after fetch, not during | Offload the transform via CPU-bound task offloading |

Frequently Asked Questions

Why does a synchronous database driver block my whole async service?

A synchronous driver does not return to Python until the result is ready: psycopg2 blocks in recv() on the socket, and sqlite3 blocks on file I/O and database locks. That call runs on the thread that hosts the event loop, so the loop cannot advance any other coroutine until the query completes. Use a native async driver, or wrap the blocking call in asyncio.to_thread so it runs on a worker thread.

Can I share one asyncpg connection across multiple tasks?

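No. A connection is a single ordered stream of protocol frames, so it can serve only one operation at a time. If two tasks issued queries on the same connection, their bytes would interleave on the wire and corrupt the protocol; asyncpg detects the overlap and raises InterfaceError ("cannot perform operation: another operation is in progress"), which surfaces as intermittent errors under concurrency. Acquire a separate connection per task from the pool; concurrency comes from multiple connections, not one shared one.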

How large should my asyncpg connection pool be?

Size it to the in-flight query demand: target query rate times mean query latency, plus modest headroom. Then check the constraint that the sum of max_size across all application instances stays below the server's max_connections. Exceeding that limit causes the database to reject connections with a too-many-connections error.

Why should I keep transactions short in async code?

A transaction pins its connection from BEGIN to COMMIT, so that connection is unavailable to the rest of the pool for the whole block. A transaction held across a slow or unrelated await removes a connection from circulation, holds row locks, and can deadlock or starve the pool under load. Do only the related writes inside the transaction and commit promptly.