
Handling WebSocket Backpressure with Slow Consumers

A real-time server produces messages at the rate of the event, not the rate of the slowest client. When one consumer — a mobile peer on a degraded link, a browser tab throttled in the background, a downstream service paused for GC — cannot drain its socket as fast as you produce, the messages have to go somewhere. Without an explicit policy they pile up in the server's send buffer, and a single slow client can drive process memory to OOM while every other client keeps working fine. The failure is insidious because it is invisible at low load and during testing: everything works until one real-world client on a bad network connects, and then memory climbs until the process is killed and every connection drops — the one slow client takes down the thousand healthy ones with it.

The root cause is that await ws.send() only pushes back on the one coroutine that awaits it. In a fan-out you have two bad options if you stop there: either you await each send serially, in which case the broadcast freezes on the slowest socket and nobody gets timely messages; or you sidestep the await with create_task(ws.send(...)), and pending send tasks, each holding its payload, pile up without bound for the lagging client. Neither is acceptable. The fix is to stop treating backpressure as something the transport handles for you and make it explicit and per-client: a bounded buffer for each connection, a way to detect when that buffer fills, and a deliberate policy for what to do about it. This guide builds that up step by step so a lagging consumer is bounded in memory, detected early, and dealt with, never able to harm its neighbours.

Prerequisites

  • Python 3.11+ (for asyncio.TaskGroup and asyncio.timeout()).
  • The websockets library:
pip install websockets

1. Reproduce unbounded send growth

First make the failure visible. Spawn a writer per client that awaits send() and a producer that never waits — the naive shape where one slow client silently accumulates buffered frames. With a fast producer and one artificially slow consumer, watch the buffered backlog grow without limit.

import asyncio
import websockets


async def naive_writer(ws, produce: asyncio.Queue) -> None:
    # No bound: `produce` is unbounded, so while send() suspends on a slow
    # socket the backlog grows in the upstream queue without limit.
    while True:
        msg = await produce.get()
        await ws.send(msg)        # suspends on a slow peer; backlog accumulates upstream


async def fast_producer(produce: asyncio.Queue) -> None:
    n = 0
    while True:
        await produce.put(f"event-{n}".encode())   # unbounded queue: grows forever
        n += 1
        await asyncio.sleep(0)    # produce as fast as the loop allows

There are two distinct places the backlog can accumulate, and conflating them wastes debugging time. If a single writer awaits ws.send(), the library's write buffer stays near its write_limit high-water mark, because send() suspends while that buffer is over the limit; the backlog accumulates instead in the unbounded upstream queue feeding the writer. If you fire sends with create_task(ws.send(...)), the backlog is a pile of pending tasks plus their captured payloads. Either way the symptom is the same: RSS rises in lockstep with how far one client lags. Verify: run against a deliberately throttled client (e.g. a consumer that awaits asyncio.sleep(1) between reads) and watch process RSS and the producer queue's qsize() climb monotonically while a fast client on the same server stays flat. That divergence, one client's backlog growing without bound while the others stay flat, is the precise bug the rest of this guide removes.
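To watch that divergence without a real network, here is a minimal, self-contained sketch. FakeSocket and reproduce() are hypothetical names, not part of the websockets library: FakeSocket.send() simply sleeps for a fixed delay to stand in for a peer that drains slowly, and the same event is fanned out to a fast and a slow client through unbounded queues, mirroring the naive shape above.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: each send() takes `delay` seconds."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.sent = 0

    async def send(self, msg: bytes) -> None:
        await asyncio.sleep(self.delay)  # a slow peer drains its socket slowly
        self.sent += 1


async def naive_writer(ws: FakeSocket, q: asyncio.Queue) -> None:
    while True:
        msg = await q.get()
        await ws.send(msg)  # suspends on a slow peer; the backlog piles up in q


async def reproduce(duration: float = 0.2) -> dict[str, int]:
    fast_ws, slow_ws = FakeSocket(0.0), FakeSocket(0.05)
    fast_q: asyncio.Queue[bytes] = asyncio.Queue()  # unbounded, as in the naive shape
    slow_q: asyncio.Queue[bytes] = asyncio.Queue()
    writers = [
        asyncio.create_task(naive_writer(fast_ws, fast_q)),
        asyncio.create_task(naive_writer(slow_ws, slow_q)),
    ]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration
    n = 0
    while loop.time() < deadline:
        frame = f"event-{n}".encode()
        fast_q.put_nowait(frame)  # fan the same event out to both clients
        slow_q.put_nowait(frame)
        n += 1
        await asyncio.sleep(0)  # produce as fast as the loop allows
    for w in writers:
        w.cancel()
    await asyncio.gather(*writers, return_exceptions=True)
    return {"produced": n, "fast_backlog": fast_q.qsize(), "slow_backlog": slow_q.qsize()}
```

Running asyncio.run(reproduce()) returns a dict in which slow_backlog is close to the number of events produced, since the slow writer sends only about one frame per 50 ms, while fast_backlog stays at one or two. The exact counts depend on how fast the event loop spins.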

2. Per-client bounded asyncio.Queue

Give every connection its own bounded queue and a dedicated writer task that drains it onto the socket. The bound is the memory ceiling per client; the writer is the only coroutine that touches ws.send() for that connection.

import asyncio
import websockets

MAX_BACKLOG = 256


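class Client:
    def __init__(self, ws: websockets.WebSocketServerProtocol) -> None:
        self.ws = ws
        # Bounded per-client backlog: the memory ceiling for this connection.
        self.queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=MAX_BACKLOG)
        self.strikes = 0  # consecutive lag events, used by the policy in step 4

    async def writer(self) -> None:
        # The only coroutine that calls ws.send() for this connection.
        try:
            while True:
                frame = await self.queue.get()
                await self.ws.send(frame)
                self.queue.task_done()
        except websockets.ConnectionClosed:
            pass  # peer went away: stop draining this client's queue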

The key invariant is that the writer task is the sole owner of ws.send() for its connection: nothing else awaits send on that socket, so the library's own transport buffer stays capped near its write_limit high-water mark and your application queue is the real, observable bound. Enqueueing is now decoupled from sending: producers interact with the queue, never with the socket. Verify: under the same fast producer, the per-client queue tops out at MAX_BACKLOG instead of growing without limit, and await queue.put() on a full queue suspends the producer rather than buffering forever. Application-level buffering is now bounded by MAX_BACKLOG * frame_size * num_clients (plus roughly one write_limit of transport buffer per connection), a number you can compute in advance and check against your memory budget.

3. Detect a lagging client (queue full)

A bounded queue turns "slow consumer" into a precise, observable signal: the queue is full. Use put_nowait from the broadcaster so the producer never blocks on one client; a QueueFull exception is your lag detector.

import asyncio


def offer(client: "Client", frame: bytes) -> bool:
    """Non-blocking enqueue. Returns False if the client is lagging (queue full)."""
    try:
        client.queue.put_nowait(frame)
        return True
    except asyncio.QueueFull:
        return False          # this client cannot keep up right now

Verify: with one throttled client and several fast ones, offer() returns False only for the slow client; the fast clients' queues stay shallow. Count the False returns per client to rank lag severity.
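Counting lag events per client is easy to prototype without sockets. In this sketch, an assumption for illustration, offer() takes the queue directly rather than a Client, the fast client's writer is modelled by draining one frame per round, and the slow client's writer never drains at all.

```python
import asyncio
from collections import Counter


def offer(q: asyncio.Queue, frame: bytes) -> bool:
    """Same non-blocking enqueue as above, taking the queue directly."""
    try:
        q.put_nowait(frame)
        return True
    except asyncio.QueueFull:
        return False


async def simulate(rounds: int, maxsize: int = 4) -> Counter:
    queues = {
        "fast": asyncio.Queue(maxsize=maxsize),
        "slow": asyncio.Queue(maxsize=maxsize),
    }
    lag: Counter = Counter()
    for i in range(rounds):
        frame = f"event-{i}".encode()
        for name, q in queues.items():
            if not offer(q, frame):
                lag[name] += 1  # one lag event for this client
        queues["fast"].get_nowait()  # the fast writer keeps up: drains every frame
        # the slow writer never drains, so its queue fills and stays full
    return lag
```

With the default maxsize of 4, asyncio.run(simulate(10)) returns Counter({'slow': 6}): the slow queue fills after four frames and every later offer fails, while the fast client never registers a lag event.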

4. Policy: drop, coalesce, or disconnect the slow client

Detection demands a decision. There is no free lunch — you trade off liveness, correctness, and fairness, and the right choice depends entirely on what the data means to the consumer:

  • Drop the new frame (drop-newest) or the oldest queued frame (drop-oldest): acceptable for state you resend, e.g. presence or telemetry where only the latest value matters and a missed sample is harmless. Drop-newest is the simplest; drop-oldest keeps the freshest data at the cost of evicting older frames that are still waiting in the queue.
  • Coalesce: replace the queued backlog with the newest snapshot — ideal for "current value" feeds (price, position, gauge readings) where intermediate frames are obsolete the moment a newer one exists. Coalescing bounds the queue to depth one for slow clients while fast clients still get every update, and it is often the highest-quality option when the payload is a full state snapshot rather than a delta.
  • Disconnect: close the client with a backpressure close code (1013 try-again-later) when it has lagged past a grace period — the right call for ordered streams or delta encodings where dropping or reordering silently corrupts the consumer's view. A clean disconnect lets the client reconnect and request a fresh snapshot, which is recoverable; a gap it never learns about is not.
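The broadcast code in this step implements coalesce and disconnect but not drop-oldest. A minimal sketch of drop-oldest, assuming the same per-client asyncio.Queue (offer_drop_oldest is a hypothetical helper name), evicts the oldest queued frame before enqueueing the new one and still reports lag to the caller.

```python
import asyncio


def offer_drop_oldest(q: asyncio.Queue, frame: bytes) -> bool:
    """Enqueue `frame`; if the queue is full, evict the oldest queued frame first.

    Returns False when an eviction happened, so callers can still count lag.
    """
    evicted = False
    if q.full():
        q.get_nowait()  # discard the oldest frame still waiting to be sent
        q.task_done()  # keep join() accounting consistent with the discard
        evicted = True
    q.put_nowait(frame)  # cannot raise: room was just made and nothing awaited since
    return not evicted
```

Because the eviction and the put happen with no await in between, no other coroutine can refill the slot, so put_nowait() cannot raise here. Calling task_done() for the evicted frame keeps queue.join() accounting correct if anything waits on it.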

The decisive question is whether the consumer can tolerate gaps. If each message is a self-contained snapshot, drop or coalesce freely. If messages are deltas that must be applied in order, a single dropped frame desynchronises the client forever, so disconnect-and-resync is the only correct policy. Mixing the two — dropping frames on a delta stream — produces the worst kind of bug: a client that appears connected and healthy but is silently showing stale or corrupted state.
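A toy example makes the asymmetry concrete. The numbers are hypothetical: a counter updated by four deltas, sent either as deltas or as running snapshots of the full value.

```python
from itertools import accumulate


def apply_deltas(start: int, deltas: list[int]) -> int:
    """A delta-stream consumer: applies every change, in order."""
    value = start
    for d in deltas:
        value += d
    return value


deltas = [5, -2, 7, 1]  # hypothetical updates to a counter
snapshots = list(accumulate(deltas))  # same stream as full state: [5, 3, 10, 11]

truth = apply_deltas(0, deltas)  # 11
coalesced = snapshots[-1]  # coalescing keeps only the newest snapshot: 11
gapped = apply_deltas(0, [5, 7, 1])  # the -2 delta was dropped: 13, silently wrong
```

Coalescing the snapshot stream loses intermediate values but still lands on the right state; dropping one delta leaves the consumer at 13 instead of 11 with no signal that anything went wrong.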

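import asyncio

MAX_STRIKES = 3
# Strong references to background close tasks so they are not garbage-collected.
_closing: set[asyncio.Task] = set()


async def broadcast(clients: list["Client"], frame: bytes) -> None:
    for c in list(clients):  # iterate a copy: persistent laggers are removed below
        if c.queue.empty():
            # Reset only once the writer has fully drained: an offer right after a
            # coalesce always succeeds, so success alone does not prove recovery.
            c.strikes = 0
        if offer(c, frame):
            continue
        c.strikes = getattr(c, "strikes", 0) + 1
        if c.strikes <= MAX_STRIKES:
            # COALESCE: drop the stale backlog, keep only the freshest frame.
            _drain(c.queue)
            c.queue.put_nowait(frame)
        else:
            # DISCONNECT: persistent lagger. Stop offering it frames, and close it in
            # the background so the closing handshake never stalls the fan-out.
            clients.remove(c)
            task = asyncio.create_task(c.ws.close(code=1013, reason="consumer too slow"))
            _closing.add(task)
            task.add_done_callback(_closing.discard)


def _drain(q: asyncio.Queue) -> None:
    while not q.empty():
        q.get_nowait()
        q.task_done()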

Verify: drive the throttled client past the strike threshold and confirm it is closed with 1013 while the fast clients see no gaps and no added latency.

5. Monitor buffer and queue depth

Backpressure is only safe if you can see it. Export per-client queue depth and the transport's own buffered bytes as metrics, and alert before the policy fires rather than after.

import time


def snapshot(clients: list["Client"]) -> dict[str, float]:
    depths = [c.queue.qsize() for c in clients]
    return {
        "clients": len(clients),
        "max_queue_depth": max(depths, default=0),
        "queues_near_full": sum(d >= MAX_BACKLOG * 0.8 for d in depths),
        "ts": time.time(),
    }

Verify: scrape the snapshot on an interval; a healthy fan-out shows max_queue_depth near zero. A client whose depth climbs toward MAX_BACKLOG is your early warning — it will trip the policy within seconds. Export these as gauges and histograms rather than logging them: you want to alert on the distribution of queue depth across clients, because a healthy system has almost all clients at depth zero with only occasional brief spikes. A rising count of queues_near_full, or a 1013 close-code rate that tracks your message-production rate, tells you the fan-out is outrunning real client capacity and you need to coalesce harder, shrink the payload, or shed load upstream rather than just disconnecting symptomatically.
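One way to compute that distribution, sketched under assumptions: each client has the .queue and .ws attributes of the Client class from step 2, and the connection exposes its asyncio transport as ws.transport, whose get_write_buffer_size() is a standard asyncio.WriteTransport method. Check the transport attribute against your websockets version before relying on it; lag_metrics() and percentile() are hypothetical helpers.

```python
def percentile(values: list[int], p: int) -> int:
    """Nearest-rank percentile (p in 1..100) of a non-empty list."""
    ordered = sorted(values)
    rank = (p * len(ordered) + 99) // 100  # ceil(p * n / 100) in integer math
    return ordered[max(rank, 1) - 1]


def lag_metrics(clients) -> dict[str, int]:
    """Distribution of queue depth plus the largest transport backlog.

    Assumes each client has .queue (asyncio.Queue) and .ws, and that the
    connection exposes its asyncio transport as ws.transport (an assumption).
    """
    depths = [c.queue.qsize() for c in clients] or [0]
    buffered = []
    for c in clients:
        transport = getattr(c.ws, "transport", None)  # may be None once closed
        buffered.append(transport.get_write_buffer_size() if transport is not None else 0)
    return {
        "queue_depth_p50": percentile(depths, 50),
        "queue_depth_p99": percentile(depths, 99),
        "queue_depth_max": max(depths),
        "transport_buffered_bytes_max": max(buffered, default=0),
    }
```

For 98 idle clients plus one at depth 10 and one at depth 250, the p50 is 0, the p99 is 10 and the max is 250; alerting on the p99 and max rather than the mean keeps a single lagging client from being averaged away.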

Verification

  • Bounded per-client memory: with a fast producer and one stuck consumer, process RSS plateaus instead of climbing; application-level buffering is capped at MAX_BACKLOG * frame_size * num_clients, plus roughly one write_limit of transport buffer per connection.
  • Slow client does not harm others: fast clients see no added latency and no dropped frames while the slow client lags or is disconnected.
  • Observable lag: the slow client's qsize() rises toward MAX_BACKLOG while every other client's stays near zero, and the lagging client is dropped/coalesced/closed per policy.

Pitfalls and edge cases

  • Awaiting send() blocks the producer. Calling await ws.send() directly in a broadcast loop couples your whole fan-out to the slowest socket. Always go through a per-client queue with a dedicated writer task, and broadcast with put_nowait.
  • Broadcast to many with one slow client. A single shared queue or a serial await send() loop lets one lagging peer stall everyone. Per-client queues are the only structure that isolates lag; the WebSocket overview shows the full fan-out shape.
  • Conflating transport buffer with app queue. The library has its own write_limit buffer that applies transport-level backpressure to the writer task — but that is invisible to a put_nowait broadcaster. Your application queue is what bounds and surfaces lag; do not assume the transport limit protects a fan-out.
  • Silent drops on ordered streams. Dropping or coalescing frames corrupts a consumer that assumes a complete, ordered sequence. For such streams, disconnect (1013) and let the client reconnect and resync rather than feeding it gaps.
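  • Strike counters that never reset, or reset too early. Never resetting slowly disconnects clients that hiccup once. Resetting on every successful offer() is the opposite bug: right after a coalesce the queue holds a single frame, so the next offer() always succeeds and a stuck client never reaches the disconnect threshold. Reset the counter once the writer has actually drained the queue.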

Frequently Asked Questions

Why does awaiting ws.send() not give me backpressure in a broadcast?

await ws.send() only pushes back on the single coroutine awaiting it. In a serial broadcast loop that means one slow client's socket drain blocks delivery to every other client. Backpressure has to be per-client: a bounded asyncio.Queue and a dedicated writer task per connection, with the broadcaster using put_nowait so it never blocks on one peer.

Should I drop messages or disconnect a slow WebSocket client?

It depends on the data. For resendable or current-value state (presence, telemetry, prices) drop or coalesce to the newest frame. For ordered streams where gaps corrupt the consumer's view, disconnect with code 1013 after a grace period and let the client reconnect and resync, because silently dropping frames is worse than a clean disconnect.

What is the difference between the transport write buffer and my application queue?

The websockets library has an internal write buffer bounded by write_limit that applies transport-level backpressure to the writer task. Your application queue is a separate, explicit bound that a put_nowait broadcaster can observe and act on. The transport limit does not protect a fan-out, so the application queue is what bounds memory and surfaces lag per client.

How big should the per-client queue be?

Size it from the memory budget: total worst-case buffering is roughly maxsize times the average frame size times the number of clients. Pick a maxsize that bounds that product to an acceptable fraction of process memory, then tune from observed queue-depth metrics. A deeper queue tolerates brief lag spikes but delays detection of a persistently slow client.
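The sizing rule is plain arithmetic, sketched here with hypothetical helper names. The optional transport_bytes_per_client term is an assumption added to account for each connection's transport buffer (bounded by write_limit), which sits on top of the application queue.

```python
def worst_case_bytes(max_backlog: int, avg_frame_bytes: int, num_clients: int,
                     transport_bytes_per_client: int = 0) -> int:
    """Worst-case buffering if every client's queue is full at the same time."""
    per_client = max_backlog * avg_frame_bytes + transport_bytes_per_client
    return per_client * num_clients


def max_backlog_for(budget_bytes: int, avg_frame_bytes: int, num_clients: int,
                    transport_bytes_per_client: int = 0) -> int:
    """Largest maxsize whose worst case fits in budget_bytes (0 if none fits)."""
    # Floor division at each step keeps the result conservative.
    per_client_budget = budget_bytes // num_clients - transport_bytes_per_client
    return max(per_client_budget // avg_frame_bytes, 0)
```

For example, worst_case_bytes(256, 2048, 1000) gives 524,288,000 bytes (500 MiB) for the article's MAX_BACKLOG with 1,000 clients and 2 KiB frames, and max_backlog_for(1 << 30, 2048, 10_000) gives 52 for a 1 GiB budget across 10,000 clients.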

Figure: Per-client bounded queues isolate a slow consumer. One producer feeds a bounded queue per client with put_nowait; the lagging client's queue (client B) is full and triggers drop / coalesce / disconnect, while the fast clients (A and C) drain normally and are unaffected.