Offloading CPU Work with loop.run_in_executor

The symptom is unmistakable in production: a service that handles hundreds of concurrent requests fine suddenly logs latency spikes across all of them whenever one particular endpoint is hit. The culprit is almost always a synchronous CPU function called from inside a coroutine — a thumbnail resize, a PDF parse, a regex over a large body, a checksum. Because the asyncio event loop runs on one thread, that function holds the loop hostage for its entire duration; nothing else advances until it returns. Health checks time out, heartbeats miss, and the latency you added to one request is paid by every other request currently in flight.

This guide walks through detecting the block, then moving the work to an executor with asyncio.to_thread or loop.run_in_executor so your await flow continues unchanged and the loop stays responsive. The end state is a single await that submits the work elsewhere and resumes when the result arrives — the calling code barely changes, but the loop is freed the instant the work is submitted rather than the instant it finishes.

Prerequisites

  • Python 3.11+ for asyncio.timeout() and asyncio.TaskGroup (the offloading APIs themselves, to_thread and run_in_executor, work on 3.9+).
  • Familiarity with the trade-offs in the parent CPU-Bound Task Offloading overview — specifically when threads help (GIL-releasing C work) versus when you need processes (pure-Python CPU).
  • A grasp of the single-threaded scheduler from the Concurrent Execution & Worker Patterns overview.

Figure: Blocked loop versus offloaded loop. Inline CPU work blocks every task for its duration, so task B runs late. Offloaded CPU work runs in a worker pool while the loop keeps scheduling tasks A, B, and C.

1. Detect the block

Before changing anything, prove the loop is blocked and find the exact callback. Turn on debug mode and lower the slow-callback threshold to your latency budget.

import asyncio
import time


def slow_parse(payload: bytes) -> int:
    # Stand-in for a real CPU function called from a coroutine.
    total = 0
    for b in payload * 5000:
        total = (total + b) % 1_000_003
    return total


async def handler(payload: bytes) -> int:
    return slow_parse(payload)  # BLOCKS the loop for its full duration


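async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05   # warn on any 50 ms+ block
    await handler(b"x" * 2000)


# Enable debug mode before the loop starts. The loop checks the flag before
# running each callback, and main() here finishes in a single step, so calling
# loop.set_debug(True) inside main() would be too late to time that step.
asyncio.run(main(), debug=True)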

Verify: the run logs a line of the form Executing <Task ... coro=<main() ...> ...> took N.NNN seconds. The log names the task or handle whose step overran (here the task running main(), because handler is awaited inside it) and confirms the loop was stalled; the blocking call inside that task is your target for offloading. If you cannot reproduce it locally, the same warnings appear in staging under load; grep the logs for Executing and took to find every blocking call at once, not just the one you suspected. Setting PYTHONASYNCIODEBUG=1 in the environment enables the same debug mode without code changes.
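
Debug-mode warnings tell you which task overran; a heartbeat probe tells you how late everything else ran as a result. The sketch below is one way to measure that, not part of asyncio itself: worst_loop_lag, inline, and offloaded are hypothetical names, and time.sleep stands in for real blocking work.

```python
import asyncio
import contextlib
import time


async def worst_loop_lag(work, interval: float = 0.01) -> float:
    """Await work() while a probe task records its worst wake-up delay (seconds)."""
    loop = asyncio.get_running_loop()
    worst = 0.0

    async def probe() -> None:
        nonlocal worst
        while True:
            start = loop.time()
            await asyncio.sleep(interval)
            # Anything beyond `interval` is time the loop could not serve us.
            worst = max(worst, loop.time() - start - interval)

    task = asyncio.create_task(probe())
    await asyncio.sleep(0)                 # let the probe reach its first sleep
    try:
        await work()
        await asyncio.sleep(2 * interval)  # let the probe wake up and record
    finally:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
    return worst


async def inline() -> None:
    time.sleep(0.2)                        # stand-in: blocking call on the loop thread


async def offloaded() -> None:
    await asyncio.to_thread(time.sleep, 0.2)   # same call, moved to a worker thread


async def main() -> None:
    print(f"inline:    {await worst_loop_lag(inline):.3f} s")
    print(f"offloaded: {await worst_loop_lag(offloaded):.3f} s")


if __name__ == "__main__":
    asyncio.run(main())
```

The inline case should report a lag close to the full blocking time, while the offloaded case should stay near zero. The same probe can run as a background task in a service to export loop lag as a metric.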

2. Wrap with asyncio.to_thread (GIL-releasing case)

If the heavy work lives in a C extension that releases the GIL — hashlib, zlib, NumPy, Pillow — a thread runs it genuinely in parallel with the loop. asyncio.to_thread is the minimal change: it submits the call to the default thread pool and returns an awaitable, with zero serialization because the thread shares memory.

import asyncio
import hashlib


def checksum(data: bytes) -> str:
    # hashlib releases the GIL during the digest loop.
    return hashlib.sha256(data * 10_000).hexdigest()


async def handler(data: bytes) -> str:
    # Only the submit happens on the loop thread; the digest runs
    # in a worker thread while the loop serves other tasks.
    return await asyncio.to_thread(checksum, data)


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.05
    print(await handler(b"payload"))


asyncio.run(main())

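Verify: rerun with debug on; the slow-callback warning is gone. Then confirm the offload bought real overlap: several concurrent calls should finish in about the time of one, and unrelated tasks should stay responsive. If concurrent calls instead take N times as long, the work is not releasing the GIL (it is pure Python), and you need step 3 instead.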

3. Use ProcessPoolExecutor via run_in_executor (pure-Python CPU)

When the hot loop is pure Python, the GIL serializes it even in a thread, so to_thread will not help. Move it to a separate process with loop.run_in_executor and a ProcessPoolExecutor. The function must be defined at module top level so it is importable and picklable in the worker.

import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(n: int) -> int:
    # Pure-Python CPU: nothing releases the GIL.
    return sum(
        all(k % d for d in range(2, int(k ** 0.5) + 1))
        for k in range(2, n)
    )


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.05
    with ProcessPoolExecutor(max_workers=2) as pool:
        result = await loop.run_in_executor(pool, count_primes, 200_000)
    print(result)


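# The guard is required with spawn-based start methods (the default on
# Windows and macOS): each worker re-imports this module on startup.
if __name__ == "__main__":
    asyncio.run(main())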

Verify: the result is correct and no slow-callback warning fires for the submitting coroutine. Spawn cost appears once on the first submit, not per call.

4. Pass a shared executor and bound concurrency

Per-call pools waste their spawn cost, and unbounded submits queue futures in memory. Build one executor for the process lifetime and cap in-flight submissions with an asyncio.Semaphore sized near the worker count.

import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_kernel(n: int) -> int:
    return sum(k * k % 97 for k in range(n))


async def offload(pool, n, sem):
    loop = asyncio.get_running_loop()
    async with sem:                       # backpressure: cap concurrent submits
        return await loop.run_in_executor(pool, cpu_kernel, n)


async def main() -> None:
    sem = asyncio.Semaphore(4)            # ~2x a 2-worker pool
    with ProcessPoolExecutor(max_workers=2) as pool:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(offload(pool, n, sem))
                     for n in range(300_000, 300_000 + 8)]
        print([t.result() for t in tasks])


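# Guard the entry point so worker processes can re-import this module safely.
if __name__ == "__main__":
    asyncio.run(main())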

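Verify: the executor's queue stays bounded as you raise the number of items, because the semaphore holds in-flight work to four submissions regardless of how many tasks you create. The tasks themselves are still created up front and wait on the semaphore, so for very large inputs also bound how many tasks exist at once.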
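
For inputs too large to turn into one task each, an alternative is a fixed set of consumer coroutines that pull from a shared iterator, which bounds both the number of tasks and the number of in-flight submissions. The following is a minimal sketch under that assumption; map_bounded is a hypothetical helper, and the demo uses a ThreadPoolExecutor only to stay portable (the shared ProcessPoolExecutor can be passed in the same way).

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def cpu_kernel(n: int) -> int:
    return sum(k * k % 97 for k in range(n))


async def map_bounded(pool: Executor, fn, items, limit: int) -> list:
    """Apply fn to every item via the pool with at most `limit` calls in flight."""
    loop = asyncio.get_running_loop()
    pending = iter(enumerate(items))       # shared by all consumers
    results: dict[int, object] = {}

    async def consumer() -> None:
        # next() on the shared iterator runs synchronously on the loop thread,
        # so two consumers can never receive the same item.
        for index, item in pending:
            results[index] = await loop.run_in_executor(pool, fn, item)

    await asyncio.gather(*(consumer() for _ in range(limit)))
    return [results[i] for i in range(len(results))]   # restore input order


async def main() -> None:
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(await map_bounded(pool, cpu_kernel, range(1000, 1008), limit=4))


if __name__ == "__main__":
    asyncio.run(main())
```

Only `limit` coroutines ever exist, so memory no longer grows with the number of items; the trade-off is that results are collected in a dictionary and reordered at the end.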

5. Handle exceptions and cancellation from the executor

Exceptions raised in a worker propagate through the awaited future and can be caught normally. Cancellation is the subtle part: a timeout cancels the future, but a worker that is already running keeps running until it returns — cancel() on a started executor job is best-effort.

import asyncio
from concurrent.futures import BrokenExecutor, ProcessPoolExecutor


def maybe_fail(n: int) -> int:
    if n < 0:
        raise ValueError("negative input")
    return sum(range(n))


async def guarded(pool, n, timeout=2.0):
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(pool, maybe_fail, n)
    try:
        async with asyncio.timeout(timeout):
            return await fut
    except ValueError as exc:              # worker exception, re-raised here
        print("worker raised:", exc)
        raise
    except TimeoutError:
        fut.cancel()                       # best-effort; running job continues
        print("timed out; worker may still be computing")
        raise
    except BrokenExecutor:                 # a worker died (OOM/segfault)
        print("pool broken; rebuild required")
        raise

Verify: calling guarded(pool, -1) prints worker raised: negative input and re-raises; a too-short timeout prints the timeout message. The pool itself stays usable after a normal exception (only BrokenExecutor means it must be rebuilt).

Verification

Run the offending endpoint under load with loop.set_debug(True) and loop.slow_callback_duration set to your SLA (e.g. 0.05). Before offloading, the logs carry Executing <Handle ...> took N seconds lines and tail latency on unrelated requests rises whenever the CPU endpoint is hit. After offloading, those warnings disappear, p99 latency on other endpoints stops correlating with the CPU endpoint, and overall throughput on multi-core hosts rises for the process-pool case. Treat "no slow-callback warnings under load" as the pass condition.

A second, quantitative check: fire N concurrent offloads of a fixed-duration kernel and measure wall-clock time. For the thread path with GIL-releasing work, total time should approach the single-call duration (true overlap); if it scales linearly with N, the GIL is not being released and you need a process pool. For the process path, total time should fall toward ceil(N / workers) times the single-call duration once spawn cost is amortized — if it does not improve at all, suspect that pickling the arguments or results dominates the compute and revisit your payload sizes.
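
The thread-path check can be sketched as a ratio of concurrent time to single-call time. This is a rough harness under stated assumptions, not a benchmark: overlap_ratio, sleeping_kernel, and spin are hypothetical names, and time.sleep stands in for a C extension that releases the GIL.

```python
import asyncio
import time


def sleeping_kernel(seconds: float) -> None:
    time.sleep(seconds)        # releases the GIL, like a well-behaved C extension


def spin(iterations: int) -> int:
    total = 0                  # pure-Python loop: holds the GIL while it runs
    for k in range(iterations):
        total = (total + k) % 1_000_003
    return total


async def overlap_ratio(fn, arg, n: int) -> float:
    """Time n concurrent to_thread calls relative to one call."""
    t0 = time.perf_counter()
    await asyncio.to_thread(fn, arg)
    single = time.perf_counter() - t0

    t0 = time.perf_counter()
    await asyncio.gather(*(asyncio.to_thread(fn, arg) for _ in range(n)))
    total = time.perf_counter() - t0
    return total / single


async def main() -> None:
    print(f"GIL-releasing: {await overlap_ratio(sleeping_kernel, 0.1, 4):.1f}x")
    print(f"pure Python:   {await overlap_ratio(spin, 500_000, 4):.1f}x")


if __name__ == "__main__":
    asyncio.run(main())
```

A ratio near 1 means the calls truly overlapped; a ratio near n (on a standard GIL build of CPython) means they ran one at a time and belong in a process pool.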

Pitfalls & edge cases

  • Unpicklable arguments or results (process pool). Lambdas, closures, local functions, and many C-extension objects cannot be pickled. Define worker functions at module top level; for large arrays, pass a multiprocessing.shared_memory name rather than the array. A PicklingError (or a "Can't pickle" error for a local object) surfacing from the awaited future is the tell.
  • ProcessPool startup cost. The first submit pays process spawn (tens to hundreds of milliseconds). Build the pool once at startup, not per request, or the spawn cost dominates short tasks.
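  • Forgetting to await the future. loop.run_in_executor(...) returns a future immediately; if you do not await it, the work still runs but you never see the result, and a failure shows up only as a "Future exception was never retrieved" log line when the future is garbage-collected. asyncio.to_thread(...) is worse: it returns a coroutine, so without an await the work never starts and you get a "coroutine ... was never awaited" warning. Always bind and await.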
  • Cancellation does not kill a running worker. future.cancel() only prevents a queued job from starting; a job already executing in a thread or process runs to completion. Size timeouts to match, and rely on process-level limits for hard kills.
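  • to_thread on pure-Python work. It moves the call off the loop thread, but the worker and the loop then contend for the GIL: the loop gets a turn roughly once per switch interval (5 ms by default), callbacks slow down, and concurrent calls run one at a time, so you gain no parallelism. The slow-callback warning may disappear even though throughput does not improve. Confirm the library releases the GIL, or use a process pool.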
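
The pickling pitfall can be caught before anything is submitted. The helper below is a hypothetical preflight check, not a library API: is_picklable simply tries pickle.dumps, and scale is an illustrative worker function. It also shows functools.partial, which is the usual way to bind keyword arguments because run_in_executor forwards positional arguments only.

```python
import functools
import pickle


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps, as a process pool requires."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def scale(values: list[int], factor: int = 2) -> list[int]:
    # Module-level function: pickled by reference (module name + function name).
    return [v * factor for v in values]


if __name__ == "__main__":
    # A partial of a top-level function keeps its keyword arguments and pickles.
    job = functools.partial(scale, [1, 2, 3], factor=3)
    print("partial of top-level function:", is_picklable(job))
    # A lambda has no importable name, so it cannot cross the process boundary.
    print("lambda:", is_picklable(lambda v: v * 2))
```

Running the check in a unit test for each worker entry point is cheaper than discovering the failure from a production traceback.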

Frequently Asked Questions

Does cancelling the future stop a running executor job?

No. future.cancel() only prevents a job that is still queued from starting. A job already executing in a thread or process runs to completion regardless, because asyncio cannot interrupt synchronous code in another thread or process. Size your timeouts accordingly and rely on process-level limits for hard termination.

Why does my to_thread offload still block the event loop?

asyncio.to_thread runs the call in a thread, but if the work is pure-Python bytecode the worker holds the GIL and hands it back to the loop thread only about once per switch interval (5 ms by default). The loop is no longer stalled for the whole call, yet every callback now competes with the worker, latency stays elevated, and concurrent calls run one at a time. Threads only help when the work is inside a C extension that releases the GIL; for pure-Python CPU use a ProcessPoolExecutor via run_in_executor.

Why do I get a PicklingError when using run_in_executor with a process pool?

A ProcessPoolExecutor pickles the function, its arguments, and its result to cross the process boundary. Lambdas, closures, local functions, and many C-extension objects are not picklable. Define worker functions at module top level, pass plain picklable arguments, and for large arrays pass a multiprocessing.shared_memory name rather than copying the array.