
WebSocket & Real-Time Streams

A persistent WebSocket carries a full-duplex stream of messages over a single TCP connection: the server can push at any time, the client can send at any time, and neither side waits for a request/response turn. That freedom is also the hazard. A WebSocket connection is a long-lived stateful object pinned to one event loop, and every connection you accept is a coroutine (or several) that lives until the peer disconnects, the network fails, or you tear it down. Where an HTTP request is born, served, and reaped in milliseconds, a WebSocket can live for hours, and its cost (a file descriptor, protocol state, queued frames, one or more tasks) is paid for the entire lifetime. Multiply that by the number of concurrent peers and the design questions stop being about latency and start being about bounded memory, fair scheduling, and how you notice a connection has silently died.

This guide covers production WebSockets in Python with the websockets library — connection lifecycle, running concurrent send and receive over one socket, ping/pong keepalive, backpressure against slow consumers, reconnection with backoff, broadcast fan-out, and graceful close. It sits within the broader async network I/O and protocol handling reference, which frames how transports, streams, and protocols share the loop. The emphasis throughout is operational: which knobs bound memory, which exceptions you must catch, what to export as a metric, and which seemingly innocent coding shapes quietly couple every client to your slowest one.

Install the library before running any snippet here:

pip install websockets

Architectural principles

  • One connection, one or more tasks, one loop. A websockets connection object must not be driven by multiple coroutines that both call recv(): the library rejects a second concurrent recv() with an error (RuntimeError in older releases, ConcurrencyError in recent ones). Reads are owned by exactly one consumer; writes can come from several producers, because the connection serialises them onto a single transport. Treat the connection as a resource owned by a small, named set of tasks.
  • Receiving and sending are independent. The protocol is full-duplex, so a real client almost always needs a receive loop and a send path running concurrently. Bolting both onto one async for loop forces a request/response shape the protocol does not have, and stalls outbound traffic whenever you are blocked waiting on a frame.
  • Keepalive is a protocol concern, not an application one. websockets sends ping frames on a timer and expects a pong for each. Dead peers are detected by the library closing the connection, not by your code noticing silence. Hand-rolled asyncio.sleep() heartbeats drift and fire late whenever the loop is busy; configure the library's ping_interval and ping_timeout instead.
  • Backpressure must be explicit and per-connection. await ws.send(...) only applies backpressure to the one coroutine that awaits it. A broadcast to a thousand clients where one is slow will either block the whole fan-out or buffer unboundedly. Bound each consumer with its own queue — see handling WebSocket backpressure with slow consumers.
  • Every close is eventually a ConnectionClosed. Clean shutdowns, protocol errors, and dropped TCP all surface as ConnectionClosed (or its OK/Error subclasses) on the next recv()/send(). Reconnection logic lives around that exception, not inside the read loop. ConnectionClosedOK is a clean handshake (code 1000/1001) you typically swallow; ConnectionClosedError carries a non-clean code (1006 abnormal, 1011 keepalive timeout) that you log and react to. Catching the bare ConnectionClosed base class handles both.
  • The library does the framing; you do the policy. websockets already reassembles fragments, masks client frames, answers control frames, and enforces max_size. You do not parse the wire protocol. What it cannot decide for you is what to do when a peer lags, when to give up reconnecting, or how long a handshake may take — those are application policies, and getting them wrong is where production incidents come from.

Execution model: full-duplex framing over the loop

WebSocket frames are multiplexed onto the same selector-driven event loop that runs every other coroutine in your process. When a frame arrives, the loop wakes the connection's internal protocol, which reassembles fragments, answers control frames (ping/pong/close) transparently, and hands complete messages to whichever coroutine is awaiting recv(). Outbound, send() frames your payload (fragmenting it only when you pass an iterable of chunks) and writes it to the transport; if the transport's write buffer is above its high-water mark, send() suspends until the socket drains. Crucially, all of this is cooperative: a single CPU-bound or blocking call inside your handler stalls ping responses, delays every other connection's frames, and can trip the peer's ping_timeout. The network I/O overview covers how the loop schedules transport callbacks in general; for WebSockets the practical rule is that the receive task must yield often and never block.

Two consequences follow directly from this shared-loop model. First, fairness is implicit, not guaranteed: the loop services ready callbacks in roughly the order they became ready, so a handler that does a 200 ms synchronous JSON-decode of a large message holds the loop for 200 ms, during which no other connection's frames are read and no pings are answered. With a thousand connections that one handler can cascade into a wave of false ping_timeout closures. Offload anything CPU-heavy — JSON of large payloads, compression, crypto — to a thread or process so the loop stays responsive. Second, the receive side and the keepalive side share the same loop: pongs are received and processed as ordinary loop events, so the same blocking call that delays your recv() also delays the library noticing the peer is alive. This is why "blocking the recv loop" appears as both a backpressure pitfall and a heartbeat pitfall — it is one root cause with two symptoms. Keep the per-frame work in the receive task tiny: validate, enqueue, return; do the expensive part downstream.
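The "validate, enqueue, return" rule can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the library's mechanism: fake_stream is a hypothetical stand-in for async for msg in ws, and the expensive decode runs through asyncio.to_thread. Note that json.loads holds the GIL, so a worker thread keeps the loop responsive only through the interpreter's periodic GIL hand-offs; for genuinely CPU-bound work, a ProcessPoolExecutor passed to loop.run_in_executor is the stronger choice.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def fake_stream(payloads: list[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for `async for msg in ws` (no real socket)."""
    for p in payloads:
        await asyncio.sleep(0)          # yield to the loop like a real read would
        yield p


async def receive_loop(messages: AsyncIterator[str], work: asyncio.Queue) -> None:
    """Receive task: keep per-frame work tiny (validate, enqueue, return)."""
    async for raw in messages:
        if not raw.strip():             # cheap validation only
            continue
        await work.put(raw)             # bounded queue pushes back if decoding lags
    await work.put(None)                # sentinel: the stream has finished


async def decode_worker(work: asyncio.Queue, results: list) -> None:
    """Downstream task: the expensive decode runs off the event-loop thread."""
    while (raw := await work.get()) is not None:
        results.append(await asyncio.to_thread(json.loads, raw))


async def demo(payloads: list[str]) -> list:
    work: asyncio.Queue = asyncio.Queue(maxsize=64)
    results: list = []
    await asyncio.gather(
        receive_loop(fake_stream(payloads), work),
        decode_worker(work, results),
    )
    return results


print(asyncio.run(demo(['{"a": 1}', "   ", '{"b": [2, 3]}'])))  # → [{'a': 1}, {'b': [2, 3]}]
```

Splitting the work this way means a slow decode only delays the decoder task; the receive task keeps reading frames, and the loop keeps answering pings.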

Figure: Full-duplex WebSocket over one event loop. Your tasks (a recv task running async for msg in ws, and a send task calling await ws.send(...)) share one connection, which handles framing, reassembly, and library-managed ping/pong heartbeats with the peer (a browser or service). Frames flow both directions concurrently; a blocked recv task must never stall the send task or the heartbeat.

Pattern catalogue

The patterns below build on each other: a read-only client, then concurrent send/receive, then the keepalive and backpressure machinery a long-lived connection needs, then fan-out and reconnection. Each is a self-contained shape you can lift, and each notes when it is the right tool and what it costs.

Client: connect and run a receive loop

The simplest correct client opens a connection as an async context manager and iterates messages. The async for loop terminates cleanly when the peer closes; the context manager guarantees the close handshake runs even on exception.

import asyncio
import websockets


async def consume(uri: str) -> None:
    async with websockets.connect(uri) as ws:
        async for message in ws:          # ends on clean close
            handle(message)               # must not block the loop


def handle(message: str | bytes) -> None:
    print(f"recv {len(message)} bytes")


asyncio.run(consume("wss://stream.example.internal/feed"))

When to use: read-only feeds (market data, log tails) where you never send. Trade-off: there is no concurrent send path, so adding outbound traffic later forces a rewrite into the producer/consumer shape below.

Concurrent producer and consumer over one connection

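Because the protocol is full-duplex, most clients need to read and write at the same time. Run two tasks under a TaskGroup: if either raises, the group cancels the other and the connection closes. A clean close from the peer, however, ends the receive loop without an exception, and a TaskGroup does not cancel siblings when one finishes normally; so the receive loop cancels the sender itself, otherwise the session would wait forever on outbox.get().

import asyncio
import websockets


async def session(uri: str, outbox: asyncio.Queue[str]) -> None:
    async with websockets.connect(uri) as ws:
        async with asyncio.TaskGroup() as tg:       # Python 3.11+
            sender = tg.create_task(_send_loop(ws, outbox))
            tg.create_task(_recv_loop(ws, sender))


async def _recv_loop(ws, sender: asyncio.Task | None = None) -> None:
    try:
        async for message in ws:    # ends quietly on a clean close
            handle(message)         # handle() as defined in the previous pattern
    finally:
        if sender is not None:
            sender.cancel()         # stop the send loop too; TaskGroup ignores a cancelled child


async def _send_loop(ws, outbox: asyncio.Queue[str]) -> None:
    while True:
        msg = await outbox.get()
        await ws.send(msg)          # serialised onto the transport
        outbox.task_done()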

When to use: any interactive client (chat, control channel, RPC-over-WS). Trade-off: exactly one task may own recv(); a second task calling recv() on the same connection gets an error from the library rather than a share of the frames, so route messages internally instead of adding readers.

Ping/pong heartbeat configuration

websockets keepalive is driven by two parameters. ping_interval is the period between the library's keepalive pings; ping_timeout is how long it waits for the matching pong before declaring the peer dead and closing the connection. The values below (20 s each) are the library's defaults; tune them to your network rather than accepting them blindly.

import websockets

# Server (handler is your connection coroutine; enter it with `async with serve:`)
serve = websockets.serve(
    handler, "0.0.0.0", 8765,
    ping_interval=20,   # send a keepalive ping every 20s
    ping_timeout=20,    # close (code 1011) if that ping's pong is not back within 20s
)

# Client
connect = websockets.connect(
    "wss://example.internal",
    ping_interval=20,
    ping_timeout=20,
)

When to use: every long-lived connection. Trade-off: too aggressive a ping_timeout kills healthy peers on jittery links; too loose and dead connections leak. The dedicated guide on tuning ping/pong heartbeats works through the numbers.
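A rough way to reason about the two knobs together is the window in which a silently dead peer still holds its slot. The sketch below models the keepalive loop under an assumption (a ping every ping_interval, the connection failed if that ping's pong misses ping_timeout); detection_window_s is a hypothetical helper, not part of the library.

```python
def detection_window_s(ping_interval: float, ping_timeout: float) -> tuple[float, float]:
    """Approximate (best, worst) seconds before a silently dead peer is closed.

    Assumed model: a ping goes out every ping_interval, and the connection is
    failed if that ping's pong does not arrive within ping_timeout.
    """
    best = ping_timeout                     # peer died just before a ping went out
    worst = ping_interval + ping_timeout    # peer died just after a pong came back
    return best, worst


for interval, timeout in [(20, 20), (10, 5), (30, 10)]:
    best, worst = detection_window_s(interval, timeout)
    print(f"ping_interval={interval} ping_timeout={timeout}: closed after {best:.0f}-{worst:.0f}s")
```

Under this model the defaults let a dead peer hold its file descriptor for up to roughly 40 s, which is the figure to multiply by your silent-disconnect rate when estimating how many zombie connections can exist at once.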

Bounded send queue for backpressure

When a producer is faster than the socket can drain, awaiting send() directly couples your producer to the peer's drain rate. Decouple with a bounded asyncio.Queue: a full queue blocks the producer on put(), which is natural backpressure, exactly the pattern from async queue management.

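import asyncio


async def writer(ws, sendq: asyncio.Queue[bytes]) -> None:
    while True:
        frame = await sendq.get()
        await ws.send(frame)            # suspends while the transport is backed up
        sendq.task_done()


async def attach(ws) -> tuple[asyncio.Queue[bytes], asyncio.Task[None]]:
    sendq: asyncio.Queue[bytes] = asyncio.Queue(maxsize=256)
    # Keep the task: the loop holds only a weak reference to it, and the
    # caller must cancel it when the connection closes.
    task = asyncio.create_task(writer(ws, sendq))
    return sendq, task      # producers do: await sendq.put(frame)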

When to use: any connection where the producer rate can exceed the consumer's drain rate. Trade-off: a full queue blocks the producer — fine for a single connection, dangerous in a fan-out where one slow client must not stall the rest (covered next, and in depth in backpressure with slow consumers).
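The backpressure can be observed without a network. A minimal sketch, assuming a hypothetical SlowSocket stand-in whose send() sleeps to mimic a congested peer: the producer suspends on put() whenever the bounded queue is full, so the backlog never exceeds maxsize however fast frames are generated.

```python
import asyncio
import contextlib


class SlowSocket:
    """Hypothetical stand-in for a connection whose send() drains slowly."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.sent: list[bytes] = []

    async def send(self, frame: bytes) -> None:
        await asyncio.sleep(self.delay)     # models a congested socket
        self.sent.append(frame)


async def writer(ws: SlowSocket, sendq: asyncio.Queue) -> None:
    while True:
        frame = await sendq.get()
        await ws.send(frame)
        sendq.task_done()


async def produce(n_frames: int = 20, depth: int = 4) -> tuple[list[bytes], int]:
    ws = SlowSocket(delay=0.001)
    sendq: asyncio.Queue = asyncio.Queue(maxsize=depth)
    task = asyncio.create_task(writer(ws, sendq))
    peak = 0
    for i in range(n_frames):
        await sendq.put(b"frame-%d" % i)    # suspends while the queue is full
        peak = max(peak, sendq.qsize())
    await sendq.join()                       # wait for the writer to drain the backlog
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return ws.sent, peak


sent, peak = asyncio.run(produce())
print(len(sent), "frames sent in order; peak queue depth", peak)
```

The producer never needs to know how slow the socket is; the queue bound alone limits memory, which is why this shape is safe for a single connection and unsafe as-is for fan-out.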

Broadcast fan-out to many clients

To push one message to every connected client, you must not await ws.send() serially in a loop — one slow peer delays everyone. Give each client its own bounded queue and a dedicated writer task; broadcasting becomes a non-blocking put_nowait per client.

import asyncio

clients: dict[object, asyncio.Queue[bytes]] = {}
slow: set[object] = set()                # peers whose queue overflowed


def broadcast(payload: bytes) -> None:
    for ws, q in clients.items():
        try:
            q.put_nowait(payload)        # never blocks the broadcaster
        except asyncio.QueueFull:
            slow.add(ws)                 # hand off to the drop/disconnect policy

When to use: pub/sub fan-out, live dashboards, presence updates. Trade-off: per-client queues bound memory but force an explicit policy when one fills (drop, coalesce, or disconnect) — do not let a full queue silently wedge a writer.
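One concrete drop policy is "drop oldest": on overflow, evict the stalest queued message and enqueue the new one, so the queue always holds the most recent state. A minimal sketch; put_drop_oldest is a hypothetical helper, and the policy suits feeds where only recent values matter (prices, presence), not streams where every message must arrive.

```python
import asyncio


def put_drop_oldest(q: asyncio.Queue, item) -> bool:
    """Enqueue without blocking; on overflow evict the oldest item.

    Returns True if an item was dropped.
    """
    try:
        q.put_nowait(item)
        return False
    except asyncio.QueueFull:
        q.get_nowait()          # discard the stalest queued message
        q.task_done()           # keep join() accounting balanced for the evicted item
        q.put_nowait(item)      # a slot was just freed and nothing awaited in between
        return True


async def demo() -> tuple[list[int], int]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
    dropped = sum(put_drop_oldest(q, i) for i in range(6))
    return [q.get_nowait() for _ in range(q.qsize())], dropped


print(asyncio.run(demo()))   # → ([3, 4, 5], 3)
```

Because the helper never awaits, it is safe to call from a synchronous broadcast loop like the one above in place of a bare put_nowait.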

Reconnect with backoff

Clients must assume the connection will drop. Wrap the whole session in a retry loop that catches ConnectionClosed and connection errors, then sleeps with exponential backoff and jitter before reconnecting — the same discipline as general retry and backoff strategies.

import asyncio
import random
import time
import websockets

STABLE_AFTER = 60.0     # seconds a session must last to count as healthy (assumed value)


async def run_forever(uri: str) -> None:
    backoff = 1.0
    while True:
        connected_at = None
        try:
            async with websockets.connect(uri) as ws:
                connected_at = time.monotonic()
                await _recv_loop(ws)      # from the producer/consumer pattern
        except (OSError, asyncio.TimeoutError,
                websockets.InvalidHandshake, websockets.ConnectionClosed):
            pass                          # log here; every exit path reconnects
        # Reset only after a session that stayed up long enough to count as healthy.
        if connected_at is not None and time.monotonic() - connected_at >= STABLE_AFTER:
            backoff = 1.0
        await asyncio.sleep(backoff + random.uniform(0, backoff))
        backoff = min(backoff * 2, 30.0)

When to use: every production client. Trade-off: uncapped retries against a dead endpoint waste resources; cap attempts or escalate to an alert after a threshold. Resetting backoff only after a session has stayed up for a while (STABLE_AFTER above), not as soon as connect() returns, stops a flapping endpoint that accepts and immediately drops connections from defeating the backoff: without that guard, a connection that survives a fraction of a second resets the delay to one second and you hammer the server in a tight loop.
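The trade-off note suggests capping attempts or escalating. A minimal sketch of the capped variant, retrying only the connect step and raising a hypothetical ReconnectExhausted after max_attempts so a supervisor or alerting hook can act. The connect callable and the choice of OSError as the retryable type are assumptions; with websockets you would likely also treat handshake failures as retryable.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class ReconnectExhausted(Exception):
    """Hypothetical signal that retrying should stop and someone should act."""


async def connect_with_cap(
    connect: Callable[[], Awaitable[T]],
    max_attempts: int = 8,
    base: float = 1.0,
    cap: float = 30.0,
) -> T:
    """Retry connect() with jittered exponential backoff; give up after max_attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    delay = base
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except OSError as exc:               # assumed retryable error type
            if attempt == max_attempts:
                raise ReconnectExhausted(f"gave up after {attempt} attempts") from exc
            await asyncio.sleep(delay + random.uniform(0, delay))
            delay = min(delay * 2, cap)
    raise AssertionError("unreachable")


attempts = 0


async def flaky_connect() -> str:
    """Hypothetical endpoint: refuses twice, then accepts."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionRefusedError("endpoint down")
    return "connection"


print(asyncio.run(connect_with_cap(flaky_connect, base=0.01, cap=0.05)), attempts)
```

Raising instead of looping forever turns "the endpoint is gone" into an event your process can report, rather than a quiet background retry storm.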

Resource boundaries

A WebSocket server is a memory-accounting problem before it is a throughput problem. Four limits matter, and each one is the cap on a distinct resource:

  • max_size caps the largest single message the library will accept (default 1 MiB, that is 2**20 bytes). A misbehaving or hostile peer can otherwise force you to buffer an arbitrarily large message in memory before your code ever sees it. Set it to the real maximum your protocol needs (the default or lower for text control channels), and treat a 1009 (message too big) close as a signal to inspect the offending client rather than a bug to suppress.
  • Per-connection send buffering. The library bounds its internal write buffer with write_limit (a high/low watermark pair); when the buffered bytes exceed the high mark, send() suspends until the socket drains below the low mark. That is transport-level backpressure and it is strictly per-connection — it slows the one task awaiting send(). It does not protect a broadcast loop that uses put_nowait or a fire-and-forget create_task(ws.send(...)), which is exactly why each client needs its own bounded application queue layered above the transport.
  • Inbound queue depth. On the receiving side the library buffers messages it has read but you have not yet consumed, bounded by max_queue. If your recv() consumer falls behind, this fills and the library applies backpressure to the peer at the TCP level. Tune it together with how fast your handler drains; a large max_queue hides a slow consumer at the cost of memory.
  • Connection count. Each connection is an open file descriptor plus task and protocol state — on the order of tens of kilobytes before any application buffering. Past a few thousand connections you need a raised ulimit -n, and you should bound new accepts so a connection storm cannot exhaust memory before keepalive evicts dead peers. Pair this with timeouts and deadlines on the handshake and on the first application message, so a peer that connects and then goes silent cannot occupy a slot indefinitely.
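A deadline on the first application message can be expressed with asyncio.wait_for. A minimal sketch, assuming a connection object with recv() and close(code=..., reason=...) as websockets provides; SilentPeer is a hypothetical stand-in, and both the 5 s value and the choice of close code 1008 (policy violation) are illustrative policy, not library defaults. The handshake itself can be bounded separately with the library's open_timeout argument.

```python
import asyncio

FIRST_MESSAGE_TIMEOUT = 5.0   # seconds; an assumed policy value


async def await_first_message(ws, timeout: float = FIRST_MESSAGE_TIMEOUT):
    """Return the peer's first message, or close the connection if it stays silent."""
    try:
        return await asyncio.wait_for(ws.recv(), timeout)
    except asyncio.TimeoutError:
        await ws.close(code=1008, reason="no initial message")   # 1008: policy violation
        return None


class SilentPeer:
    """Hypothetical stand-in: connects, then never sends anything."""

    def __init__(self) -> None:
        self.closed_with: int | None = None

    async def recv(self) -> str:
        await asyncio.Event().wait()     # blocks until cancelled by wait_for
        return ""                        # never reached

    async def close(self, code: int = 1000, reason: str = "") -> None:
        self.closed_with = code


peer = SilentPeer()
print(asyncio.run(await_first_message(peer, timeout=0.05)), peer.closed_with)  # → None 1008
```

Calling this at the top of a handler frees the slot of a peer that connects and goes quiet long before keepalive would notice anything wrong, since a silent but live peer still answers pings.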

The arithmetic that should drive sizing is: server memory roughly equals num_clients * (per-connection overhead + max_queue * inbound_message_size + app_queue_depth * outbound_message_size + write_limit). Plug in average message sizes for a typical estimate, and max_size for inbound messages when you need the true worst case. Fix the client count you must support and the memory budget you have, and the queue depths fall out. Sizing the application send queues is the same bounded-queue discipline covered in async queue management.
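A worked version of the sizing arithmetic, including the inversion that gives the deepest application queue a budget allows. The numbers are illustrative assumptions, not measurements (50 KiB fixed overhead, max_queue=16, 4 KiB inbound and 2 KiB outbound messages, 10,000 clients); transport_buffer is an optional term for the transport write buffer bounded by write_limit and defaults to 0.

```python
KiB = 1024
MiB = 1024 * KiB


def per_client_bytes(overhead: int, max_queue: int, inbound_msg: int,
                     app_queue_depth: int, outbound_msg: int,
                     transport_buffer: int = 0) -> int:
    """Bytes one connection can pin: fixed state + inbound queue + app send queue (+ write buffer)."""
    return (overhead + max_queue * inbound_msg
            + app_queue_depth * outbound_msg + transport_buffer)


def max_app_queue_depth(budget: int, num_clients: int, overhead: int,
                        max_queue: int, inbound_msg: int, outbound_msg: int,
                        transport_buffer: int = 0) -> int:
    """Invert the formula: deepest app send queue that keeps every client within budget."""
    per_client = budget // num_clients
    spare = per_client - overhead - max_queue * inbound_msg - transport_buffer
    return max(spare // outbound_msg, 0)


# Illustrative numbers only (assumed, not measured):
clients = 10_000
one = per_client_bytes(overhead=50 * KiB, max_queue=16, inbound_msg=4 * KiB,
                       app_queue_depth=128, outbound_msg=2 * KiB)
print(one // KiB, "KiB per client;", clients * one // MiB, "MiB total")  # → 370 KiB per client; 3613 MiB total
print(max_app_queue_depth(budget=2048 * MiB, num_clients=clients, overhead=50 * KiB,
                          max_queue=16, inbound_msg=4 * KiB, outbound_msg=2 * KiB))  # → 47
```

With these assumptions the application send queue, not the library's buffers, dominates per-client memory, and a 2 GiB budget forces QUEUE_DEPTH-style limits down from 128 to about 47.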

Integrated production example

A broadcast server with per-client bounded queues, library-managed heartbeats, a slow-client disconnect policy, and graceful shutdown — the pieces above assembled into one runnable program.

import asyncio
import contextlib
import signal
import websockets

# Keys are connection objects (ServerConnection in websockets >= 14,
# WebSocketServerProtocol in the legacy implementation).
CLIENTS: dict[object, asyncio.Queue[str]] = {}
QUEUE_DEPTH = 128
_CLOSING: set[asyncio.Task] = set()     # strong references to fire-and-forget close() tasks


async def _writer(ws, q: asyncio.Queue[str]) -> None:
    """Drain one client's queue onto its socket until cancelled or the peer goes away."""
    try:
        while True:
            msg = await q.get()
            await ws.send(msg)
            q.task_done()
    except websockets.ConnectionClosed:
        pass


async def handler(ws) -> None:
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=QUEUE_DEPTH)
    CLIENTS[ws] = q
    writer = asyncio.create_task(_writer(ws, q))
    try:
        async for message in ws:            # inbound from this client
            broadcast(str(message))
    except websockets.ConnectionClosed:
        pass
    finally:
        CLIENTS.pop(ws, None)
        writer.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await writer


def broadcast(payload: str) -> None:
    """Fan out to every client; disconnect any client that cannot keep up."""
    for ws, q in list(CLIENTS.items()):
        try:
            q.put_nowait(payload)
        except asyncio.QueueFull:
            # Slow consumer: drop the client rather than stall others. Removing it
            # first ensures later broadcasts do not schedule a second close().
            CLIENTS.pop(ws, None)
            task = asyncio.create_task(ws.close(code=1013, reason="too slow"))
            _CLOSING.add(task)
            task.add_done_callback(_CLOSING.discard)


async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # Unix only; an Event tolerates repeated signals, unlike Future.set_result.
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    loop.add_signal_handler(signal.SIGINT, stop.set)

    async with websockets.serve(
        handler, "0.0.0.0", 8765,
        ping_interval=20,        # heartbeat: evict dead peers
        ping_timeout=20,
        max_size=2**20,          # cap inbound message size
    ):
        print("serving on :8765")
        await stop.wait()        # block until a signal arrives
        print("draining...")
        # Closing all client connections triggers the close handshake.
        await asyncio.gather(
            *(ws.close(code=1001, reason="server shutdown") for ws in list(CLIENTS)),
            return_exceptions=True,
        )


if __name__ == "__main__":
    asyncio.run(main())

Diagnostic Hook: Export len(CLIENTS) and, per client, q.qsize() as gauges. A queue that sits near QUEUE_DEPTH is a lagging consumer about to be dropped; a rising count of 1013 close codes means your fan-out rate exceeds what some peers can drain. Track ws.latency (the library's last RTT from ping/pong) to catch network degradation before ping_timeout fires, and alert when process RSS grows faster than len(CLIENTS).
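The gauges named above can be collected in one pass over a CLIENTS-style dict. A minimal sketch: snapshot and Gauges are hypothetical names, the 80% lag threshold is an assumed policy, and FakeConn stands in for a real connection (the code reads the latency attribute the hook refers to via getattr, so the stand-ins work without the library).

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Gauges:
    clients: int
    lagging: int            # queues at or above the lag threshold
    max_depth: int
    latencies: list[float]


def snapshot(clients: dict, queue_depth: int, lag_fraction: float = 0.8) -> Gauges:
    """Collect gauges from a dict mapping connection -> bounded send queue."""
    depths = [q.qsize() for q in clients.values()]
    threshold = int(queue_depth * lag_fraction)
    return Gauges(
        clients=len(clients),
        lagging=sum(d >= threshold for d in depths),
        max_depth=max(depths, default=0),
        latencies=[getattr(ws, "latency", 0.0) for ws in clients],
    )


class FakeConn:
    """Hypothetical stand-in exposing a latency attribute."""

    def __init__(self, latency: float) -> None:
        self.latency = latency


async def demo() -> Gauges:
    clients: dict = {}
    for latency, backlog in [(0.02, 0), (0.03, 5), (0.40, 120)]:
        q: asyncio.Queue[str] = asyncio.Queue(maxsize=128)
        for i in range(backlog):
            q.put_nowait(f"m{i}")
        clients[FakeConn(latency)] = q
    return snapshot(clients, queue_depth=128)


print(asyncio.run(demo()))
```

Export the fields as gauges on a fixed interval; the single client sitting at 120 of 128 is exactly the "one queue climbing while the rest stay flat" signal described here.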

Diagnostic Hook

Run with PYTHONASYNCIODEBUG=1 in staging to surface coroutines that block the loop longer than 100 ms — a blocked handler delays every other client's frames and pongs. In production, scrape per-connection ws.latency and per-client qsize(); the earliest signal of a slow consumer is its queue depth climbing while everyone else's stays flat. A spike in 1006 (abnormal closure) close codes points at network or proxy drops; a spike in 1013 points at your own backpressure policy firing.
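The same debug mode can be enabled per run with asyncio.run(..., debug=True), and loop.slow_callback_duration sets the threshold (0.1 s by default). A minimal sketch that deliberately blocks the loop and captures asyncio's slow-callback warning through the standard "asyncio" logger; blocking_handler and Collect are hypothetical names for the demonstration.

```python
import asyncio
import logging
import time


class Collect(logging.Handler):
    """Capture asyncio's log records so the example can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_handler() -> None:
    time.sleep(0.15)        # the bug being diagnosed: a synchronous call on the loop


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1   # the default; lower it for a stricter check
    await asyncio.create_task(blocking_handler())


def run_with_debug() -> list[str]:
    collector = Collect()
    log = logging.getLogger("asyncio")
    log.addHandler(collector)
    try:
        asyncio.run(main(), debug=True)     # enables the same debug mode as PYTHONASYNCIODEBUG=1
    finally:
        log.removeHandler(collector)
    return collector.messages


for line in run_with_debug():
    print(line)             # asyncio's "Executing <Task ...> took N seconds" warnings
```

In staging, route the "asyncio" logger to your normal log pipeline instead of a collector; each warning names the task that held the loop.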

Failure modes

Failure mode | Root cause | Detection | Fix
Healthy peers dropped | ping_timeout too low for a jittery link | Bursts of 1011 (keepalive timeout) closes correlate with latency spikes | Raise ping_timeout; see heartbeat tuning
Dead connections leak FDs | Keepalive disabled (ping_interval=None) | FD count grows; stale connections carry no traffic | Enable ping_interval; never hand-roll sleep heartbeats
Broadcast stalls on one client | await ws.send() in a serial fan-out loop | One slow peer freezes all sends; broadcast latency tracks the slowest client | Per-client bounded queue + writer task
Server OOM under load | Unbounded send buffering / no max_size | RSS grows without a matching rise in client count | Bound queues, set max_size, drop slow clients
Two readers on one connection | Concurrent recv() from two tasks | The second recv() raises (RuntimeError or ConcurrencyError, depending on version) | Single owner for recv(); multiplex via a queue
Reconnect thundering herd | Fixed-delay or no-jitter reconnect | Synchronised reconnect spikes on the server | Exponential backoff + jitter; reset only after a stable session

Frequently asked questions

Can I call recv() on the same websockets connection from two tasks?

No. Exactly one coroutine may own recv() on a connection; if a second task calls recv() while the first is waiting, the library raises an error instead of sharing frames between them. Run a single receive loop and fan out internally through an asyncio.Queue. Sends may originate from several producers because the connection serialises writes onto one transport, but reads must have a single owner.
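A minimal sketch of "one reader, internal fan-out": a single task reads every message and routes it to per-consumer bounded queues by a type field. The JSON shape with "type" and "v" keys is an assumed protocol, and fake_stream is a hypothetical stand-in for iterating a websockets connection.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def fake_stream(frames: list[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for iterating a websockets connection."""
    for f in frames:
        await asyncio.sleep(0)
        yield f


async def reader(messages: AsyncIterator[str], routes: dict[str, asyncio.Queue]) -> None:
    """The only task that reads; it routes each message by its 'type' field."""
    async for raw in messages:
        msg = json.loads(raw)
        q = routes.get(msg.get("type"))
        if q is not None:               # unknown types are dropped
            await q.put(msg)            # bounded queues push back on the reader
    for q in routes.values():
        await q.put(None)               # sentinel so every consumer can finish


async def consumer(q: asyncio.Queue, seen: list) -> None:
    while (msg := await q.get()) is not None:
        seen.append(msg["v"])


async def demo() -> tuple[list, list]:
    routes = {"trade": asyncio.Queue(maxsize=16), "quote": asyncio.Queue(maxsize=16)}
    trades: list = []
    quotes: list = []
    frames = [json.dumps({"type": t, "v": i})
              for i, t in enumerate(["trade", "quote", "trade", "noise"])]
    await asyncio.gather(
        reader(fake_stream(frames), routes),
        consumer(routes["trade"], trades),
        consumer(routes["quote"], quotes),
    )
    return trades, quotes


print(asyncio.run(demo()))   # → ([0, 2], [1])
```

Consumers get their own queues and pace, while recv() keeps exactly one owner.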

Why does my broadcast freeze when one client is slow?

Because awaiting ws.send() in a serial fan-out loop blocks on the slowest peer's socket drain. Give each client its own bounded asyncio.Queue with a dedicated writer task and broadcast with put_nowait, then apply an explicit policy (drop, coalesce, or disconnect) when a queue fills. This bounds per-client memory and isolates one slow consumer from the rest.

Should I implement my own heartbeat with asyncio.sleep?

No. The websockets library sends ping frames every ping_interval and closes the connection if no pong arrives within ping_timeout. Hand-rolled sleep heartbeats drift, fire late when the loop is busy, and leave zombie connections. Configure ping_interval and ping_timeout and let the library detect dead peers.

How do I shut a WebSocket server down gracefully?

Wait on a stop signal, then close each connection with code 1001 (going away) so the close handshake completes, and cancel per-connection writer tasks before exit. Closing the serve() context manager stops accepting new connections; gathering the per-client close() coroutines drains the existing ones.
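If a few peers never answer the close handshake, an overall drain deadline keeps shutdown bounded. The library already limits each close() with its close_timeout argument; the sketch below adds a single deadline across all peers using asyncio.wait. FakeConn and drain are hypothetical names, and the stand-in's close() simply sleeps to mimic a peer that is slow to return its close frame.

```python
import asyncio


class FakeConn:
    """Hypothetical stand-in for a server connection; close() may hang on a dead peer."""

    def __init__(self, close_delay: float) -> None:
        self.close_delay = close_delay
        self.close_code: int | None = None

    async def close(self, code: int = 1000, reason: str = "") -> None:
        self.close_code = code                  # our close frame is "sent"
        await asyncio.sleep(self.close_delay)   # waiting for the peer's close frame


async def drain(conns: list, deadline: float) -> int:
    """Send 1001 to every peer concurrently; stop waiting after `deadline` seconds.

    Returns how many closes were still pending when the deadline hit.
    """
    tasks = [asyncio.create_task(c.close(code=1001, reason="server shutdown")) for c in conns]
    if not tasks:
        return 0
    _, pending = await asyncio.wait(tasks, timeout=deadline)
    for t in pending:
        t.cancel()                              # abandon peers that never answer
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def demo() -> tuple[int, list]:
    conns = [FakeConn(0.0), FakeConn(0.01), FakeConn(60.0)]
    left = await drain(conns, deadline=0.2)
    return left, [c.close_code for c in conns]


print(asyncio.run(demo()))   # → (1, [1001, 1001, 1001])
```

Every peer still receives 1001; only the wait for unresponsive ones is cut short, so a handful of dead connections cannot hold a deploy hostage.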