How to Safely Share State Between Async Tasks and Threads¶
You have an asyncio service that also runs OS threads — a ThreadPoolExecutor for blocking I/O, a vendor SDK with its own callback thread, a to_thread worker — and they share state: a cache, a counter, a config dict. The symptoms when this goes wrong are nasty precisely because they are intermittent: a dictionary that is occasionally missing a key, a counter that is off by a few under load, an event loop that hangs for no obvious reason. The root cause is almost always the same: asyncio primitives are cooperative and loop-affine, OS threads are preemptive, and the two synchronization worlds do not mix.
This guide is a diagnostic-to-implementation path: prove where the boundary is, build a thread-safe bridge so the runtimes never touch the same object directly, pair locks correctly so neither side deadlocks the other, and validate the result under deliberately hostile concurrency.
Prerequisites¶
- Python 3.11+ (for asyncio.timeout() and current to_thread semantics).
- Familiarity with the boundary rules in the parent Hybrid Concurrency Models guide and the broader Concurrent Execution & Worker Patterns overview.
- An understanding that asyncio.Lock, asyncio.Queue, and asyncio.Event are not thread-safe (see Asyncio Synchronization Primitives).
1. Diagnose where the boundary actually is¶
Cross-runtime races show up as sporadic deadlocks, silently corrupted dictionaries, or an unresponsive loop blocked waiting on a thread-held resource. Before adding synchronization, find the exact point where the loop hands work to a thread and where shared state is mutated. Widen the race window by shrinking the thread switch interval with sys.setswitchinterval(), and use faulthandler to dump the traceback of every thread when the process hangs or crashes.
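A minimal sketch of that amplification step follows. The helper names (amplified_switching, arm_hang_watchdog, racy_total) are hypothetical and not part of any library; only sys.setswitchinterval and faulthandler.dump_traceback_later are standard-library calls. The function call inside the update is there on the assumption that, on current CPython builds, a call is one of the points where a thread switch can happen.

```python
import contextlib
import faulthandler
import sys
import threading


@contextlib.contextmanager
def amplified_switching(interval: float = 1e-6):
    """Temporarily shrink the switch interval so threads interleave more often."""
    previous = sys.getswitchinterval()
    sys.setswitchinterval(interval)
    try:
        yield
    finally:
        sys.setswitchinterval(previous)


def arm_hang_watchdog(seconds: float = 10.0) -> None:
    """Dump every thread's traceback to stderr if still running after `seconds`."""
    faulthandler.dump_traceback_later(seconds, repeat=True)


def _bump(value: int) -> int:
    # Hypothetical helper: the call widens the gap between the read and the write.
    return value + 1


def racy_total(threads: int = 4, per_thread: int = 50_000) -> int:
    """Run an unsynchronized read-modify-write from several threads."""
    counter = {"n": 0}

    def work() -> None:
        for _ in range(per_thread):
            counter["n"] = _bump(counter["n"])  # read, call, write: not atomic

    workers = [threading.Thread(target=work) for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return counter["n"]
```

Calling racy_total() inside `with amplified_switching():` and comparing the result with threads * per_thread shows whether updates were lost. The count can only come out equal or lower, and how often it comes out lower depends on the interpreter version and the machine.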
Verify: run with python -X faulthandler your_app.py. With sys.setswitchinterval(1e-6), an unsynchronized compound mutation that was "usually fine" fails far more often; being able to reproduce the failure on demand is the signal that you have located the real boundary.
Diagnostic Hook: Log threading.current_thread().name and the on-loop flag at every mutation site. A mutation that logs on_loop=False for an object you assumed was loop-only is your bug.
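One way to produce that log line is sketched below. The names on_loop and log_mutation and the logger name "boundary" are assumptions made for illustration. The flag here only says whether the calling thread is currently running an event loop, which is enough for a single-loop service but does not tell you which loop it is.

```python
import asyncio
import logging
import threading

log = logging.getLogger("boundary")  # hypothetical logger name


def on_loop() -> bool:
    """Return True when the calling thread is currently running an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def log_mutation(site: str) -> dict:
    """Record which thread touched shared state and whether it was on a loop."""
    record = {
        "site": site,
        "thread": threading.current_thread().name,
        "on_loop": on_loop(),
    }
    log.debug(
        "mutation site=%s thread=%s on_loop=%s",
        record["site"], record["thread"], record["on_loop"],
    )
    return record
```

Call log_mutation("cache.set") as the first statement of every function that writes to the shared object, then search the log for on_loop=False entries.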
2. Architect a thread-safe queue bridge¶
The robust fix is to stop sharing the object at all. Decouple the runtimes with two queues: a queue.Queue (thread-safe, blocking) on the thread side and an asyncio.Queue (loop-aware) on the async side, joined by a loop-side drain task. The thread never touches an asyncio object; the loop never blocks on a queue.Queue.get() directly.
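The bridge described above could look roughly like this. It is a sketch under assumptions, not a reference implementation: the class name QueueBridge, the STOP sentinel, and the start and close_from_thread methods are hypothetical, while submit_from_thread, consume, and _async_q follow the names used in this guide.

```python
import asyncio
import queue
from typing import Any

STOP = object()  # poison pill: tells the drain task and the consumer to finish


class QueueBridge:
    def __init__(self, maxsize: int = 100) -> None:
        self._thread_q: queue.Queue = queue.Queue(maxsize=maxsize)
        self._async_q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._drain_task: asyncio.Task | None = None

    def start(self) -> None:
        """Start the drain task. Call from a coroutine on the owning loop."""
        self._drain_task = asyncio.get_running_loop().create_task(self._drain())

    def submit_from_thread(self, item: Any) -> None:
        """Thread side: blocks the calling worker while the queue is full."""
        self._thread_q.put(item)

    def close_from_thread(self) -> None:
        """Thread side: enqueue the poison pill after the last real item."""
        self._thread_q.put(STOP)

    async def _drain(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            # The blocking get() runs in the default executor, not on the loop thread.
            item = await loop.run_in_executor(None, self._thread_q.get)
            await self._async_q.put(item)  # waits when full, which is the backpressure
            if item is STOP:
                return

    async def consume(self) -> Any:
        """Loop side: await the next item; STOP means no more will arrive."""
        return await self._async_q.get()
```

A consumer loops on `item = await bridge.consume()` and breaks when `item is STOP`. Sending the sentinel matters in this design because the executor thread parked in get() only returns when something arrives.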
Verify: push items from several threads with submit_from_thread while a coroutine loops on consume(). Item count in equals item count out, with zero direct cross-runtime access. Watch self._async_q.qsize() climb toward maxsize under burst load — that bounded growth is correct backpressure, not a leak.
Diagnostic Hook: Track both queue.Queue.qsize() and asyncio.Queue.qsize(). A full thread queue with an empty async queue means the drain task is starved or stalled; check that it is actually scheduled.
3. Pair locks correctly and prevent deadlock¶
When copying through a queue is too expensive and you must share mutable state in place, use a threading.Lock for cross-thread access and an asyncio.Lock only within the loop, and never confuse them. Acquiring a contended threading.Lock blocks the calling thread, so if a coroutine holds one across an await, any other code on the loop thread that tries to take it blocks the loop, and the holder can never resume to release it. asyncio.Lock raises or misbehaves if acquired off its owning loop. Bound async acquisition with a timeout so loop starvation surfaces as an error instead of a silent hang.
Verify: the locked region in update_from_thread contains no await (it cannot — it is a sync method), and the async read always releases. Under contention the read either returns a consistent snapshot or raises the timeout RuntimeError; it never hangs the loop indefinitely.
Diagnostic Hook: Wrap asyncio.Lock.acquire() in asyncio.timeout(). A fired timeout is a precise, actionable signal of loop starvation, far better than an unbounded hang you have to attach a debugger to.
4. Validate under hostile concurrency¶
A bug that appears once per million operations will not show up in a casual test. Force interleaving: many threads writing while many coroutines read, with the switch interval cranked down, and assert a structural invariant (final count, checksum, version monotonicity) at the end.
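A stress test along those lines might look like the sketch below. The Counter class, the _hammer helper, and the chosen thread and iteration counts are all illustrative assumptions; the test is written as a plain synchronous function so it needs no async pytest plugin.

```python
import asyncio
import sys
import threading


class Counter:
    """Hypothetical shared object: value and version must always move together."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.value = 0
        self.version = 0

    def increment(self) -> None:
        with self._lock:
            self.value += 1
            self.version += 1

    def snapshot(self) -> tuple[int, int]:
        with self._lock:
            return self.value, self.version


async def _hammer(counter: Counter, writers: int, per_writer: int, readers: int) -> None:
    done = asyncio.Event()  # only ever touched on the loop thread

    def write() -> None:
        for _ in range(per_writer):
            counter.increment()

    async def read() -> None:
        last = 0
        while not done.is_set():
            value, version = counter.snapshot()
            assert value == version  # structural invariant
            assert version >= last   # version monotonicity
            last = version
            await asyncio.sleep(0)   # yield so other coroutines interleave

    reader_tasks = [asyncio.create_task(read()) for _ in range(readers)]
    await asyncio.gather(*(asyncio.to_thread(write) for _ in range(writers)))
    done.set()
    await asyncio.gather(*reader_tasks)


def test_counter_survives_hostile_interleaving() -> None:
    previous = sys.getswitchinterval()
    sys.setswitchinterval(1e-6)  # force frequent thread switches
    try:
        counter = Counter()
        asyncio.run(_hammer(counter, writers=4, per_writer=1000, readers=8))
        assert counter.snapshot() == (4000, 4000)  # final-count invariant
    finally:
        sys.setswitchinterval(previous)
```

Removing the lock from increment is a useful sanity check: if the test still passes every time, the load is probably not hostile enough to be trusted.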
Verify: run pytest -p no:cacheprovider. The final assertion must hold on every run; an intermittent failure here is not a flaky test but a real race you have not yet closed. Re-run with sys.setswitchinterval(1e-6) set in a fixture to multiply the pressure.
Diagnostic Hook: Run the suite repeatedly (pytest --count=50 with pytest-repeat) and treat any single failure as a hard fail. Cross-runtime races are probabilistic; one failure in fifty means the bug is present, not rare.
Verification¶
A correctly bridged service shows four things: the event loop never logs slow-callback warnings (emitted in asyncio debug mode) attributable to lock waits; the integrity test passes on every one of dozens of runs; queue sizes rise and fall with load but do not grow unbounded; and no RuntimeError about a wrong event loop ever appears. If all four hold under your switch-interval-amplified load test, you have strong evidence that the boundary is race-free.
Pitfalls & Edge Cases¶
- Holding threading.Lock across an await. The lock stays held for as long as the coroutine is suspended, so worker threads wait for the whole await, and any other coroutine that tries to acquire it blocks the loop thread and starves every task. Keep locked sections synchronous, or copy data out under the lock and process it after release.
- Using asyncio.Lock (or asyncio.Queue/asyncio.Event) from a worker thread. These are loop-affine and not thread-safe; off-loop use races or raises RuntimeError. Use threading.Lock/queue.Queue on the thread side and bridge with call_soon_threadsafe.
- Assuming the GIL makes compound operations atomic. The GIL protects a single bytecode op, not a read-modify-write sequence (d[k] = d[k] + 1, lst.append then lst.pop). Multi-step mutations need an explicit lock.
- Swallowing queue.Empty/asyncio.QueueEmpty. Ignoring these leads to silent item loss or infinite blocking. Handle them explicitly or use the blocking get() off the loop via run_in_executor.
- Forgetting cancellation cleanup. If the drain task is cancelled, in-flight items in the queue.Queue are stranded. Drain or re-enqueue on shutdown, and always send the poison pill.
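The "copy out under the lock, process after release" advice from the first pitfall can be sketched like this. The function name flush_pending and its parameters are hypothetical; the point is only that the with block contains no await.

```python
import asyncio
import threading
from typing import Awaitable, Callable


async def flush_pending(
    lock: threading.Lock,
    pending: list,
    send: Callable[[object], Awaitable[None]],
) -> int:
    """Take a batch under the lock, then do the slow awaiting work without it."""
    with lock:  # short and synchronous: nothing is awaited in here
        batch = pending[:]
        pending.clear()
    for item in batch:  # the lock is already released at this point
        await send(item)
    return len(batch)
```

Worker threads can keep appending to pending under the same lock while send is awaited, because the coroutine only holds the lock for the duration of the copy.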
Frequently Asked Questions¶
Can I use a single lock for both asyncio tasks and threads?
No. asyncio.Lock is not thread-safe and is tied to a single event loop; using it from another thread races or raises RuntimeError. Use threading.Lock for cross-thread access and asyncio.Lock for intra-loop coordination, or decouple the runtimes entirely with a thread-safe queue bridge.
How do I prevent deadlocks when sharing state between async and threaded workers?
Never hold a threading.Lock across an await boundary: any other code on the loop thread that then tries to take it blocks the whole event loop, and the holder can never resume to release it. Keep locked regions synchronous and short, schedule state updates back onto the loop with loop.call_soon_threadsafe, and bound asyncio.Lock acquisition with asyncio.timeout() so starvation raises an error instead of hanging.
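As an illustration of scheduling updates back onto the loop, the sketch below keeps a dictionary that is only ever mutated on the loop thread, so it needs no lock at all. The class name LoopOwnedCache and its methods are hypothetical; loop.call_soon_threadsafe is the standard asyncio call.

```python
import asyncio
from typing import Any


class LoopOwnedCache:
    """A dict that is read and written only on the event loop thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._data: dict[str, Any] = {}

    def set_from_thread(self, key: str, value: Any) -> None:
        """Safe from any thread: the write itself runs later, on the loop."""
        self._loop.call_soon_threadsafe(self._data.__setitem__, key, value)

    def get(self, key: str, default: Any = None) -> Any:
        """Loop thread only: no lock needed because all writes happen here too."""
        return self._data.get(key, default)
```

The trade-off is that a write is not visible until the loop runs the scheduled callback, so a thread cannot read its own write back immediately.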
Is queue.Queue safe to use directly with asyncio?
It is thread-safe but its get() blocks, which would stall the event loop if called on the loop thread. Drain it from the loop via loop.run_in_executor(None, q.get) into an asyncio.Queue, or use a dedicated bridge task. Async consumers then await the asyncio.Queue normally.
Related¶
- Hybrid Concurrency Models: the parent guide covering the full async/thread boundary and its safe bridges.
- Concurrent Execution & Worker Patterns: the broader overview for choosing between threads, processes, and async.
- Asyncio Synchronization Primitives: why asyncio.Lock and asyncio.Event cannot be shared with threads.