
Implementing a Dead-Letter Queue with asyncio

One malformed message can take down a whole pipeline. A worker pulls an item, processing raises, and you face two bad options: requeue it — where it fails again, retries forever, and blocks everything behind it (head-of-line blocking) — or drop it, and lose data silently. Neither is acceptable in a system that must keep flowing and must not lose work. The standard answer is a dead-letter queue (DLQ): track how many times each message has been attempted, retry a few times with backoff, and after N failures route the poison message to a separate queue where it stops blocking the main flow and can be inspected or replayed later. This guide builds that with two asyncio.Queue instances and per-message attempt tracking.

Prerequisites

The shape in one sentence: a main queue, a dead-letter queue, and an attempt counter on each message — the worker retries up to a cap, then moves the message to the DLQ instead of requeuing it.

[Figure: Retry, then dead-letter. A worker dequeues a message from the main queue (await get()) and calls process(msg). On success it calls task_done(). On failure it retries with backoff while attempts < N; after N attempts it gives up and routes the message to the dead-letter queue for inspection or replay.]

1. Build the main queue and a worker

Start with a bounded main queue and a worker that pulls, processes, and acknowledges. Model the message as a dataclass so it can carry metadata (the attempt count comes next).

import asyncio
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: str
    attempts: int = 0
    last_error: str = field(default="", compare=False)


async def process(msg: Message) -> None:
    await asyncio.sleep(0.01)
    if "poison" in msg.payload:          # deterministic, repeatable failure
        raise ValueError(f"cannot process {msg.payload!r}")


async def worker(main_q: asyncio.Queue[Message]) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        finally:
            main_q.task_done()

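Verify: push a few non-poison messages and each one is processed and acknowledged with task_done(), so await main_q.join() returns. Push one "poison" message and the ValueError runs the finally block (task_done() is still called) and then propagates out of worker(), ending the worker task; anything still queued behind it is never processed. That is why you need explicit failure handling, which the next steps add.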
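To see that failure mode concretely, here is a minimal sketch that runs the step 1 worker against three messages, one of them poison. The demo() helper and the payload strings are made up for illustration; Message and process are trimmed copies of the definitions above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    payload: str


async def process(msg: Message) -> None:
    if "poison" in msg.payload:
        raise ValueError(f"cannot process {msg.payload!r}")


async def worker(main_q: asyncio.Queue) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        finally:
            main_q.task_done()


async def demo() -> tuple[bool, int]:
    """Return (worker died with ValueError, messages left unprocessed)."""
    main_q: asyncio.Queue = asyncio.Queue(maxsize=10)
    task = asyncio.create_task(worker(main_q))
    for payload in ["ok-1", "poison", "ok-2"]:
        await main_q.put(Message(payload))
    # The worker never returns normally; it finishes only because the
    # ValueError escapes the loop and ends the task.
    await asyncio.wait([task])
    died = isinstance(task.exception(), ValueError)
    return died, main_q.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a single worker, the message queued behind the poison one is stranded: the task is finished and nothing is left to call get().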

2. Track attempts per message

The decision of whether to retry or dead-letter hinges on a count. Carry an attempts counter on the message itself and increment it on each failure. Because the message object is requeued by reference, the count survives across attempts.

import asyncio


async def worker(main_q: asyncio.Queue[Message], dlq: asyncio.Queue[Message]) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        except Exception as exc:
            msg.attempts += 1
            msg.last_error = repr(exc)
            # routing decision comes in step 4; for now, just observe the count
            print(f"attempt {msg.attempts} failed for {msg.payload!r}: {exc}")
        finally:
            main_q.task_done()

Verify: feed a poison message once and the worker prints attempt 1 failed. The counter lives on the message, not the worker, so when you requeue the same object the count is already 1 going into the next attempt — confirm by logging msg.attempts and watching it climb on a requeued message.
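The "requeued by reference" point can be checked in isolation. The sketch below is illustrative only (requeue_keeps_count is a hypothetical helper, and the failure is simulated by hand): it puts one message on a queue, takes it off, bumps the counter, requeues it, and confirms that the object coming out the second time is the same one with the count intact.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    payload: str
    attempts: int = 0


async def requeue_keeps_count() -> tuple[bool, int]:
    """Return (same object came back, attempts seen on the second dequeue)."""
    q: asyncio.Queue = asyncio.Queue()
    original = Message("poison")
    await q.put(original)

    first = await q.get()
    first.attempts += 1      # simulate one failed attempt
    await q.put(first)       # requeue the same object, not a copy
    q.task_done()

    second = await q.get()
    q.task_done()
    return second is original, second.attempts


if __name__ == "__main__":
    print(asyncio.run(requeue_keeps_count()))
```

If the requeue built a new Message(first.payload) instead, the identity check would fail and attempts would be back at 0.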

3. Retry with backoff before giving up

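Immediate requeue hammers a possibly overloaded downstream and spins the CPU. Insert an exponential backoff between attempts so transient failures (a brief network blip, a momentarily locked row) get a real chance to recover. The simplest version sleeps inside the worker before requeuing: await asyncio.sleep() yields to the event loop, so other workers keep running, but this worker handles nothing else until the delay ends (see Pitfalls & edge cases for the high-volume alternative).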

import asyncio


async def worker(main_q: asyncio.Queue[Message], dlq: asyncio.Queue[Message],
                 max_attempts: int = 3, base: float = 0.05) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        except Exception as exc:
            msg.attempts += 1
            msg.last_error = repr(exc)
            if msg.attempts < max_attempts:
                delay = base * 2 ** (msg.attempts - 1)   # 0.05, 0.10, 0.20 ...
                await asyncio.sleep(delay)               # backoff before retry
                await main_q.put(msg)                    # requeue same object
            # at the cap the message is simply dropped for now; step 4 routes it to the DLQ
        finally:
            main_q.task_done()

Verify: a transient failure that succeeds on attempt 2 shows one backoff sleep then success; a permanent failure backs off base, then 2*base, escalating each attempt. For jittered backoff that avoids retry stampedes across many workers, see Retry and Backoff Strategies.

4. Route to the DLQ after max attempts

When attempts reaches the cap, stop retrying — put the message on the dead-letter queue instead of back on the main queue. The main queue keeps flowing; the poison message is preserved with its error context for later.

import asyncio


async def worker(main_q: asyncio.Queue[Message], dlq: asyncio.Queue[Message],
                 max_attempts: int = 3, base: float = 0.05) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        except Exception as exc:
            msg.attempts += 1
            msg.last_error = repr(exc)
            if msg.attempts >= max_attempts:
                await dlq.put(msg)                       # give up: dead-letter it
            else:
                await asyncio.sleep(base * 2 ** (msg.attempts - 1))
                await main_q.put(msg)                    # retry
        finally:
            main_q.task_done()

Verify: a poison message lands in the DLQ after exactly max_attempts failures (dlq.qsize() == 1, msg.attempts == 3), while good messages interleaved with it process normally — the main queue never stalls behind the poison. Crucially, task_done() is called for the poison message's main-queue slot, so main_q.join() still completes.
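One way to run that check end to end is sketched below. It repeats the step 4 worker so the block is self-contained, with two illustrative changes: a done list that records successful payloads, and a much smaller base so the demo finishes quickly. The run_demo() helper, the payload names, and the worker count of two are assumptions for the example.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: str
    attempts: int = 0
    last_error: str = field(default="", compare=False)


async def process(msg: Message) -> None:
    await asyncio.sleep(0)
    if "poison" in msg.payload:
        raise ValueError(f"cannot process {msg.payload!r}")


async def worker(main_q: asyncio.Queue, dlq: asyncio.Queue, done: list,
                 max_attempts: int = 3, base: float = 0.001) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
            done.append(msg.payload)             # record successes for the check
        except Exception as exc:
            msg.attempts += 1
            msg.last_error = repr(exc)
            if msg.attempts >= max_attempts:
                await dlq.put(msg)               # give up: dead-letter it
            else:
                await asyncio.sleep(base * 2 ** (msg.attempts - 1))
                await main_q.put(msg)            # retry
        finally:
            main_q.task_done()


async def run_demo() -> tuple[list, list]:
    main_q: asyncio.Queue = asyncio.Queue(maxsize=10)
    dlq: asyncio.Queue = asyncio.Queue()
    done: list = []
    workers = [asyncio.create_task(worker(main_q, dlq, done)) for _ in range(2)]
    for payload in ["ok-1", "poison-1", "ok-2", "ok-3"]:
        await main_q.put(Message(payload))
    await main_q.join()                          # returns once every item is acknowledged
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    dead = [dlq.get_nowait() for _ in range(dlq.qsize())]
    return sorted(done), dead


if __name__ == "__main__":
    ok, dead = asyncio.run(run_demo())
    print(ok, [(m.payload, m.attempts, m.last_error) for m in dead])
```

join() does not return early during a retry because the requeue put() happens before the finally block calls task_done(), so the unfinished count never touches zero while the poison message is still cycling.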

5. Inspect and replay the dead-letter queue

A DLQ is useless if nothing ever looks at it. Drain it to inspect failures (the last_error tells you why), and after a fix or deploy, replay selected messages back onto the main queue with their attempt counter reset.

import asyncio


async def inspect_dlq(dlq: asyncio.Queue[Message]) -> list[Message]:
    """Drain the DLQ into a list for review; the queue is emptied, the list keeps the messages."""
    items: list[Message] = []
    while not dlq.empty():
        items.append(dlq.get_nowait())
        dlq.task_done()
    for m in items:
        print(f"DEAD: {m.payload!r} attempts={m.attempts} error={m.last_error}")
    return items


async def replay(dlq_items: list[Message], main_q: asyncio.Queue[Message]) -> None:
    """Re-submit fixed messages with a fresh attempt budget."""
    for m in dlq_items:
        m.attempts = 0                       # reset so retries apply again
        m.last_error = ""
        await main_q.put(m)

Verify: after inspection you have a list of dead messages each with a populated last_error; after replay, those messages re-enter the main queue with attempts == 0 and are reprocessed (a since-fixed message now succeeds; a still-poison one cycles back to the DLQ). In production, back the DLQ with a durable store (a table, a Redis Stream) rather than an in-memory asyncio.Queue so dead letters survive a restart.
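A table-backed DLQ could look roughly like the following, using the standard library's sqlite3 module as a stand-in for whatever store you actually run. The table name dead_letters, its columns, and the helper names open_store, save_dead_letter, and load_dead_letters are all invented for this sketch.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Message:
    payload: str
    attempts: int = 0
    last_error: str = ""


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the dead-letter table. ':memory:' is for demos only."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dead_letters ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " attempts INTEGER NOT NULL,"
        " last_error TEXT NOT NULL)"
    )
    conn.commit()
    return conn


def save_dead_letter(conn: sqlite3.Connection, msg: Message) -> None:
    with conn:  # commits on success, rolls back if the insert raises
        conn.execute(
            "INSERT INTO dead_letters (payload, attempts, last_error) VALUES (?, ?, ?)",
            (msg.payload, msg.attempts, msg.last_error),
        )


def load_dead_letters(conn: sqlite3.Connection) -> list[Message]:
    """Read dead letters back in insertion order, for inspection or replay."""
    rows = conn.execute(
        "SELECT payload, attempts, last_error FROM dead_letters ORDER BY id"
    ).fetchall()
    return [Message(*row) for row in rows]


if __name__ == "__main__":
    conn = open_store()
    save_dead_letter(conn, Message("poison-1", 3, "ValueError('bad')"))
    print(load_dead_letters(conn))
```

These calls are synchronous and block the event loop while they run, so a real asyncio worker would need to move them off the loop or use an async driver; that part is left out here. Pass a file path instead of ":memory:" for the rows to survive a restart.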

Verification

A correct DLQ implementation exhibits all of these:

  • Poison messages land in the DLQ: after exactly max_attempts failures a message is on the dead-letter queue, carrying its attempts count and last_error.
  • Main queue keeps flowing: good messages interleaved with poison ones are processed without delay; the poison message never causes head-of-line blocking.
  • No lost work: every message either succeeds, is in flight, or is in the DLQ — none vanish. main_q.join() completes because task_done() is called on every dequeue including dead-lettered ones.
  • Backoff between attempts: retry delays escalate per attempt; the worker is not spinning on immediate requeues.
  • Replayable: DLQ contents can be inspected by error and re-submitted with a reset attempt budget.

Pitfalls & edge cases

  • Forgetting task_done() on the dead-letter path. When you route a message to the DLQ, you must still call task_done() for its slot on the main queue, or main_q.join() hangs at shutdown. Keep task_done() in the finally so every branch hits it exactly once.
  • Resetting the counter on requeue. If you build a fresh Message (or reset attempts) when requeuing for retry, the cap never triggers and the poison message retries forever. Requeue the same object so attempts accumulates; only reset it deliberately on replay.
  • DLQ as another in-memory queue. An asyncio.Queue DLQ dies with the process — a crash loses every dead letter. For anything that must survive restart, persist dead letters to a durable store (DB table, Redis Stream, broker DLX) and treat the in-memory queue as a buffer.
  • No bound on the DLQ. A bad deploy can dead-letter a flood of messages. Bound the DLQ too, and alert on its depth — a sudden rise in dead-letter rate is a high-signal incident indicator.
  • Backoff that blocks the worker pool. Sleeping inside the worker before requeue ties up that worker for the backoff duration. Under heavy retry load this starves throughput; for high volumes, schedule the requeue with loop.call_later() or a delay queue so the worker stays free. Once the requeue is deferred, main_q.join() can return while a retry is still pending, so shutdown must also wait for scheduled retries.
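For the last pitfall, one possible shape is sketched below. Instead of loop.call_later() it hands the delayed requeue to a small background task, which lets the requeue await put() on a bounded queue. The names requeue_later, pending, and drain are hypothetical. Because the worker now acknowledges the message before the retry is back on the queue, drain() waits on both the queue and the outstanding retry tasks.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    payload: str
    attempts: int = 0
    last_error: str = ""


async def process(msg: Message) -> None:
    await asyncio.sleep(0)
    if "poison" in msg.payload:
        raise ValueError(f"cannot process {msg.payload!r}")


async def requeue_later(main_q: asyncio.Queue, msg: Message, delay: float) -> None:
    await asyncio.sleep(delay)       # the backoff happens here, not in the worker
    await main_q.put(msg)


async def worker(main_q: asyncio.Queue, dlq: asyncio.Queue, pending: set,
                 max_attempts: int = 3, base: float = 0.001) -> None:
    while True:
        msg = await main_q.get()
        try:
            await process(msg)
        except Exception as exc:
            msg.attempts += 1
            msg.last_error = repr(exc)
            if msg.attempts >= max_attempts:
                await dlq.put(msg)
            else:
                delay = base * 2 ** (msg.attempts - 1)
                task = asyncio.create_task(requeue_later(main_q, msg, delay))
                pending.add(task)                        # keep a reference until done
                task.add_done_callback(pending.discard)
        finally:
            main_q.task_done()


async def drain(main_q: asyncio.Queue, pending: set) -> None:
    """Wait until the queue is idle and no delayed requeue is outstanding."""
    while True:
        await main_q.join()
        if not pending:
            return
        await asyncio.gather(*pending)


async def demo() -> list:
    main_q: asyncio.Queue = asyncio.Queue(maxsize=10)
    dlq: asyncio.Queue = asyncio.Queue()
    pending: set = set()
    workers = [asyncio.create_task(worker(main_q, dlq, pending)) for _ in range(2)]
    for payload in ["ok-1", "poison-1", "ok-2"]:
        await main_q.put(Message(payload))
    await drain(main_q, pending)
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return [dlq.get_nowait() for _ in range(dlq.qsize())]
```

The trade-off is one extra task per in-flight retry. A plain await main_q.join() is no longer a sufficient shutdown signal with this design, which is why the sketch pairs it with the pending set.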

Frequently Asked Questions

What is a dead-letter queue and when do I need one?

A dead-letter queue is a separate queue that holds messages which failed processing repeatedly. You need one whenever a single poison message must not block the main queue (head-of-line blocking) or be lost: after N failed attempts the message is routed to the DLQ for inspection or replay instead of being retried forever or dropped.

How do I track retry attempts per message in asyncio?

Store an attempts counter on the message object itself, typically a dataclass field. Increment it on each failure and requeue the same object so the count accumulates across attempts. The worker compares attempts against a max-attempts cap to decide whether to retry with backoff or route to the dead-letter queue.

Why does my queue.join() hang when using a dead-letter queue?

join() blocks until task_done() is called once per item taken from the main queue. If the dead-letter routing path returns without calling task_done() for the main-queue slot, the unfinished counter never reaches zero and join() hangs. Keep task_done() in a finally so the success, retry, and dead-letter branches all reach it exactly once.

Should the dead-letter queue be in memory or persisted?

An in-memory asyncio.Queue DLQ is fine for buffering within a process, but it dies on restart, losing every dead letter. For anything that must survive a crash, persist dead letters to a durable store such as a database table, a Redis Stream, or a broker dead-letter exchange, and bound and alert on the DLQ depth.
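Bounding and alerting can be as small as the sketch below. The dead_letter helper, the alert_depth threshold, and the choice to return False when the DLQ is full are assumptions for illustration; what to do with a message that cannot be dead-lettered (block, spill to disk, page someone) is a policy decision this example leaves to the caller.

```python
import asyncio
import logging

log = logging.getLogger("dlq")


def dead_letter(dlq: asyncio.Queue, msg: object, alert_depth: int) -> bool:
    """Try to dead-letter msg without blocking; return False if the DLQ is full."""
    try:
        dlq.put_nowait(msg)
    except asyncio.QueueFull:
        log.error("DLQ full (maxsize=%d); could not dead-letter %r", dlq.maxsize, msg)
        return False
    depth = dlq.qsize()
    if depth >= alert_depth:
        # Stand-in for a real alert (metric, pager); here it is only a log line.
        log.warning("DLQ depth %d reached alert threshold %d", depth, alert_depth)
    return True


async def demo() -> tuple[list, int]:
    dlq: asyncio.Queue = asyncio.Queue(maxsize=2)
    results = [dead_letter(dlq, payload, alert_depth=2) for payload in ("a", "b", "c")]
    return results, dlq.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Using put_nowait() rather than await put() means a full DLQ never stalls the worker, at the cost of forcing an explicit decision about the overflow.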