
Bounded asyncio.Queue with Backpressure Under Load

An unbounded asyncio.Queue is a memory leak waiting for a slow consumer. The default asyncio.Queue() has maxsize=0, meaning every put() succeeds instantly no matter how far behind the consumers are — so a producer that reads from a fast source (a socket, a Kafka partition, a file) will happily accept items into the queue faster than they drain. Under sustained load the queue grows until RSS climbs into the container limit and the process is OOM-killed. The fix is a bounded queue whose put() blocks when full, turning the consumer's pace into backpressure that propagates all the way back to the source. This guide reproduces the growth, applies the bound, measures that backpressure is engaging, and pushes it to the ingest layer.

Prerequisites

  • Python 3.11+. Examples use asyncio.TaskGroup, asyncio.timeout(), and time.perf_counter(); stdlib only.
  • A running event loop under asyncio.run(); the queue is bound to that loop.
  • The basics of queue coordination. This is a detail page under Async Queue Management; skim that overview for how put()/get() suspend on the loop and what task_done()/join() do. For the broader producer/consumer model, see Concurrent Execution & Worker Patterns.

The core idea in one line: a bounded queue is a backpressure mechanism, and await put() is the throttle. Everything below is about confirming it actually throttles and that the throttle reaches the source rather than dead-ending at the producer.

Figure: Backpressure propagation from consumer to source. A bounded queue at capacity blocks the producer's put(); the blocked producer stops reading from the ingest source, so backpressure flows backward: slow consumer -> full queue (maxsize = N) -> blocked await put() -> paused source read (socket / Kafka).

1. Reproduce unbounded growth

Before fixing it, watch it break. An unbounded queue with a producer faster than the consumer grows without limit; the depth is the leak.

import asyncio


async def fast_producer(q: asyncio.Queue[bytes]) -> None:
    for _ in range(100_000):
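        # bytes(1024) allocates a fresh 1 KiB object per item. A b"x" * 1024 literal is
        # constant-folded by CPython into one shared object and would hide the memory growth.
        await q.put(bytes(1024))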
        await asyncio.sleep(0)          # yield so the consumer can run


async def slow_consumer(q: asyncio.Queue[bytes]) -> None:
    while True:
        await q.get()
        await asyncio.sleep(0.001)      # 1 ms per item -> far slower than producer
        q.task_done()


async def main() -> None:
    q: asyncio.Queue[bytes] = asyncio.Queue()   # maxsize=0 -> UNBOUNDED
    prod = asyncio.create_task(fast_producer(q))
    cons = asyncio.create_task(slow_consumer(q))
    for _ in range(5):
        await asyncio.sleep(0.2)
        print("depth:", q.qsize())      # climbs every sample
    prod.cancel()
    cons.cancel()
    await asyncio.gather(prod, cons, return_exceptions=True)   # wait for clean cancellation


asyncio.run(main())

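Verify: depth climbs steeply from sample to sample while the producer runs; the queue is holding everything the producer outran the consumer by. (This demo producer stops after 100,000 items, so on a fast machine the last samples may show the backlog slowly draining; a live source never stops.) Watch RSS in parallel (ps -o rss= -p $(pgrep -f your_script)) and it climbs along with the depth. This is the OOM trajectory.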
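If you would rather see the growth from inside the process than from ps, tracemalloc reports the bytes currently held by Python allocations. A minimal sketch, assuming Python-level allocations are a good enough proxy for RSS (tracemalloc does not see memory allocated outside the Python allocator); sample_growth is a hypothetical helper, and its producer never stops so the depth keeps climbing for the whole run.

```python
import asyncio
import tracemalloc


async def sample_growth(samples: int = 3, interval: float = 0.05) -> list[tuple[int, int]]:
    """Return (queue depth, traced bytes) per sample for an unbounded queue."""
    q: asyncio.Queue[bytes] = asyncio.Queue()          # maxsize=0 -> unbounded

    async def producer() -> None:
        while True:                                     # never stops, like a live source
            await q.put(bytes(1024))                    # fresh 1 KiB object per item
            await asyncio.sleep(0)                      # yield so the consumer can run

    async def consumer() -> None:
        while True:
            await q.get()
            await asyncio.sleep(0.001)                  # ~1 ms per item
            q.task_done()

    tracemalloc.start()
    tasks = [asyncio.create_task(producer()), asyncio.create_task(consumer())]
    readings: list[tuple[int, int]] = []
    try:
        for _ in range(samples):
            await asyncio.sleep(interval)
            current, _peak = tracemalloc.get_traced_memory()
            readings.append((q.qsize(), current))
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        tracemalloc.stop()
    return readings


if __name__ == "__main__":
    for depth, traced in asyncio.run(sample_growth()):
        print(f"depth={depth:6d}  traced={traced / 1e6:.1f} MB")
```

Both columns should rise together; the traced bytes are the queue's backlog made visible.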

2. Set maxsize to make put() block

Change one argument. With maxsize=N, await put() suspends the producer when the queue holds N items, so the queue can never exceed N — memory is now bounded by N x item_size.

import asyncio


async def main() -> None:
    q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=100)   # the only change
    prod = asyncio.create_task(fast_producer(q))            # same producer as step 1
    cons = asyncio.create_task(slow_consumer(q))            # same slow consumer
    for _ in range(5):
        await asyncio.sleep(0.2)
        print("depth:", q.qsize())      # pins near 100 and stays there
    prod.cancel()
    cons.cancel()
    await asyncio.gather(prod, cons, return_exceptions=True)   # wait for clean cancellation


asyncio.run(main())

Verify: depth climbs to ~100 and then holds — it never exceeds maxsize. RSS plateaus instead of climbing. The producer is no longer racing ahead; it is being paced by the consumer through the full queue.
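To check the bound as an invariant rather than by eye, record the deepest qsize() the producer ever observes right after a put. A minimal sketch; max_depth_under_load is a hypothetical helper, and the 1 ms consumer mirrors step 1. Because await put() never lets the queue exceed maxsize, the result should be exactly maxsize under overload.

```python
import asyncio


async def max_depth_under_load(maxsize: int = 100, duration: float = 0.3) -> int:
    """Run a fast producer against a slow consumer; return the deepest qsize() seen."""
    q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=maxsize)
    deepest = 0

    async def producer() -> None:
        nonlocal deepest
        while True:
            await q.put(bytes(1024))                 # suspends while the queue is full
            deepest = max(deepest, q.qsize())        # depth right after each successful put

    async def consumer() -> None:
        while True:
            await q.get()
            await asyncio.sleep(0.001)               # slow consumer, ~1 ms per item
            q.task_done()

    tasks = [asyncio.create_task(producer()), asyncio.create_task(consumer())]
    await asyncio.sleep(duration)
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return deepest


if __name__ == "__main__":
    print("deepest qsize:", asyncio.run(max_depth_under_load()))
```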

3. Confirm the producer awaits put() and is throttled

The bound only helps if the producer uses await put() (not put_nowait()). Prove the producer is genuinely suspending — not spinning, not dropping — by timing the call when the queue is full.

import asyncio
import time


async def main() -> None:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    await q.put(1)
    await q.put(2)                       # queue now full

    async def free_a_slot() -> None:
        # Simulated consumer: frees one slot after 150 ms.
        await asyncio.sleep(0.15)
        await q.get()

    # Keep a reference so the helper task cannot be garbage-collected mid-run.
    helper = asyncio.create_task(free_a_slot())
    start = time.perf_counter()
    await q.put(3)                       # must wait until a slot frees: ~0.15 s
    waited = time.perf_counter() - start
    await helper
    print(f"put waited {waited*1000:.0f} ms")   # ~150 ms


asyncio.run(main())

Verify: the put waits roughly 150 ms, the time until a slot opened. A near-zero wait would mean the queue was never actually full (unbounded, or sized larger than you think); put_nowait() would not wait at all but raise asyncio.QueueFull instead. This measured wait is the backpressure signal in its rawest form: the producer paid time because the consumer was behind.
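For contrast, the same full queue with put_nowait() never waits: it raises asyncio.QueueFull immediately, so there is no wait to measure and no throttling. A minimal sketch; nowait_on_full is a hypothetical helper.

```python
import asyncio
import time


async def nowait_on_full() -> tuple[bool, float]:
    """Try put_nowait() on a full queue; return (raised QueueFull?, seconds spent)."""
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    q.put_nowait(1)
    q.put_nowait(2)                          # queue is now full
    start = time.perf_counter()
    try:
        q.put_nowait(3)                      # never suspends: fails on the spot
        raised = False
    except asyncio.QueueFull:
        raised = True
    return raised, time.perf_counter() - start


raised, spent = asyncio.run(nowait_on_full())
print(f"QueueFull raised: {raised}, time spent: {spent * 1000:.3f} ms")
```

The producer learns the queue is full, but nothing slows it down; whatever it does next with the item decides whether memory stays bounded.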

4. Measure queue depth and put-wait as metrics

Turn step 3's one-off measurement into a continuous gauge. Wrap put() to record wall time spent waiting and sample depth; these two numbers are the entire health story of a bounded queue.

import asyncio
import time


class MeteredQueue(asyncio.Queue):
    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self.total_put_wait = 0.0
        self.put_count = 0

    async def put(self, item) -> None:
        start = time.perf_counter()
        await super().put(item)
        self.total_put_wait += time.perf_counter() - start
        self.put_count += 1

    @property
    def avg_put_wait_ms(self) -> float:
        return (self.total_put_wait / self.put_count * 1000) if self.put_count else 0.0


async def sampler(q: MeteredQueue) -> None:
    while True:
        await asyncio.sleep(1.0)
        print(f"depth={q.qsize():3d}/{q.maxsize}  avg_put_wait={q.avg_put_wait_ms:.2f}ms")

Verify: with a slow consumer, avg_put_wait rises above zero and depth sits near maxsize — backpressure is engaging. With a fast consumer, avg_put_wait stays near zero and depth hovers low. Export both as gauges (queue_depth, queue_put_wait_ms) and alert when put-wait climbs: it is the earliest, cheapest signal that consumers have fallen behind, ahead of any memory or latency alarm.
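One refinement worth considering: the running average in MeteredQueue never forgets, so after hours of healthy traffic a fresh backlog barely moves it. A per-window variant that resets on every sample reacts as soon as consumers fall behind. A minimal sketch; WindowedMeteredQueue, snapshot(), and demo() are hypothetical, and the dictionary keys mirror the queue_depth / queue_put_wait_ms gauge names above.

```python
import asyncio
import time
from typing import Any


class WindowedMeteredQueue(asyncio.Queue):
    """Bounded queue that reports put-wait per sampling window, not since start."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self._window_wait = 0.0
        self._window_puts = 0

    async def put(self, item: Any) -> None:
        start = time.perf_counter()
        await super().put(item)
        self._window_wait += time.perf_counter() - start
        self._window_puts += 1

    def snapshot(self) -> dict[str, float]:
        """Return depth and average put-wait for this window, then start a new one."""
        avg_ms = (self._window_wait / self._window_puts * 1000) if self._window_puts else 0.0
        self._window_wait, self._window_puts = 0.0, 0
        return {"queue_depth": float(self.qsize()), "queue_put_wait_ms": avg_ms}


async def demo(window: float = 0.2) -> dict[str, float]:
    q = WindowedMeteredQueue(maxsize=10)

    async def producer() -> None:
        while True:
            await q.put(b"item")              # suspends whenever the queue is full

    async def slow_consumer() -> None:
        while True:
            await q.get()
            await asyncio.sleep(0.005)        # 5 ms per item: slower than the producer
            q.task_done()

    tasks = [asyncio.create_task(producer()), asyncio.create_task(slow_consumer())]
    await asyncio.sleep(window)
    stats = q.snapshot()
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return stats


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling snapshot() from the sampler loop instead of reading the cumulative property gives one gauge value per interval.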

5. Propagate backpressure to the ingest source

A blocked put() only helps if the producer's blocking actually slows the source. If the producer drains a socket into the queue, suspending on put() means it stops calling reader.read(). Once the StreamReader's internal buffer grows past twice its limit (128 KiB with the default 64 KiB limit), asyncio pauses reading on the transport, the kernel receive buffer fills, the advertised TCP window shrinks, and the sender is throttled: backpressure reaches the wire. The anti-pattern is a producer that reads everything into memory first, or uses put_nowait() with a fallback buffer: that re-introduces the unbounded growth one layer up.

import asyncio


async def ingest(reader: asyncio.StreamReader, q: asyncio.Queue[bytes]) -> None:
    """Read a stream into a bounded queue; backpressure reaches the socket."""
    while True:
        chunk = await reader.read(4096)     # next read is gated by the queue
        if not chunk:
            break
        await q.put(chunk)                  # SUSPENDS when full -> we stop calling read()
        # While suspended here, the StreamReader buffer fills, asyncio pauses reading
        # on the transport, the kernel receive buffer fills, and the TCP window closes
        # on the remote sender. That is end-to-end backpressure.

Verify: under a fast remote sender and a slow consumer, the queue stays near maxsize and the connection's send rate drops to match consumer throughput. On the receiving host, ss -tin shows that socket's Recv-Q staying high: the kernel is holding bytes the application has not read yet, which is exactly what shrinks the advertised receive window. The queue never holds more than maxsize chunks. Contrast with a put_nowait() + overflow-list design: there, the overflow list grows unbounded and you are back to step 1.
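The same chain can be checked end to end on loopback. A sketch, assuming a local TCP connection stands in for the real source: a sender that writes as fast as drain() allows, the bounded ingest loop from this step, and a 10 ms consumer. run_pipeline, the chunk sizes, and the timings are hypothetical. The number to watch is how many bytes the sender got past drain(): it should stop growing once the kernel socket buffers, the StreamReader buffer, and the queue are full, rather than growing with the run time.

```python
import asyncio


async def run_pipeline(duration: float = 0.5) -> tuple[int, int]:
    """Fast TCP sender -> bounded-queue ingest -> slow consumer, all on loopback.

    Returns (bytes the sender got past drain(), queue depth at the end).
    """
    loop = asyncio.get_running_loop()
    accepted: asyncio.Future[asyncio.StreamWriter] = loop.create_future()

    async def on_connect(_r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
        accepted.set_result(w)                  # hand the server-side writer to the sender

    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, client_writer = await asyncio.open_connection("127.0.0.1", port)
    sender_writer = await accepted
    q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=8)
    sent = 0

    async def fast_sender() -> None:
        nonlocal sent
        chunk = bytes(64 * 1024)
        while True:
            sender_writer.write(chunk)
            await sender_writer.drain()         # waits once buffers fill on the receiver side
            sent += len(chunk)

    async def ingest() -> None:
        while chunk := await reader.read(4096):
            await q.put(chunk)                  # suspends when full, so read() stops

    async def slow_consumer() -> None:
        while True:
            await q.get()
            await asyncio.sleep(0.01)           # ~10 ms per 4 KiB chunk
            q.task_done()

    tasks = [asyncio.create_task(c) for c in (fast_sender(), ingest(), slow_consumer())]
    await asyncio.sleep(duration)
    depth = q.qsize()
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    sender_writer.transport.abort()             # drop unsent data instead of flushing it
    client_writer.transport.abort()
    await asyncio.sleep(0)                      # let connection_lost callbacks run
    server.close()
    await server.wait_closed()
    return sent, depth


if __name__ == "__main__":
    sent, depth = asyncio.run(run_pipeline())
    print(f"sender pushed {sent / 1e6:.1f} MB; queue depth {depth}/8")
```

Running it with a longer duration is a quick way to confirm the sent total plateaus instead of scaling with time.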

Verification

A correctly bounded, backpressuring pipeline shows all of these:

  • Bounded memory: RSS plateaus under sustained overload instead of climbing; qsize() never exceeds maxsize.
  • Depth oscillates near maxsize: under load the queue sits close to full and dips as consumers drain — the healthy steady state, not pinned at zero (consumer-starved) or growing (unbounded).
  • Put-wait above zero under load: the queue_put_wait_ms metric rises when consumers fall behind and returns to zero when they catch up.
  • Source throttled: the ingest source's intake rate falls to match consumer throughput — TCP receive window shrinks, Kafka poll rate drops, file read pauses.
  • No secondary buffer: there is no overflow list, no put_nowait() fallback, no read-into-memory step that re-creates unbounded growth upstream of the queue.

Pitfalls & edge cases

  • put_nowait() on a full queue raises QueueFull. It does not block, so it gives you no backpressure. Use it only when you intend to shed load and explicitly catch asyncio.QueueFull — never as a "faster put."
  • Deadlock if consumers stop. If every consumer dies or blocks (an unhandled exception, a blocking syscall), the queue fills, put() suspends forever, and the producer hangs. Supervise consumers under a TaskGroup so a consumer crash cancels the producer instead of deadlocking it; guard put() with asyncio.timeout() if you need a liveness ceiling.
  • Drain on shutdown. On SIGTERM, stop the producer first, then await q.join() so consumers finish the items already buffered, and only then cancel the consumers; cancelling them first drops everything still in the queue. join() returns only when every get() has been matched by a task_done(), so call task_done() in a finally block, or a single failed item will hang the shutdown.
  • Too-small maxsize starves consumers. If the bound is below your consumer concurrency, workers idle on empty get() while the producer is throttled — you have added latency for no memory benefit. Size maxsize at roughly 2x consumer count as a starting point, then tune from the depth metric.
  • Blocking work in the consumer defeats the bound. If a consumer runs CPU-bound or blocking work between get() and task_done(), it stalls the whole event loop: no consumer reaches its next get(), no slots free, and the producer stays blocked even though the queue "should" drain. Offload that work; see Async Queue Management on consumer starvation.
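Two of these pitfalls meet in the consumer: blocking work has to leave the loop, and task_done() has to run even when processing fails, or the shutdown join() never returns. A minimal sketch, assuming asyncio.to_thread() is an acceptable way to offload the blocking call (for CPU-bound work, a process pool via loop.run_in_executor() is the usual alternative, since threads still share the GIL). blocking_work, the worker count, and the 0.1 s run are hypothetical; the sleep stands in for waiting on SIGTERM.

```python
import asyncio
import time


def blocking_work(item: int) -> int:
    """Hypothetical blocking step (a sync client call, file I/O, ...)."""
    time.sleep(0.01)
    return item


async def consumer(q: asyncio.Queue[int], results: list[int]) -> None:
    while True:
        item = await q.get()
        try:
            # Offload to a worker thread so the event loop keeps scheduling other tasks.
            results.append(await asyncio.to_thread(blocking_work, item))
        finally:
            q.task_done()                     # always, or q.join() can hang forever


async def producer(q: asyncio.Queue[int], produced: list[int]) -> None:
    i = 0
    while True:
        await q.put(i)                        # backpressure: suspends while the queue is full
        produced.append(i)                    # reached only once the item is really enqueued
        i += 1


async def main(workers: int = 4) -> tuple[list[int], list[int]]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2 * workers)
    produced: list[int] = []
    results: list[int] = []
    consumers = [asyncio.create_task(consumer(q, results)) for _ in range(workers)]
    prod = asyncio.create_task(producer(q, produced))

    await asyncio.sleep(0.1)                  # stand-in for "run until SIGTERM arrives"
    prod.cancel()                             # 1. stop the producer first
    await asyncio.gather(prod, return_exceptions=True)
    await q.join()                            # 2. let consumers finish everything buffered
    for c in consumers:                       # 3. only then cancel the now-idle consumers
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return produced, results


if __name__ == "__main__":
    produced, results = asyncio.run(main())
    print(f"enqueued {len(produced)}, processed {len(results)}")
```

Every enqueued item is processed before the consumers are cancelled, which is the point of the ordering.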

Frequently Asked Questions

How does a bounded asyncio.Queue apply backpressure?

When the queue holds maxsize items, await queue.put() suspends the producer until a consumer calls get() and frees a slot. The suspended producer stops pulling from its source, so the slowdown propagates upstream. For a socket producer, it stops calling read(); the StreamReader buffer and then the kernel receive buffer fill, the TCP receive window shrinks, and the sender is throttled.

What maxsize should I use for a bounded queue?

There is no universal value. Start at roughly 2x your consumer concurrency, then tune from the queue-depth metric and the container memory limit, since total memory is about maxsize times average item size. Too small starves consumers; too large defers backpressure until memory is already under pressure.
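The two numbers in that answer can be computed side by side. A minimal sketch; maxsize_bounds is a hypothetical helper, and avg_item_bytes should include per-object overhead (sys.getsizeof(bytes(1024)) is slightly above 1024), with budget_bytes being the share of the container limit you are willing to spend on the queue.

```python
def maxsize_bounds(consumers: int, avg_item_bytes: int, budget_bytes: int) -> tuple[int, int]:
    """Return (starting maxsize, hard ceiling) for a bounded queue.

    starting maxsize: ~2x consumer concurrency, capped by the ceiling.
    hard ceiling: most items that fit the budget, since queue memory is
    roughly maxsize * avg_item_bytes.
    """
    ceiling = max(1, budget_bytes // avg_item_bytes)
    start = min(2 * consumers, ceiling)       # if the budget is tighter, it wins
    return start, ceiling


print(maxsize_bounds(consumers=8, avg_item_bytes=1024, budget_bytes=64 * 1024 * 1024))
# (16, 65536)
```

Tuning then happens between the two values, guided by the depth and put-wait metrics.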

Will put_nowait() give me backpressure?

No. put_nowait() raises asyncio.QueueFull immediately on a full queue instead of waiting, so it provides no throttling. Use await put() for backpressure; reserve put_nowait() for deliberate load shedding where you catch QueueFull and drop or divert the item.
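Deliberate load shedding looks like this: try the non-blocking put, and on QueueFull drop the item and count it so the drop rate can be exported. A minimal sketch with drop-newest semantics; SheddingProducer and offer() are hypothetical names, and a real system might divert to a dead-letter sink instead of dropping.

```python
import asyncio


class SheddingProducer:
    """Enqueue if there is room; otherwise drop the item and count the drop."""

    def __init__(self, q: asyncio.Queue[int]) -> None:
        self.q = q
        self.dropped = 0

    def offer(self, item: int) -> bool:
        try:
            self.q.put_nowait(item)
            return True
        except asyncio.QueueFull:
            self.dropped += 1                 # export as a metric; silent drops hide overload
            return False


q: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
shedder = SheddingProducer(q)
print([shedder.offer(i) for i in range(5)], "dropped:", shedder.dropped)
```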

How do I avoid a deadlock when the queue is full and consumers stop?

If all consumers die or block, the full queue makes put() suspend forever and the producer hangs. Supervise producer and consumers under a TaskGroup so a consumer crash cancels the producer, and optionally wrap put() in asyncio.timeout() to enforce a liveness ceiling instead of blocking indefinitely.