Implementing a Priority Queue with asyncio.Queue
You have a single stream of async work, but not all of it is equally urgent: a user-facing request should jump ahead of a batch report, and an expiring job should beat a fresh one. A plain asyncio.Queue is strictly FIFO, so urgent items wait behind whatever arrived first. The symptom that brings teams here is concrete: p99 latency on high-priority work tracks total queue depth instead of staying flat, because the queue has no notion of priority. The standard library ships asyncio.PriorityQueue, but production use needs more than bare heap ordering: deterministic tie-breaking for equal priorities, backpressure that survives the customization, and a way to detect when low-priority items are being starved. This guide builds and instruments a heap-backed priority queue that provides all three.
Prerequisites
- Python 3.11+. The examples use asyncio.TaskGroup and the standard heapq/itertools modules; no third-party dependencies.
- A running event loop. All snippets run under asyncio.run(); the queue is bound to that loop.
- Familiarity with the basics. This is a detail page under Async Queue Management; skim that overview for put/get/task_done semantics and how a bounded queue applies backpressure. For the broader producer/consumer model, see Concurrent Execution & Worker Patterns.
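The short version: asyncio.PriorityQueue works for simple cases, but the moment two items share a priority, Python falls through to comparing their payloads, which fails for unorderable payloads (dicts, most custom objects) and silently decides the order for orderable ones. A (priority, counter, payload) shape ensures Python never compares the payloads. The steps below build that, then make it observable.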
1. Subclass asyncio.Queue with a heap backing
asyncio.Queue exposes three override points — _init, _put, _get — that control the internal container without touching its async machinery (the getter/putter futures, maxsize enforcement, and task_done() all keep working). Swap the default collections.deque for a list managed by heapq, and you get priority ordering while inheriting backpressure and the completion barrier for free.
Verify: enqueue (5, "a"), (1, "b"), (3, "c") and pop three times — you must get "b", "c", "a". Because _get returns only the payload, callers never see the priority/counter machinery. The inherited qsize(), maxsize, and task_done() work unchanged.
2. Break ties with a monotonic counter
The itertools.count() in step 1 is not optional — it is what stops Python from ever comparing two payloads. A min-heap compares tuples field by field; with (priority, payload), two items of equal priority force a comparison of the payloads, which raises TypeError for unorderable types (dicts, custom objects) and silently reorders for others. The counter sits between priority and payload, is always unique, and guarantees insertion order within a priority tier (FIFO-within-priority).
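A sketch of the tie-breaking test, assuming the step 1 design (put() takes (priority, payload); the class name PriorityAsyncQueue is hypothetical). The class is repeated so the snippet runs on its own, and the hypothetical helper without_counter() reproduces the failure with a bare heapq list of (priority, payload) tuples.

```python
import asyncio
import heapq
import itertools


class PriorityAsyncQueue(asyncio.Queue):
    # Heap-backed subclass as described in step 1 (hypothetical name).
    def _init(self, maxsize):
        self._queue = []
        self._counter = itertools.count()

    def _put(self, item):
        priority, payload = item
        heapq.heappush(self._queue, (priority, next(self._counter), payload))

    def _get(self):
        return heapq.heappop(self._queue)[2]


async def main():
    q = PriorityAsyncQueue()
    # Three dict payloads at the same priority; dicts define no '<'.
    for name in ["first", "second", "third"]:
        await q.put((1, {"name": name}))
    return [(await q.get())["name"] for _ in range(3)]


def without_counter():
    # The naive (priority, payload) shape: the second push compares the dicts.
    heap = []
    heapq.heappush(heap, (1, {"name": "first"}))
    try:
        heapq.heappush(heap, (1, {"name": "second"}))
    except TypeError as exc:
        return str(exc)
    return "no error"


print(asyncio.run(main()))  # ['first', 'second', 'third']
print(without_counter())    # '<' not supported between instances of 'dict' and 'dict'
```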
Verify: the output is ['first', 'second', 'third']. Remove the counter (revert _put/_get to (priority, payload)) and the same test raises TypeError: '<' not supported between instances of 'dict' and 'dict' the moment two equal priorities collide — proving the counter is load-bearing, not decorative.
3. Enforce backpressure with maxsize
Because you subclassed asyncio.Queue rather than reimplementing it, maxsize already works: await put() suspends the producer when the heap is full, applying backpressure to the source exactly as a FIFO bounded queue does. Confirm it rather than assume it.
Verify: you see full: True, then put correctly blocked on full queue, then size after drain+put: 2. The asyncio.timeout(0.1) proves put() is genuinely suspending on the full heap, not silently growing it. For the deeper treatment of sizing and propagating that backpressure to the ingest source, see Bounded asyncio.Queue with backpressure under load.
4. Drive it with a worker pool
A priority queue is only useful with consumers that respect it. Run a fan-out of workers under a TaskGroup, each pulling the most urgent item and marking completion so join() can drain the queue at shutdown.
Verify: processed: 20. Because workers always pop the heap root, the priority-1 jobs are picked up ahead of priority-9 jobs that arrived earlier — instrument done with timestamps and the urgent items' completion times cluster early regardless of insertion order.
5. Detect starvation with a watchdog
A priority queue's failure mode is the mirror of FIFO's: under steady high-priority inflow, low-priority items can wait forever. Make it observable by stamping each item with an enqueue time and sampling the oldest waiting item's age.
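A sketch of the watchdog under stated assumptions: StampedPriorityQueue is a hypothetical variant of the step 1 subclass whose entries are (priority, counter, enqueued_at, payload), and the SLA, sampling interval, and traffic rates are illustrative. The watchdog scans every entry rather than reading q._queue[0], because the heap root is the most urgent item, not the longest-waiting one. The demo sets up the scenario from the verification: two priority-1 arrivals per consumer pop, plus one priority-9 item that never reaches the root.

```python
import asyncio
import heapq
import itertools
import logging
import time

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("pq.watchdog")


class StampedPriorityQueue(asyncio.Queue):
    """Hypothetical heap-backed queue whose entries carry an enqueue timestamp."""

    def _init(self, maxsize):
        self._queue = []
        self._counter = itertools.count()

    def _put(self, item):
        priority, payload = item
        # The counter is unique, so comparisons never reach the timestamp or payload.
        entry = (priority, next(self._counter), time.monotonic(), payload)
        heapq.heappush(self._queue, entry)

    def _get(self):
        return heapq.heappop(self._queue)[3]


async def watchdog(q, sla, interval, alerts):
    while True:
        await asyncio.sleep(interval)
        if q.empty():
            continue
        # Scan every entry: the root is the most urgent item, while a starved
        # item sits deeper in the heap. Read-only, O(n) per sample.
        oldest = min(entry[2] for entry in q._queue)
        age = time.monotonic() - oldest
        if age > sla:
            log.warning("starvation: oldest item waited %.3fs (SLA %.3fs)", age, sla)
            alerts.append(age)


async def main(run_for=0.2):
    q = StampedPriorityQueue()
    alerts, consumed = [], []
    q.put_nowait((9, "batch-report"))
    for i in range(3):
        q.put_nowait((1, f"urgent-{i}"))

    async def producer():
        n = 3
        while True:
            # Two urgent arrivals per tick vs. one consumer pop per tick:
            # the priority-1 backlog never drains, so priority 9 starves.
            for _ in range(2):
                q.put_nowait((1, f"urgent-{n}"))
                n += 1
            await asyncio.sleep(0.01)

    async def consumer():
        while True:
            consumed.append(await q.get())
            q.task_done()
            await asyncio.sleep(0.01)

    tasks = [
        asyncio.create_task(producer()),
        asyncio.create_task(consumer()),
        asyncio.create_task(watchdog(q, sla=0.05, interval=0.02, alerts=alerts)),
    ]
    await asyncio.sleep(run_for)
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return alerts, consumed


alerts, consumed = asyncio.run(main())
print("alerts:", len(alerts), "batch-report served:", "batch-report" in consumed)
```

In production the log.warning call would typically feed a gauge or histogram instead; the scan is O(n) per sample, which is why it runs on a timer rather than on every put().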
Verify: feed the queue a steady stream of priority-1 items plus a single priority-9 item, and the watchdog logs a starvation warning once the low-priority item's wait exceeds the SLA. The fix is an aging mechanism: periodically rewrite long-waiting items with a boosted priority (a heapify after decrementing their priority key), trading strict ordering for bounded wait time.
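A sketch of the aging pass, assuming the stamped entry layout (priority, counter, enqueued_at, payload); age_entries, its boost/floor parameters, and the 30 ms threshold are hypothetical. It rewrites qualifying entries in place and calls heapq.heapify in the same synchronous step, with no await in between, so no other coroutine observes a half-rewritten heap.

```python
import asyncio
import heapq
import itertools
import time


class StampedPriorityQueue(asyncio.Queue):
    """Hypothetical heap-backed queue storing (priority, counter, enqueued_at, payload)."""

    def _init(self, maxsize):
        self._queue = []
        self._counter = itertools.count()

    def _put(self, item):
        priority, payload = item
        heapq.heappush(self._queue, (priority, next(self._counter), time.monotonic(), payload))

    def _get(self):
        return heapq.heappop(self._queue)[3]


def age_entries(q, max_wait, boost=1, floor=1):
    """Hypothetical aging pass: lower the priority key of entries waiting longer
    than max_wait by `boost`, never below `floor` (the most urgent tier)."""
    now = time.monotonic()
    changed = False
    for idx, (priority, count, enqueued_at, payload) in enumerate(q._queue):
        if now - enqueued_at > max_wait and priority > floor:
            # Keep the original counter: an aged item ties with newer items in
            # its new tier and, being older, dequeues ahead of them.
            q._queue[idx] = (max(floor, priority - boost), count, enqueued_at, payload)
            changed = True
    if changed:
        heapq.heapify(q._queue)  # restore the heap invariant after the rewrite
    return changed


async def main():
    q = StampedPriorityQueue()
    q.put_nowait((9, "batch-report"))
    await asyncio.sleep(0.1)  # batch-report has now waited about 100 ms
    q.put_nowait((1, "urgent-0"))
    age_entries(q, max_wait=0.03, boost=8)  # only batch-report is past 30 ms
    return [q.get_nowait(), q.get_nowait()]


print(asyncio.run(main()))  # ['batch-report', 'urgent-0']
```

Calling age_entries from the watchdog loop bounds the wait: once an item crosses max_wait it gains boost levels on every pass until it reaches the top tier, where its older counter wins ties.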
Verification
A correct, production-ready priority queue satisfies all of the following:
- Ordering: items dequeue by ascending priority; equal priorities dequeue in insertion order (FIFO-within-tier). The step 1 and step 2 tests pass deterministically.
- No payload comparison: enqueuing unorderable payloads with equal priority never raises TypeError; the counter absorbs every tie.
- Backpressure intact: await put() suspends on a full queue (maxsize), confirmed by the asyncio.timeout test in step 3.
- Completion barrier intact: await q.join() returns only after exactly one task_done() per item; the worker-pool test drains cleanly.
- Starvation visible: the watchdog emits a metric when the longest-waiting item's age crosses the SLA, giving you a signal to enable aging.
Pitfalls & edge cases
- Comparing payloads. Storing (priority, payload) works until two equal priorities collide on an unorderable payload; then it raises TypeError at runtime, under load, the worst time to find it. Always interpose a monotonic counter, or use a dataclass(order=True) with field(compare=False) on the payload field (plus a sequence field if you need FIFO-within-tier).
- Heavy __lt__ or comparison logic. Every push or pop performs O(log n) comparisons, all on the event loop thread. Blocking I/O or expensive computation inside a comparison stalls the loop; precompute the priority as a plain integer before enqueuing.
- Reaching into _queue for anything but inspection. Reading q._queue for a watchdog is fine, but scan every entry: q._queue[0] is the most urgent item, not the longest-waiting one. Mutating entries without restoring the heap invariant corrupts ordering. To change priorities (aging), pop, rewrite, and re-push under the queue's own coordination, or rewrite entries in place and call heapq.heapify before the next await.
- Forgetting task_done(). Inherited from asyncio.Queue: every get() must be matched by exactly one task_done(), or join() hangs at shutdown. Keep it in a finally.
- Unbounded growth. PriorityQueue() with no maxsize is unbounded, so a fast producer balloons the heap. Set maxsize so put() applies backpressure; see the bounded-queue guide for sizing.
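The dataclass alternative from the first pitfall can be sketched with the PrioritizedItem pattern from the heapq documentation and the stock asyncio.PriorityQueue. The seq field is an addition here: with field(compare=False) on the payload alone, equal-priority items compare equal and the heap makes no promise about their relative order.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    seq: int                             # tie-breaker: insertion order within a tier
    payload: Any = field(compare=False)  # excluded from comparisons, never compared


async def main():
    q = asyncio.PriorityQueue()
    seq = itertools.count()
    for priority, name in [(2, "b1"), (1, "a1"), (2, "b2"), (1, "a2")]:
        # dict payloads are unorderable, but comparison stops at (priority, seq)
        await q.put(PrioritizedItem(priority, next(seq), {"name": name}))
    out = []
    while not q.empty():
        item = await q.get()
        out.append(item.payload["name"])
        q.task_done()
    return out


print(asyncio.run(main()))  # ['a1', 'a2', 'b1', 'b2']
```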
Frequently Asked Questions
Why not just use asyncio.PriorityQueue from the standard library?
asyncio.PriorityQueue works for simple (priority, payload) cases but compares payloads on equal priorities, which raises TypeError for unorderable payloads and silently reorders others. Subclassing to insert a monotonic counter as (priority, counter, payload) gives deterministic FIFO-within-priority ordering and avoids comparing payloads at all.
How do I handle equal-priority tasks fairly?
Insert a monotonic counter from itertools.count() between the priority and the payload. Python's tuple comparison then resolves equal priorities by the counter, processing items in insertion order (FIFO within the same priority tier) and never touching the payload.
How do I prevent low-priority starvation?
Stamp each item with an enqueue time and run a watchdog that samples the longest-waiting item's age, emitting a metric when it crosses the SLA. The sample must scan the whole heap: the root is the most urgent item, so a starved low-priority item never sits there. Fix persistent starvation with an aging mechanism that periodically boosts the priority of long-waiting items, trading strict ordering for a bounded maximum wait.
Does the priority queue still support maxsize backpressure?
Yes. Because it subclasses asyncio.Queue rather than reimplementing it, maxsize, await put() backpressure, qsize(), and the task_done()/join() barrier all work unchanged. Only the internal ordering container is swapped for a heap.
Related
- Async Queue Management: the parent overview, covering put/get/join semantics and the full pattern catalogue.
- Bounded asyncio.Queue with backpressure under load: sizing maxsize and pushing backpressure to the ingest source.
- Worker Pool Implementations: sizing the consumer pool that drains a prioritized queue.