
Async Queue Management

asyncio.Queue is the coordination primitive that lets a producer hand work to consumers without either side knowing how many of the other exist. It is deceptively simple — put(), get(), task_done(), join() — but the operational behavior that matters in production lives in the gaps: when does put() suspend, how does maxsize turn into backpressure, what unblocks join(), and where do messages go when they keep failing. This guide covers bounded versus unbounded design, backpressure propagation, the PriorityQueue/LifoQueue variants, the join()/task_done() barrier, fan-out/fan-in topologies, and dead-letter handling, with the diagnostics to tell whether any of it is working.

Scope of this section:

  • Bounded queues and how maxsize enforces backpressure on producers.
  • The join()/task_done() completion barrier and why it deadlocks.
  • Priority and LIFO ordering via asyncio.PriorityQueue and asyncio.LifoQueue.
  • Multi-consumer fan-out, fan-in collection, and dead-letter routing for poison messages.
  • Graceful drain on shutdown and the metrics that expose queue health.

Architectural principles

  • A queue is a memory buffer with a policy. Its job is to absorb short-term rate mismatch between producers and consumers. The policy — bounded or unbounded, FIFO or priority — decides what happens when the mismatch is not short-term. Unbounded is a deferred OOM; choose a bound deliberately.
  • Backpressure is the point, not a side effect. A bounded queue whose put() blocks is the cheapest backpressure mechanism in the standard library. It propagates congestion backward to the source for free, but only if the producer actually awaits put() rather than calling put_nowait().
  • Completion is explicit. The loop has no idea when a dequeued item is "done." task_done() is the only signal, and join() blocks on the running count. Forget one task_done() and shutdown hangs forever.
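  • One queue, one event loop. asyncio.Queue is not thread-safe, and an instance is tied to a single event loop (since Python 3.10, the loop that is running when the queue first has to suspend a caller; earlier versions bound it at construction). Cross-thread or cross-loop handoff needs loop.call_soon_threadsafe() or janus, not a shared queue.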
  • Failure needs a destination. Items that fail processing repeatedly must leave the main flow. Without a dead-letter path they either retry forever (head-of-line blocking) or vanish (silent data loss).

Execution model: how put and get suspend on the loop

asyncio.Queue does not use locks in the threading sense. It coordinates entirely through the event loop using internal collections of Future objects — _getters and _putters — that represent suspended coroutines waiting for the queue to change state. This is what makes it cooperative rather than blocking.

When a coroutine calls await queue.get() on an empty queue, the queue creates a Future, parks it in _getters, and suspends the coroutine; control returns to the loop, which is free to run other tasks. When a producer later calls put(), the item is stored and the oldest waiting getter's Future is resolved. Resolving it does not run the consumer inline: the wake-up is scheduled through loop.call_soon(), so the consumer resumes on the next loop iteration. The symmetric case applies to put() on a full bounded queue: the producer parks in _putters until a get() frees a slot. Nothing spins; nothing blocks the OS thread. For the underlying scheduler mechanics (the ready queue, call_soon, and how tasks move between suspended and runnable), see the parent overview, Concurrent Execution & Worker Patterns.
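The suspension described above can be observed directly. The following is a minimal sketch, assuming a queue with maxsize=1 and an illustrative `events` list used only to record ordering; the names `demo` and `events` are not part of any API.

```python
import asyncio

async def demo() -> list[str]:
    events: list[str] = []
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        for i in range(2):
            await q.put(i)               # the second put() parks until a slot frees
            events.append(f"put {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)               # let the producer run until it suspends
    events.append(f"full={q.full()}")    # one item stored, producer parked in put()
    item = await q.get()                 # frees the slot and wakes the parked putter
    events.append(f"got {item}")
    await task                           # the producer completes its second put()
    return events

print(asyncio.run(demo()))
```

The recorded order shows that the second put() does not complete until after the get(): the producer was parked, not spinning, while the queue was full.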

The operational consequence: a "stuck" queue is almost always a starved or blocked consumer. If a consumer runs synchronous CPU work or a blocking syscall between get() and task_done(), the loop never advances, no getter is ever rescheduled, and queue depth climbs even though workers exist. The fix is to offload that work — see CPU-Bound Task Offloading — not to add more consumers.
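One way to keep the loop advancing is to push the blocking call into a worker thread with asyncio.to_thread(). This is a sketch under assumptions: `blocking_work` is a hypothetical stand-in for whatever synchronous call the consumer makes, and the queue and worker counts are arbitrary.

```python
import asyncio
import time

def blocking_work(item: int) -> int:
    time.sleep(0.05)          # stand-in for a blocking syscall or synchronous library call
    return item * 2

async def consumer(q: asyncio.Queue[int], results: list[int]) -> None:
    while True:
        item = await q.get()
        try:
            # Runs in a worker thread, so the loop stays free to reschedule getters.
            results.append(await asyncio.to_thread(blocking_work, item))
        finally:
            q.task_done()

async def main() -> list[int]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=8)
    results: list[int] = []
    workers = [asyncio.create_task(consumer(q, results)) for _ in range(4)]
    for i in range(8):
        await q.put(i)
    await q.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

print(asyncio.run(main()))
```

Threads keep the loop responsive but do not add CPU parallelism for pure-Python computation; for that case a process-based executor is the usual choice.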

[Figure: Bounded queue, fan-out, and dead-letter branch. A producer awaits put() on a bounded queue (maxsize = N), which applies backpressure; three consumers call get() and process items, routing poison messages to a dead-letter queue after N failed attempts.]

Pattern catalogue

Bounded queue for backpressure

When to use: any time a producer can outrun consumers — ingest pipelines, scrapers, log shippers. Trade-off: a bounded queue caps memory and throttles the producer, but a too-small bound starves consumers (they idle waiting on get()); a too-large bound defers backpressure until memory is already under pressure. Start at roughly 2x your consumer concurrency and tune from depth metrics.

import asyncio

async def producer(q: asyncio.Queue[int]) -> None:
    for i in range(1000):
        await q.put(i)        # suspends here when the queue is full
    await q.join()            # wait until every item is processed

async def consumer(q: asyncio.Queue[int]) -> None:
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.01)   # do the work
        finally:
            q.task_done()               # always, even on failure

async def main() -> None:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=50)   # the bound is the backpressure
    async with asyncio.TaskGroup() as tg:               # TaskGroup requires Python 3.11+
        consumers = [tg.create_task(consumer(q)) for _ in range(5)]
        await producer(q)       # returns once all 1000 items are enqueued and processed
        for c in consumers:
            c.cancel()          # collapse idle consumers once drained

asyncio.run(main())

The detailed treatment — reproducing unbounded growth, measuring put-wait, and propagating backpressure to the ingest source — is in Bounded asyncio.Queue with backpressure under load.

Priority queue

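When to use: heterogeneous work where some items must jump the line (urgent jobs, deadline-sensitive tasks). Trade-off: ordering is no longer FIFO, so low-priority items can starve indefinitely without an aging mechanism, and item comparison must be cheap because every put() and get() runs O(log n) comparisons synchronously on the loop. asyncio.PriorityQueue is a min-heap that returns the smallest item first; the usual convention is (priority, payload) tuples. If two priorities tie, tuple comparison falls through to the payload, which raises TypeError when payloads are not orderable.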

import asyncio

async def main() -> None:
    pq: asyncio.PriorityQueue[tuple[int, str]] = asyncio.PriorityQueue()
    await pq.put((5, "batch report"))
    await pq.put((1, "user-facing request"))   # lower number = higher priority
    await pq.put((3, "cache warm"))
    while not pq.empty():
        priority, payload = await pq.get()
        print(priority, payload)               # 1, then 3, then 5
        pq.task_done()

asyncio.run(main())

For a custom heapq-backed wrapper, tie-breaking with a monotonic counter, and starvation debugging, see Implementing a priority queue with asyncio.Queue.
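As a minimal sketch of the monotonic-counter tie-break, assuming a hypothetical `Task` payload that is deliberately not orderable: a sequence number from itertools.count() sits between the priority and the payload, so equal priorities resolve in insertion order and the payload is never compared.

```python
import asyncio
import itertools
from dataclasses import dataclass

@dataclass
class Task:                 # no ordering defined: Task < Task would raise TypeError
    name: str

async def main() -> list[str]:
    pq: asyncio.PriorityQueue[tuple[int, int, Task]] = asyncio.PriorityQueue()
    seq = itertools.count()                       # monotonic tie-breaker
    for prio, name in [(1, "b"), (1, "a"), (0, "urgent")]:
        pq.put_nowait((prio, next(seq), Task(name)))
    order: list[str] = []
    while not pq.empty():
        _, _, task = pq.get_nowait()
        order.append(task.name)
        pq.task_done()
    return order

print(asyncio.run(main()))
```

The two priority-1 items come out in the order they were enqueued, after the priority-0 item.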

join()/task_done() barrier

When to use: whenever you need to know that all enqueued work has finished — drain-before-shutdown, batch completion, test synchronization. Trade-off: the barrier is only as correct as your task_done() discipline. Each put() increments an unfinished counter; each task_done() decrements it; join() returns when it hits zero. A missing task_done() hangs join() forever; an extra one raises ValueError.

import asyncio

async def worker(q: asyncio.Queue[str]) -> None:
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.05)
        finally:
            q.task_done()      # the decrement that lets join() complete

async def main() -> None:
    q: asyncio.Queue[str] = asyncio.Queue()
    for n in range(20):
        q.put_nowait(f"job-{n}")
    workers = [asyncio.create_task(worker(q)) for _ in range(4)]
    await q.join()             # blocks until all 20 task_done() calls land
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
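Both failure directions of the barrier can be reproduced in a few lines. This sketch assumes a 0.1 s timeout purely to make the hang observable; `hung` and `extra_raised` are illustrative names.

```python
import asyncio

async def main() -> tuple[bool, bool]:
    q: asyncio.Queue[str] = asyncio.Queue()
    q.put_nowait("job")
    q.get_nowait()                     # dequeued, but task_done() has not been called

    try:
        await asyncio.wait_for(q.join(), timeout=0.1)
        hung = False
    except asyncio.TimeoutError:       # join() was still waiting on the counter
        hung = True

    q.task_done()                      # the missing decrement
    await q.join()                     # now returns immediately

    try:
        q.task_done()                  # one more than the number of put() calls
        extra_raised = False
    except ValueError:
        extra_raised = True
    return hung, extra_raised

print(asyncio.run(main()))
```

Wrapping join() in asyncio.wait_for() is also a reasonable shutdown guard in its own right: a bounded wait turns a silent hang into a loggable timeout.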

Multi-consumer fan-out

When to use: I/O-bound throughput scaling, where N consumers pull from one shared queue and process concurrently, with the loop interleaving their awaits. Trade-off: consumers compete for items, so per-item ordering is lost; this is throughput at the cost of strict order. Fan-in (collecting results) typically uses a second output queue or a results list that consumers append to, which is safe without a lock because they all run on one loop thread. The bounded-queue example above is already a fan-out: one producer, five consumers. Scale consumer count to saturate downstream concurrency, not CPU; for the sizing logic, see Worker Pool Implementations.
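Fan-in through a second queue might look like the sketch below. The names `in_q` and `out_q`, the squaring "work", and the uneven sleep are all assumptions for illustration; each result carries its input so that the correlation survives the loss of ordering.

```python
import asyncio

async def worker(in_q: asyncio.Queue[int], out_q: asyncio.Queue[tuple[int, int]]) -> None:
    while True:
        n = await in_q.get()
        try:
            await asyncio.sleep(0.001 * (5 - n % 5))   # uneven latency reorders completions
            await out_q.put((n, n * n))                # tag the result with its input
        finally:
            in_q.task_done()

async def main() -> dict[int, int]:
    in_q: asyncio.Queue[int] = asyncio.Queue(maxsize=8)
    out_q: asyncio.Queue[tuple[int, int]] = asyncio.Queue()  # unbounded only because input is finite
    workers = [asyncio.create_task(worker(in_q, out_q)) for _ in range(4)]
    for n in range(20):
        await in_q.put(n)
    await in_q.join()                                  # every input processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    results: dict[int, int] = {}
    while not out_q.empty():                           # fan-in: drain collected results
        n, square = out_q.get_nowait()
        results[n] = square
    return results

print(asyncio.run(main()))
```

In a long-running pipeline the output queue would normally be bounded as well and drained by its own consumer, so that a slow collector applies backpressure to the workers.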

Dead-letter routing

When to use: when a single poison message must not block the queue or be lost — message processing, webhook delivery, ETL rows. Trade-off: you add a second queue and per-message attempt tracking, in exchange for the main queue continuing to flow past failures. After N failed attempts (typically with retry and backoff between them), the item is routed to a dead-letter queue for inspection or replay instead of being retried forever.

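import asyncio
from dataclasses import dataclass

@dataclass
class Msg:
    payload: str
    attempts: int = 0

async def process(msg: Msg) -> None:
    # Placeholder handler for this sketch; replace with the real work.
    if msg.payload.startswith("poison"):
        raise ValueError(f"cannot process {msg.payload}")

async def worker(main_q: asyncio.Queue[Msg], dlq: asyncio.Queue[Msg], maxn: int = 3) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)              # may raise
        except Exception:
            msg.attempts += 1
            if msg.attempts >= maxn:
                await dlq.put(msg)          # give up: route to DLQ
            else:
                # Requeue for another attempt. On a bounded main_q, prefer a separate
                # retry queue: if every worker awaits put() on a full queue, none can get().
                await main_q.put(msg)
        finally:
            main_q.task_done()              # balances the get(); the requeue put() re-increments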

The full attempt-tracking, backoff, and replay workflow is in Implementing a dead-letter queue with asyncio.

Resource boundaries

The queue is one of three limits in a producer/consumer system, and they must be tuned together:

  • maxsize (memory bound): caps queued items. Queue memory ≈ maxsize × avg_item_size, plus one item per consumer that is mid-processing. This is your hard ceiling; size it against the container memory limit, not throughput.
  • Consumer count (concurrency bound): how many items are processed in parallel. For I/O-bound work this can far exceed CPU count; for anything CPU-bound, push it through an executor instead of adding consumers.
  • Downstream limit (saturation bound): even with many consumers, a shared asyncio.Semaphore should cap concurrent calls to a fragile dependency so the queue absorbs the burst rather than the database. The queue protects your memory; the semaphore protects everything you call.
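The interaction between consumer count and the downstream limit can be sketched as follows, assuming eight consumers, a semaphore of three, and a hypothetical `call_dependency` that stands in for the fragile call. The `stats` dict exists only to record the observed peak.

```python
import asyncio

async def call_dependency(sem: asyncio.Semaphore, stats: dict[str, int]) -> None:
    async with sem:                       # only as many callers as the semaphore allows
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            await asyncio.sleep(0.01)     # stand-in for the real downstream call
        finally:
            stats["active"] -= 1

async def consumer(q: asyncio.Queue[int], sem: asyncio.Semaphore, stats: dict[str, int]) -> None:
    while True:
        await q.get()
        try:
            await call_dependency(sem, stats)
        finally:
            q.task_done()

async def main() -> int:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=16)   # memory bound
    sem = asyncio.Semaphore(3)                          # saturation bound
    stats = {"active": 0, "peak": 0}
    consumers = [asyncio.create_task(consumer(q, sem, stats)) for _ in range(8)]
    for i in range(40):
        await q.put(i)
    await q.join()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return stats["peak"]

print(asyncio.run(main()))
```

Even with eight consumers holding items, the peak number of concurrent downstream calls never exceeds the semaphore value; the surplus consumers wait on the semaphore while the queue absorbs the burst.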

A queue that sits permanently at maxsize means consumers are the bottleneck; one that sits permanently empty means the producer is. Depth that oscillates near the bound without staying pinned to it is the healthy steady state.

Integrated production example

A bounded ingest pipeline with fan-out consumers, per-message retry with dead-letter routing, a depth and unfinished-count sampler, and a graceful drain on shutdown signal: the patterns above assembled into one runnable system.

import asyncio
import signal
from dataclasses import dataclass, field

@dataclass(order=True)
class Job:
    priority: int
    payload: str = field(compare=False)
    attempts: int = field(default=0, compare=False)

MAX_ATTEMPTS = 3

async def process(job: Job) -> None:
    await asyncio.sleep(0.02)
    if job.payload.startswith("poison"):      # deterministic failure
        raise ValueError(f"cannot process {job.payload}")

async def producer(q: asyncio.PriorityQueue[Job], stop: asyncio.Event) -> None:
    n = 0
    while not stop.is_set():
        prio = 1 if n % 10 == 0 else 5
        payload = "poison" if n % 17 == 0 else f"job-{n}"
        await q.put(Job(prio, payload))       # backpressure: suspends when full
        n += 1
        await asyncio.sleep(0.005)

async def consumer(cid: int, q: asyncio.PriorityQueue[Job], dlq: asyncio.Queue[Job]) -> None:
    while True:
        job = await q.get()
        try:
            await process(job)
        except Exception:
            job.attempts += 1
            if job.attempts >= MAX_ATTEMPTS:
                await dlq.put(job)            # route poison to DLQ
            else:
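                backoff = 0.01 * 2 ** job.attempts
                await asyncio.sleep(backoff)  # retry with backoff
                # Caution: q is bounded. If it is full and every consumer is
                # requeuing at once, these put() calls wait on each other.
                await q.put(job)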
        finally:
            q.task_done()

async def monitor(q: asyncio.PriorityQueue[Job], dlq: asyncio.Queue[Job]) -> None:
    while True:
        await asyncio.sleep(1.0)
        # qsize() is public; _unfinished_tasks is a private attribute (queued + in-progress items)
        print(f"depth={q.qsize():3d}  unfinished={q._unfinished_tasks:3d}  dlq={dlq.qsize()}")

async def main() -> None:
    q: asyncio.PriorityQueue[Job] = asyncio.PriorityQueue(maxsize=64)
    dlq: asyncio.Queue[Job] = asyncio.Queue()
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # add_signal_handler() is Unix-only (raises NotImplementedError on Windows)
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    consumers = [asyncio.create_task(consumer(i, q, dlq)) for i in range(6)]
    mon = asyncio.create_task(monitor(q, dlq))
    prod = asyncio.create_task(producer(q, stop))

    await stop.wait()                 # run until SIGINT/SIGTERM
    await prod                        # stop accepting new work
    await q.join()                    # graceful drain: finish in-flight items
    for t in (*consumers, mon):
        t.cancel()
    await asyncio.gather(*consumers, mon, return_exceptions=True)
    print(f"drained. dead-lettered: {dlq.qsize()}")

if __name__ == "__main__":
    asyncio.run(main())

Diagnostic Hook: the monitor task above is the production signature. Sample q.qsize() (depth) and q._unfinished_tasks (items enqueued but not yet marked task_done(), so it counts both queued and in-progress work; it is a private attribute and may change between Python versions) on an interval and export them as gauges. Wrap await q.put() with time.perf_counter() to record put-wait time: when that rises above zero consistently, the queue is full and the producer is being throttled. That is backpressure working, and it is your earliest signal that consumers or the downstream have fallen behind, well before memory or latency alarms fire.
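A put-wait wrapper could be as small as the sketch below. `timed_put` and the `waits` list are hypothetical names; in a real system the measurement would go to a histogram or gauge rather than a list, and the tiny maxsize here is chosen only to force throttling.

```python
import asyncio
import time

async def timed_put(q: asyncio.Queue[int], item: int, waits: list[float]) -> None:
    start = time.perf_counter()
    await q.put(item)                              # suspends while the queue is full
    waits.append(time.perf_counter() - start)      # seconds spent waiting for a slot

async def slow_consumer(q: asyncio.Queue[int]) -> None:
    while True:
        await q.get()
        try:
            await asyncio.sleep(0.01)              # slower than the producer
        finally:
            q.task_done()

async def main() -> list[float]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    waits: list[float] = []
    consumer = asyncio.create_task(slow_consumer(q))
    for i in range(6):
        await timed_put(q, i, waits)
    await q.join()
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return waits

print([f"{w * 1000:.1f}ms" for w in asyncio.run(main())])
```

The first puts return almost instantly; once the queue fills, later puts take roughly one consumer cycle each, which is the throttling signal described above.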

Diagnostic Hook — the three numbers that explain any queue

Export three gauges and you can diagnose almost any queue pathology without a profiler. Queue depth (qsize()): pinned at maxsize means consumers are the bottleneck; pinned at 0 means the producer is. Put-wait time (wall time inside await put()): consistently above zero confirms backpressure is engaging. Unfinished count (_unfinished_tasks): if it never returns to 0 after the producer stops, a task_done() is missing and join() will hang. Add a dead-letter depth gauge to catch poison-message storms.

Failure modes

| Failure mode | Root cause | Detection | Fix |
|---|---|---|---|
| Unbounded memory growth / OOM | maxsize=0 (default) with producer faster than consumers | RSS climbs with qsize(); container OOM-killed | Set a maxsize; let await put() apply backpressure to the source |
| join() hangs at shutdown | A get() path that skips task_done() (often an exception before the finally) | _unfinished_tasks never reaches 0; shutdown never completes | Put task_done() in a finally; one per get(), no more |
| Depth climbs despite idle workers | Consumer runs blocking CPU/syscall, starving the loop; no getter is rescheduled | Depth rises while CPU pins one core; PYTHONASYNCIODEBUG=1 flags slow callbacks | Offload via to_thread()/executor; see CPU-Bound Task Offloading |
| Head-of-line blocking | A poison message retried forever in place blocks everything behind it | One item's attempt count climbs unboundedly; throughput stalls | Cap attempts and route to a dead-letter queue |
| Low-priority starvation | PriorityQueue with steady high-priority inflow and no aging | High wait-time percentiles for low-priority tier | Add an aging mechanism that boosts priority by wait time |
| QueueFull from put_nowait() | Non-blocking put used on a full bounded queue | asyncio.QueueFull raised at the producer | Use await put() for backpressure, or handle QueueFull explicitly |
| Items lost on crash | In-memory queue; process dies with items in flight | Reconciliation finds gaps; no persistence | Use a durable broker (Redis Streams, RabbitMQ) for at-least-once delivery |

Frequently Asked Questions

What is the optimal maxsize for an asyncio.Queue in production?

There is no universal value. Start at roughly 2x your consumer concurrency, then tune from queue-depth metrics and the container memory limit. Total memory is approximately maxsize times average item size. If depth consistently pins at maxsize, add consumers or speed up the downstream; if it sits at zero, the producer is the bottleneck.

Why does queue.join() hang forever during shutdown?

join() blocks until the unfinished-task counter reaches zero, and only task_done() decrements it. If any get() path returns without calling task_done() — commonly because an exception escaped before a finally block — the counter never reaches zero and join() blocks indefinitely. Always pair get() with task_done() in a finally, exactly once per item.

When should I use put_nowait() instead of await put()?

Use await put() when you want backpressure: it suspends the producer on a full bounded queue, throttling the source. Use put_nowait() only when you can handle asyncio.QueueFull yourself — for example to drop or shed load deliberately. Calling put_nowait() on a full bounded queue raises QueueFull rather than waiting.
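Deliberate load shedding with put_nowait() might look like this sketch. The helper name `offer` and the drop-newest policy are assumptions; other policies (drop oldest, sample) follow the same shape.

```python
import asyncio

def offer(q: asyncio.Queue[int], item: int) -> bool:
    """Try to enqueue without waiting; report whether the item was accepted."""
    try:
        q.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False                   # shed load: the caller decides what a drop means

async def main() -> tuple[int, int]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
    accepted = dropped = 0
    for i in range(10):                # no consumer is running, so the queue fills at 3
        if offer(q, i):
            accepted += 1
        else:
            dropped += 1
    return accepted, dropped

print(asyncio.run(main()))
```

Counting drops, as here, is what separates shedding from silent data loss: the counter is the metric to alert on.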

Can I share one asyncio.Queue across multiple event loops or threads?

No. An asyncio.Queue instance is tied to a single event loop and is not thread-safe. For cross-thread handoff use loop.call_soon_threadsafe() or a library like janus that bridges a thread-safe and an async queue; for cross-process or cross-loop coordination use a message broker such as Redis Streams, RabbitMQ, or Kafka.
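A cross-thread handoff via loop.call_soon_threadsafe() can be sketched as follows. The `thread_producer` function and the None sentinel are illustrative choices, not a prescribed protocol.

```python
import asyncio
import threading
from typing import Optional

def thread_producer(loop: asyncio.AbstractEventLoop,
                    q: "asyncio.Queue[Optional[int]]", n: int) -> None:
    for i in range(n):
        # Schedules put_nowait to run on the loop's own thread; the queue is
        # never touched from this thread.
        loop.call_soon_threadsafe(q.put_nowait, i)
    loop.call_soon_threadsafe(q.put_nowait, None)      # sentinel: no more items

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    # Unbounded on purpose: put_nowait on a bounded queue would raise QueueFull
    # inside the loop callback instead of throttling the thread.
    q: "asyncio.Queue[Optional[int]]" = asyncio.Queue()
    thread = threading.Thread(target=thread_producer, args=(loop, q, 5))
    thread.start()
    received: list[int] = []
    while (item := await q.get()) is not None:
        received.append(item)
    thread.join()
    return received

print(asyncio.run(main()))
```

This form gives the thread no backpressure. If the thread must be throttled, one option is asyncio.run_coroutine_threadsafe(q.put(item), loop).result(), which blocks the calling thread until the bounded queue accepts the item.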

How do I stop a single failing message from blocking the whole queue?

Track attempts per message and cap them. Retry with backoff up to N attempts, and after that route the message to a dead-letter queue instead of requeuing it. This keeps the main queue flowing past poison messages and preserves the failed items for inspection or replay rather than retrying them forever or dropping them silently.