
Hybrid Concurrency Models

A pure asyncio service is a single OS thread running a single event loop. The moment that service has to call something blocking — a synchronous database driver, a CPU-heavy parser, a third-party SDK with no async API — it faces a boundary problem. Run the blocking call inline and the loop freezes; every other coroutine stalls until the call returns. Run it on another thread or process and you have solved the freeze but created a new hazard: asyncio primitives are not thread-safe, and the loop's data structures must only ever be touched from the loop's own thread.

This guide is about that boundary. It covers how to push blocking work off the loop (asyncio.to_thread, loop.run_in_executor), how to hand results back across the boundary correctly (loop.call_soon_threadsafe, asyncio.run_coroutine_threadsafe), and how to share mutable state across the async/thread divide without data races. The unifying constraint is simple to state and easy to violate: there is exactly one loop thread, and only that thread may touch the loop or the objects it owns, except through the few entry points documented as thread-safe.

Architectural Principles

  • One thread owns the loop. Every asyncio object — tasks, futures, Lock, Queue, Event — is affine to the loop that created it and must be mutated only from that loop's thread. A worker thread that calls queue.put_nowait() on an asyncio.Queue, or sets an asyncio.Event, is a race even if it "works" most of the time.
  • Cross the boundary with the three thread-safe entry points only. From a worker thread back into the loop you have exactly three safe doors: loop.call_soon_threadsafe(callback, *args) to run a plain callback on the loop; asyncio.run_coroutine_threadsafe(coro, loop) to schedule a coroutine and get a concurrent.futures.Future back; and a thread-safe queue (a stdlib queue.Queue drained by a loop-side task, or the janus library) for streams of items. Nothing else is safe.
  • Blocking goes out, never down. Blocking I/O and CPU work belong in a thread or process pool, reached via asyncio.to_thread (I/O) or loop.run_in_executor with a ProcessPoolExecutor (CPU). The loop thread itself must never block.
  • Bound the threads. A thread pool is finite (Python's default executor caps at min(32, os.cpu_count()+4)). Unbounded to_thread fan-out queues behind that cap and the symptom — latency, not an error — is easy to miss. Gate fan-out with an asyncio.Semaphore.
  • Prefer message passing to shared memory. Handing immutable items across a queue boundary sidesteps almost every cross-runtime race. Reach for shared mutable state plus locks only when copying is genuinely too expensive.
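The first two principles can be made checkable. The sketch below assumes a hypothetical `LoopOwner` helper (not part of asyncio) that records the loop's thread when it is constructed inside a coroutine, reports whether the caller is on that thread, and offers a `post()` method that only ever reaches the loop through `call_soon_threadsafe`:

```python
import asyncio
import threading


class LoopOwner:
    """Hypothetical helper (not part of asyncio): remembers the loop's thread."""

    def __init__(self) -> None:
        # Construct from inside a coroutine, i.e. on the loop thread.
        self.loop = asyncio.get_running_loop()
        self.thread_id = threading.get_ident()

    def on_loop_thread(self) -> bool:
        return threading.get_ident() == self.thread_id

    def post(self, fn, *args) -> None:
        # Safe from any thread: marshal fn(*args) onto the loop thread.
        self.loop.call_soon_threadsafe(fn, *args)


async def demo() -> tuple[bool, bool, bool]:
    owner = LoopOwner()
    here = owner.on_loop_thread()                          # coroutine: loop thread
    there = await asyncio.to_thread(owner.on_loop_thread)  # pool thread
    done = asyncio.Event()
    # A worker must not call done.set() itself; post() hops onto the loop thread.
    await asyncio.to_thread(owner.post, done.set)
    await asyncio.wait_for(done.wait(), timeout=1)
    return here, there, done.is_set()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, False, True)
```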

Execution Model: The Loop Thread and Its Workers

The event loop runs in one thread and drives ready coroutines cooperatively: each runs until it hits an await that suspends, then yields control so the loop can run the next ready task. Nothing else runs in that thread. So when a coroutine calls a synchronous function that takes 200ms, the loop is gone for 200ms — no I/O is serviced, no timers fire, no other task advances. That is loop starvation, and it is the core failure this topic exists to prevent. The parent Concurrent Execution & Worker Patterns overview frames the broader choice between threads, processes, and async; this section is specifically about wiring them together under one loop.

The fix is to move the blocking call to a different thread (or process), where it can block harmlessly, and free the loop thread to keep servicing coroutines. asyncio.to_thread(fn, *args) does this: it submits fn to the loop's default ThreadPoolExecutor, returns an awaitable, and the awaiting coroutine suspends — so the loop runs other tasks while the worker thread blocks. When the worker finishes, the result is delivered back to the loop thread by the executor machinery, which uses loop.call_soon_threadsafe under the hood.
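A minimal sketch makes the difference measurable. A heartbeat coroutine ticks every 50 ms while another coroutine runs a 300 ms blocking call, first inline and then through `asyncio.to_thread`. The function names and timings are illustrative; `time.sleep` stands in for a synchronous driver.

```python
import asyncio
import time


def blocking_work() -> None:
    time.sleep(0.3)  # stands in for a synchronous driver / SDK call


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def measure(offload: bool) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    if offload:
        await asyncio.to_thread(blocking_work)  # loop keeps servicing the heartbeat
    else:
        blocking_work()                         # loop thread is stuck for 300 ms
    stop.set()
    await hb
    return len(ticks)


if __name__ == "__main__":
    print("inline ticks:", asyncio.run(measure(offload=False)))
    print("to_thread ticks:", asyncio.run(measure(offload=True)))
```

The inline run records a single tick, because the heartbeat never gets the loop back during the blocking call; the offloaded run records several.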

That last detail is the whole game. A worker thread cannot simply resolve the awaiting future itself — futures are loop-affine. It must schedule the resolution onto the loop thread. call_soon_threadsafe is the thread-safe primitive that does this: it appends a callback to the loop's ready queue and wakes the loop (via its self-pipe) so the callback runs on the loop thread on the next iteration. When you bridge results back by hand, you use the same door. The future machinery here is the same one described under Future Objects & Callbacks, and the loop-side coordination uses the synchronization primitives — which, crucially, are themselves not thread-safe.
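To see that `call_soon_threadsafe` really moves execution across threads, this sketch records which thread runs each piece: the work runs on a separate thread, but the callback that resolves the future runs on the loop thread. The function `where` and its nested helpers are illustrative names.

```python
import asyncio
import threading


async def where() -> dict[str, int]:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    seen: dict[str, int] = {"loop": threading.get_ident()}

    def resolve(value: int) -> None:
        # Runs on the loop thread, so touching the loop-affine future is safe.
        seen["callback"] = threading.get_ident()
        fut.set_result(value)

    def work() -> None:
        seen["worker"] = threading.get_ident()
        loop.call_soon_threadsafe(resolve, 42)  # hand the value back

    threading.Thread(target=work).start()
    await fut
    return seen


if __name__ == "__main__":
    ids = asyncio.run(where())
    print(ids["callback"] == ids["loop"], ids["worker"] == ids["loop"])  # True False
```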

Figure: The async/thread boundary and its safe bridges. One loop thread, many worker threads: the loop thread offloads blocking work (a sync SDK, sync I/O, CPU work) to a worker thread via to_thread / run_in_executor and keeps running other tasks while the worker blocks; results return only via call_soon_threadsafe or run_coroutine_threadsafe, and the callback that resolves the future runs on the loop thread, not the worker.

Pattern Catalogue

asyncio.to_thread for blocking calls

The default tool for "this function blocks and I can't make it async." await asyncio.to_thread(fn, *args, **kwargs) runs fn on the loop's default thread pool and suspends the calling coroutine until it returns. Use it for synchronous I/O — file reads, a blocking DB driver, a requests call, a vendor SDK. It is not for CPU-bound work: the GIL serializes Python bytecode, so threads give no parallelism for compute (route that to processes via CPU-bound task offloading).

import asyncio
import time

def blocking_fetch(record_id: int) -> dict:
    # Imagine a synchronous SDK / driver here.
    time.sleep(0.3)
    return {"id": record_id, "status": "ok"}

async def handler(record_id: int) -> dict:
    # Loop stays free for other coroutines while this thread blocks.
    return await asyncio.to_thread(blocking_fetch, record_id)

The trade-off: the default pool is shared and small. Heavy to_thread use across many call sites contends for the same threads. For a hot path, pass a dedicated executor to loop.run_in_executor instead, and see running blocking SDK calls with asyncio.to_thread for the full pattern with concurrency limits.
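A minimal sketch of a dedicated pool for one hot path, assuming a hypothetical blocking `lookup(key, *, timeout)` client call. Two differences from `asyncio.to_thread` are worth knowing: `loop.run_in_executor` forwards positional arguments only, so keyword arguments are bound with `functools.partial`, and it does not copy the caller's `contextvars` context into the worker the way `to_thread` does.

```python
import asyncio
import functools
import time
from concurrent.futures import ThreadPoolExecutor


def lookup(key: str, *, timeout: float = 1.0) -> str:
    # Hypothetical blocking client call; `timeout` is unused in this stub.
    time.sleep(0.05)
    return f"value-for-{key}"


async def hot_path(pool: ThreadPoolExecutor, key: str) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional args only; bind keywords with partial.
    call = functools.partial(lookup, key, timeout=0.5)
    return await loop.run_in_executor(pool, call)


async def main(n: int = 20) -> list[str]:
    # Sized for this call site alone; it does not compete with to_thread callers.
    pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="lookup")
    try:
        return await asyncio.gather(*(hot_path(pool, f"k{i}") for i in range(n)))
    finally:
        # shutdown(wait=True) blocks, so it is offloaded as well.
        await asyncio.to_thread(pool.shutdown, wait=True)


if __name__ == "__main__":
    print(asyncio.run(main())[:2])  # ['value-for-k0', 'value-for-k1']
```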

run_coroutine_threadsafe from a thread

The inverse direction: code running in a worker thread (or any non-loop thread) needs to invoke a coroutine on the loop. You cannot await from a synchronous thread, and you must not touch the loop directly. asyncio.run_coroutine_threadsafe(coro, loop) submits the coroutine to the loop thread-safely and returns a concurrent.futures.Future you can block on with .result(timeout=...).

import asyncio

async def persist(item: dict) -> int:
    await asyncio.sleep(0)        # real async DB write here
    return item["id"]

def worker(loop: asyncio.AbstractEventLoop, item: dict) -> int:
    # Called from a plain thread that is NOT the loop thread.
    fut = asyncio.run_coroutine_threadsafe(persist(item), loop)
    return fut.result(timeout=5)  # concurrent.futures.Future, blocks this thread

Use it when the thread legitimately needs the coroutine's result. The returned future is a concurrent.futures.Future, not an asyncio.Future — block on it with .result(), never await it.
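Two details matter in practice. Calling `.result()` on that future from the loop thread itself deadlocks, because the loop can never run the coroutine it is blocked waiting for. And a timeout on the thread side does not stop the coroutine unless you also cancel the future, which propagates the cancellation to the task on the loop. A minimal sketch, assuming a loop running in a background daemon thread and a hypothetical `call_on_loop` helper:

```python
import asyncio
import concurrent.futures
import threading


def start_background_loop() -> asyncio.AbstractEventLoop:
    # The loop lives in its own daemon thread; other threads submit work to it.
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def call_on_loop(loop: asyncio.AbstractEventLoop, coro, timeout: float):
    """Hypothetical helper: run coro on loop from a NON-loop thread, with a deadline."""
    fut = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return fut.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        fut.cancel()  # also cancels the Task running on the loop
        raise


async def slow_double(x: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return x * 2


if __name__ == "__main__":
    loop = start_background_loop()
    print(call_on_loop(loop, slow_double(21, 0.01), timeout=1))  # 42
    try:
        call_on_loop(loop, slow_double(1, 5), timeout=0.1)
    except concurrent.futures.TimeoutError:
        print("timed out; the coroutine was cancelled on the loop")
```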

call_soon_threadsafe to hand results back

When the worker thread only needs to deliver a value and expects nothing back, loop.call_soon_threadsafe(callback, *args) is lighter than scheduling a whole coroutine. It appends callback to the loop's ready queue and wakes the loop. The classic use is resolving an asyncio.Future from the thread that produced its result.

import asyncio
import threading

def run_in_thread(loop, fut: asyncio.Future):
    result = sum(range(1_000_000))           # work on the worker thread
    # Resolve the loop-affine future ON the loop thread:
    loop.call_soon_threadsafe(fut.set_result, result)

async def bridge() -> int:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    threading.Thread(target=run_in_thread, args=(loop, fut), daemon=True).start()
    return await fut                          # suspends; resolved from the thread

This is exactly the mechanism to_thread uses internally. Reach for it directly when integrating a callback-based C library or a third-party thread that calls you back.
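A minimal sketch of that integration, assuming a hypothetical `legacy_fetch(key, on_done, on_error)` client that does its work on its own thread and invokes one of the two callbacks from there. The adapter touches the future only through `call_soon_threadsafe`, forwards errors as exceptions so the awaiter never hangs, and skips resolution if the awaiting coroutine was cancelled in the meantime (calling `set_result` on a cancelled future raises `InvalidStateError`).

```python
import asyncio
import threading
from typing import Any, Callable, Optional


def legacy_fetch(key: str,
                 on_done: Callable[[str], None],
                 on_error: Callable[[BaseException], None]) -> None:
    """Hypothetical callback-style client: calls back from its own thread."""
    def run() -> None:
        if key:
            on_done(key.upper())
        else:
            on_error(ValueError("empty key"))
    threading.Thread(target=run, daemon=True).start()


def _settle(fut: asyncio.Future, value: Any = None,
            exc: Optional[BaseException] = None) -> None:
    # Executes on the loop thread. Skip if the awaiting coroutine gave up.
    if fut.done():
        return
    if exc is not None:
        fut.set_exception(exc)
    else:
        fut.set_result(value)


async def fetch(key: str) -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    legacy_fetch(
        key,
        on_done=lambda v: loop.call_soon_threadsafe(_settle, fut, v),
        on_error=lambda e: loop.call_soon_threadsafe(_settle, fut, None, e),
    )
    return await fut


if __name__ == "__main__":
    print(asyncio.run(fetch("abc")))  # ABC
```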

Thread-safe queue bridge

For sustained streaming between a thread (or pool) and the loop, a per-item future is clumsy; use a thread-safe queue. The producer thread puts onto a queue.Queue (thread-safe, blocking); a loop-side draining task moves items into an asyncio.Queue for async consumers. The bridge is what crosses the boundary safely — never let a thread call asyncio.Queue.put_nowait.

import asyncio
import queue

async def drain(thread_q: queue.Queue, async_q: asyncio.Queue) -> None:
    loop = asyncio.get_running_loop()
    while True:
        # get() blocks, so run it in a thread to avoid stalling the loop:
        item = await loop.run_in_executor(None, thread_q.get)
        if item is None:           # poison pill
            break
        await async_q.put(item)    # backpressure: awaits if async_q is full

The janus library packages this dual queue if you want it off the shelf; the manual version above is stdlib-only and shows the boundary explicitly.
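When the producer must never wait and volume is modest, a push-style variant avoids parking a pool thread in `get()`: the producer thread schedules `put_nowait` onto the loop with `call_soon_threadsafe`. The trade-off is that backpressure disappears, so the asyncio.Queue must be unbounded (a bounded one would raise `QueueFull` inside the loop callback). A minimal sketch; `produce`, `consume`, and the `_DONE` sentinel are illustrative names.

```python
import asyncio
import threading

_DONE = object()  # sentinel marking the end of the stream


def produce(loop: asyncio.AbstractEventLoop, q: asyncio.Queue, n: int) -> None:
    # Producer thread: never calls q.put_nowait directly, only via the loop.
    for i in range(n):
        loop.call_soon_threadsafe(q.put_nowait, i)
    loop.call_soon_threadsafe(q.put_nowait, _DONE)


async def consume(n: int) -> list[int]:
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()  # unbounded on purpose (no backpressure)
    threading.Thread(target=produce, args=(loop, q, n), daemon=True).start()
    items = []
    while (item := await q.get()) is not _DONE:
        items.append(item)
    return items


if __name__ == "__main__":
    print(asyncio.run(consume(5)))  # [0, 1, 2, 3, 4]
```

Callbacks scheduled from a single thread run in submission order, so items arrive in the order they were produced.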

Protecting shared state

When threads and coroutines genuinely share mutable state, you need a threading.Lock (not asyncio.Lock, which is loop-affine and not thread-safe). The hard rule: never hold a threading.Lock across an await. Acquiring it on the loop thread is itself a blocking call, so if one coroutine suspends while holding it and another coroutine then tries to acquire it, the loop thread blocks and the holder can never resume to release it: a deadlock. Keep the locked region synchronous and short, or copy data out under the lock and process it after release. The full treatment, including deadlock prevention and load-tested validation, is in how to safely share state between async tasks and threads.
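A minimal sketch of the copy-out discipline, assuming a hypothetical `Stats` object that worker threads update and coroutines read. Every locked region is plain synchronous code with no `await` inside, so the loop thread only ever waits for a few microseconds of bookkeeping.

```python
import asyncio
import threading


class Stats:
    """Hypothetical shared state, touched from pool threads and the loop."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: dict[str, int] = {}

    def record(self, key: str) -> None:
        with self._lock:  # short, synchronous region
            self._counts[key] = self._counts.get(key, 0) + 1

    def snapshot(self) -> dict[str, int]:
        with self._lock:
            return dict(self._counts)  # copy out under the lock


def worker(stats: Stats, key: str, n: int) -> None:
    for _ in range(n):
        stats.record(key)


async def main() -> dict[str, int]:
    stats = Stats()
    await asyncio.gather(*(asyncio.to_thread(worker, stats, k, 1000) for k in "abcd"))
    snap = stats.snapshot()  # lock already released when this returns
    await asyncio.sleep(0)   # safe: we hold a copy, not the lock
    return snap


if __name__ == "__main__":
    print(sorted(asyncio.run(main()).items()))
    # [('a', 1000), ('b', 1000), ('c', 1000), ('d', 1000)]
```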

Resource Boundaries

Threads are not free: each reserves its own stack (on Linux typically 8 MB of virtual address space per thread, most of it never resident) and the default executor caps at min(32, os.cpu_count() + 4) workers. Unbounded to_thread calls do not error when that cap is reached; they queue silently behind it. The symptom is latency creep under load, not an exception, which makes it pernicious. Bound fan-out explicitly with an asyncio.Semaphore sized to your real thread budget:

import asyncio

_gate = asyncio.Semaphore(16)   # never more than 16 blocking calls in flight

async def bounded_call(fn, *args):
    async with _gate:
        return await asyncio.to_thread(fn, *args)

For CPU-bound offload to a ProcessPoolExecutor, the limit is os.cpu_count(), not 32 — oversubscribing cores thrashes. And every item crossing a process boundary is pickled, so payload size and picklability are first-class constraints; see CPU-bound task offloading for IPC sizing.

Integrated Example

A small ingest service: an async process() coroutine (standing in for a request handler) accepts records, offloads a blocking SDK call to the default thread pool under a semaphore bound, runs a CPU-bound transform on a process pool, and a separate producer thread feeds work in from outside the loop. It demonstrates three crossings: asyncio.to_thread, loop.run_in_executor with a ProcessPoolExecutor, and asyncio.run_coroutine_threadsafe from a foreign thread.

import asyncio
import os
import queue
import threading
import time
from concurrent.futures import ProcessPoolExecutor

# --- blocking + CPU work (module-level so the process pool can pickle it) ---
def blocking_sdk_call(record_id: int) -> dict:
    time.sleep(0.2)                       # synchronous vendor SDK
    return {"id": record_id, "raw": record_id * 7}

def cpu_transform(payload: dict) -> dict:
    total = sum(i * i for i in range(200_000))   # CPU-bound
    return {"id": payload["id"], "score": payload["raw"] + (total % 97)}


class IngestService:
    def __init__(self, thread_limit: int = 16, proc_workers: int | None = None):
        self._gate = asyncio.Semaphore(thread_limit)
        self._procs = ProcessPoolExecutor(max_workers=proc_workers or os.cpu_count())
        self._results: asyncio.Queue[dict] = asyncio.Queue()

    async def process(self, record_id: int) -> None:
        loop = asyncio.get_running_loop()
        async with self._gate:                                   # bound thread fan-out
            raw = await asyncio.to_thread(blocking_sdk_call, record_id)
        scored = await loop.run_in_executor(self._procs, cpu_transform, raw)
        await self._results.put(scored)

    async def results(self) -> dict:
        return await self._results.get()

    def shutdown(self) -> None:
        self._procs.shutdown(wait=True, cancel_futures=True)


def producer_thread(loop: asyncio.AbstractEventLoop, svc: IngestService, n: int) -> None:
    """Runs OFF the loop thread; schedules coroutines back onto the loop safely."""
    # Submit everything first so records are processed concurrently
    # (bounded by the semaphore), then wait for each one.
    futs = [asyncio.run_coroutine_threadsafe(svc.process(i), loop) for i in range(n)]
    for fut in futs:
        fut.result(timeout=10)            # concurrent.futures.Future: block this thread


async def main() -> None:
    loop = asyncio.get_running_loop()
    svc = IngestService()
    n = 12
    # Feed work from a separate OS thread to prove the boundary works both ways.
    t = threading.Thread(target=producer_thread, args=(loop, svc, n), daemon=True)
    t.start()
    try:
        async with asyncio.timeout(30):
            for _ in range(n):
                item = await svc.results()
                print("scored", item)
    finally:
        await asyncio.to_thread(t.join)        # join the producer off the loop
        await asyncio.to_thread(svc.shutdown)  # pool shutdown blocks too; keep it off the loop


if __name__ == "__main__":
    asyncio.run(main())

Diagnostic Hook: Enable loop.set_debug(True) and watch for the "Executing … took N seconds" slow-callback warnings (threshold loop.slow_callback_duration, default 0.1s): any blocking call that slipped onto the loop thread surfaces here. In parallel, sample the default executor's saturation: a growing gap between to_thread submissions and completions, or rising end-to-end latency with flat CPU, means calls are queued behind the default executor's cap (min(32, os.cpu_count() + 4) workers) and you need a larger or dedicated pool.
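The slow-callback tripwire can be reproduced in a few lines: run under debug mode with a lowered threshold, capture the `asyncio` logger, and let a coroutine call `time.sleep` inline. The `Capture` handler and the 50 ms threshold are choices made for this sketch, not defaults.

```python
import asyncio
import logging
import time


class Capture(logging.Handler):
    """Collects asyncio log messages so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def offender() -> None:
    time.sleep(0.2)  # deliberate bug: blocks the loop thread


async def main() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.05
    await offender()


def slow_callback_warnings() -> list[str]:
    logger = logging.getLogger("asyncio")
    handler = Capture()
    logger.addHandler(handler)
    logger.setLevel(logging.WARNING)
    try:
        asyncio.run(main(), debug=True)  # debug mode times every callback
    finally:
        logger.removeHandler(handler)
    return [m for m in handler.messages if "took" in m]


if __name__ == "__main__":
    for line in slow_callback_warnings():
        print(line)  # one line per slow step, starting with "Executing"
```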

Diagnostic Hook: the boundary tripwires

Three signals catch nearly every hybrid bug. (1) Slow-callback warnings under loop.set_debug(True) mean blocking work is running on the loop thread: find it and offload it. (2) RuntimeError: ... attached to a different loop, a debug-mode RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one, or sporadic state corruption means an asyncio object was touched from the wrong thread or loop: route that access through call_soon_threadsafe. (3) Latency creep with idle CPU means thread-pool saturation behind the default cap: bound fan-out with a Semaphore and size a dedicated executor.
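For signal (3), a minimal sketch of the submissions-versus-starts gauge, assuming a hypothetical `TrackedOffload` wrapper around `asyncio.to_thread`. `queued()` counts calls that were submitted but not yet picked up by a worker; if it stays above zero under load, calls are waiting behind the pool's cap. The demo shrinks the default executor to two threads with `loop.set_default_executor` so the backlog appears immediately.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TrackedOffload:
    """Hypothetical wrapper around asyncio.to_thread that counts offloads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # counters are updated from pool threads
        self.submitted = 0
        self.started = 0

    def _run(self, fn, args):
        with self._lock:
            self.started += 1
        return fn(*args)

    async def call(self, fn, *args):
        with self._lock:
            self.submitted += 1
        return await asyncio.to_thread(self._run, fn, args)

    def queued(self) -> int:
        # Submitted but not yet picked up by a worker: waiting behind the cap.
        with self._lock:
            return self.submitted - self.started


async def demo() -> int:
    loop = asyncio.get_running_loop()
    # A deliberately tiny default pool so saturation shows up at once.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    tracker = TrackedOffload()
    calls = [asyncio.create_task(tracker.call(time.sleep, 0.2)) for _ in range(6)]
    await asyncio.sleep(0.05)
    backlog = tracker.queued()  # 4 once both workers are busy
    await asyncio.gather(*calls)
    return backlog


if __name__ == "__main__":
    print("queued behind the cap:", asyncio.run(demo()))
```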

Failure Modes

| Failure mode | Root cause | Detection | Fix |
|---|---|---|---|
| Loop freezes during a call | Blocking function run inline on the loop thread | loop.set_debug(True) slow-callback warning; flat throughput during the call | Wrap the call in asyncio.to_thread / run_in_executor |
| Sporadic state corruption | Worker thread mutated an asyncio object (Queue, Event, Future) directly | RuntimeError about the wrong loop or thread, or intermittent lost items | Use call_soon_threadsafe / run_coroutine_threadsafe; never touch loop objects off-thread |
| Latency creep, CPU idle | Unbounded to_thread fan-out queued behind the default executor's cap, min(32, os.cpu_count() + 4) | Rising p99 with flat CPU; submissions outpacing completions | Bound with asyncio.Semaphore; size a dedicated ThreadPoolExecutor |
| Whole loop hangs on a lock | threading.Lock held across an await | Loop stops servicing all tasks; deadlock under contention | Keep locked regions synchronous; release before await |
| Process offload OOMs or is slow | Large, hard-to-pickle payloads crossing the process boundary | High IPC latency; pickling errors; memory spikes | Shrink payloads, pass handles/paths, or use multiprocessing.shared_memory |

Frequently Asked Questions

Why are asyncio primitives not thread-safe?

asyncio objects such as Lock, Queue, Event, and Future are affine to the event loop that created them and assume all access happens on that loop's single thread. They perform no internal locking against other OS threads, so calling their methods from a worker thread is a data race. To cross the boundary safely, use loop.call_soon_threadsafe to run a callback on the loop thread, or asyncio.run_coroutine_threadsafe to schedule a coroutine, both of which marshal the work onto the loop thread.

When should I use asyncio.to_thread versus loop.run_in_executor?

Use asyncio.to_thread for ad-hoc blocking calls on the shared default thread pool; it is concise and correct for most synchronous I/O. Use loop.run_in_executor when you need a dedicated, separately sized executor (to isolate a hot path from the shared pool) or when offloading CPU-bound work to a ProcessPoolExecutor. to_thread always uses the loop's default executor: unless you replace it with loop.set_default_executor(), that is a ThreadPoolExecutor capped at min(32, os.cpu_count()+4) workers.

How do I return a result from a worker thread back to the event loop?

If the worker only needs to deliver a value, create an asyncio.Future on the loop and call loop.call_soon_threadsafe(fut.set_result, value) from the thread, then await the future. If the thread needs to invoke a coroutine and get its result, use asyncio.run_coroutine_threadsafe(coro, loop), which returns a concurrent.futures.Future you block on with .result(timeout=...). Never resolve a loop-affine future directly from the worker thread.

Why does my hybrid service get slower under load even though CPU is idle?

Almost always thread-pool saturation. asyncio.to_thread uses the default executor, capped at min(32, os.cpu_count()+4) threads. Beyond that cap, calls queue silently rather than erroring, so end-to-end latency climbs while CPU stays low. Bound fan-out with an asyncio.Semaphore sized to your thread budget and, for hot paths, allocate a dedicated ThreadPoolExecutor used via loop.run_in_executor.