
Migrating legacy threading code to asyncio without downtime

You have a live service built on threading and ThreadPoolExecutor, and you need it on asyncio — for connection density, structured cancellation, or to shed per-thread stack memory — without a maintenance window. A big-bang rewrite is not an option: the service takes traffic, the blocking calls are buried in vendor SDKs, and a single synchronous call left on the event loop will freeze every request at once. The safe path is incremental: audit what will block the loop, wrap it behind an async bridge, run both paths behind a flag, then drain and cut over. This guide walks that path with verifiable checkpoints at each phase.

Prerequisites

Figure: Zero-downtime threading to asyncio migration timeline. Four sequential phases: 1. Audit (map locks, queues, blocking calls); 2. Bridge (run_in_executor wraps blocking functions); 3. Dual-path (flag-routed A/B across both paths); 4. Cutover (drain and retire the thread pool). Traffic shifts from 10% to 50% to 100% async; rollback is a flag flip at any point.

Step 1: Audit the blocking calls that will starve the loop

Before any code moves, snapshot every thread's stack and flag the primitives that block: lock acquisitions, queue.get(), blocking socket reads, time.sleep. These are the calls that, run directly in a coroutine, freeze the whole loop.

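import logging
import os
import sys
import threading
import traceback

# C-level calls (time.sleep, socket.recv, Lock.acquire) get no Python frame of
# their own, so match them against the source line of the frame that calls them.
BLOCKING_CALLS = ("time.sleep(", ".recv(", ".recv_into(", ".accept(",
                  ".acquire(", ".wait(", ".read(")
# Pure-Python stdlib frames that mean "this thread is parked".
BLOCKING_FRAMES = {("queue.py", "get"), ("threading.py", "wait"),
                   ("threading.py", "join")}


def audit_threads() -> dict[str, list[str]]:
    """Snapshot active threads and flag frames sitting in blocking calls (heuristic)."""
    frames = sys._current_frames()
    report: dict[str, list[str]] = {}
    for thr in threading.enumerate():
        frame = frames.get(thr.ident)
        if frame is None:
            continue
        # extract_stack walks f_back for us and loads each frame's source line.
        summary = traceback.extract_stack(frame)
        report[thr.name] = [f"{fs.filename}:{fs.lineno} ({fs.name}) {fs.line or ''}".rstrip()
                            for fs in summary]
        flagged = [fs for fs in summary
                   if any(call in (fs.line or "") for call in BLOCKING_CALLS)
                   or (os.path.basename(fs.filename), fs.name) in BLOCKING_FRAMES]
        for fs in flagged:
            logging.warning("Thread %r has a blocking call at %s:%s: %s",
                            thr.name, fs.filename, fs.lineno, fs.line)
    return report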

Verify: run this under live load, or run py-spy dump --pid <PID> against the production process, which prints every thread's stack without restarting the service. Every flagged call site must either be replaced with an async-native client or routed through the bridge in Step 2. The string matching is a heuristic, so review flagged frames by hand. Record baseline P95/P99 latency, throughput, and RSS now; they are your regression gates for the cutover.
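To make the baseline usable as a gate, you can reduce raw latency samples to P95/P99 and compare later runs against them. A minimal sketch, assuming you already collect per-request latencies in milliseconds; tail_latencies and passes_gate are hypothetical helpers, and the 10% tolerance is an illustrative default, not a recommendation.

```python
import statistics


def tail_latencies(samples_ms: list[float]) -> dict[str, float]:
    """Return P95 and P99 from raw per-request latency samples."""
    # quantiles(n=100) returns 99 cut points; index 94 is P95, index 98 is P99.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p95": cuts[94], "p99": cuts[98]}


def passes_gate(baseline_ms: list[float], candidate_ms: list[float],
                tolerance: float = 0.10) -> bool:
    """True if the candidate's P95 and P99 are within `tolerance` of the baseline.

    The 10% default is a hypothetical threshold; derive yours from your SLOs.
    """
    base = tail_latencies(baseline_ms)
    cand = tail_latencies(candidate_ms)
    return all(cand[k] <= base[k] * (1 + tolerance) for k in base)


if __name__ == "__main__":
    baseline = [float(ms) for ms in range(1, 101)]
    print(tail_latencies(baseline))
    print(passes_gate(baseline, [ms * 1.05 for ms in baseline]))  # within tolerance
    print(passes_gate(baseline, [ms * 1.5 for ms in baseline]))   # regression
```

Storing the raw baseline samples (not just the percentiles) lets you recompute the gate later with a different tolerance.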

Step 2: Build the run_in_executor bridge

Wrap each blocking function in an async shim so coroutines can call it without blocking the loop. The shim runs the synchronous call on a dedicated ThreadPoolExecutor and awaits its result.

import asyncio
import concurrent.futures
import logging
from functools import partial
from typing import Any, Callable

log = logging.getLogger(__name__)

# Dedicated, bounded executor for legacy blocking calls.
LEGACY_POOL = concurrent.futures.ThreadPoolExecutor(
    max_workers=10, thread_name_prefix="legacy_bridge"
)


async def run_legacy(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Await a synchronous function on the bridge pool without blocking the loop."""
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(LEGACY_POOL, partial(fn, *args, **kwargs))
    except asyncio.CancelledError:
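        # Cancelling the await drops the call if it is still queued; a call that
        # is already running cannot be interrupted, so its thread finishes and
        # the result is discarded.
        log.warning("Bridge call cancelled; a running call still finishes in its thread.")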
        raise

Verify: with the bridge in place, an async handler that calls await run_legacy(blocking_db_query, sql) keeps the loop responsive. Confirm this by measuring event-loop lag with a background task that repeatedly awaits a short asyncio.sleep() and records how late it wakes up; the lag should stay low while the blocking call runs. Use asyncio.to_thread() for one-off blocking I/O (it runs on the loop's default executor); keep a named, bounded pool such as LEGACY_POOL for hot paths so you can monitor its queue depth. Send CPU-bound legacy functions to a ProcessPoolExecutor instead: pure-Python CPU work holds the GIL, so running it on threads (via to_thread or the bridge) ties up workers, starves I/O, and slows the event loop thread itself, as detailed in CPU-bound task offloading.
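The lag check can be sketched as a coroutine that sleeps for a short interval in a loop and records how late it wakes. A minimal sketch, assuming a hypothetical blocking_call stand-in for a vendor SDK call; it uses asyncio.to_thread as the bridge so the block runs on its own, and run_legacy from above should behave the same way.

```python
import asyncio
import time


async def measure_loop_lag(stop: asyncio.Event, interval: float = 0.01) -> float:
    """Return the worst overshoot, in seconds, of a short periodic sleep."""
    loop = asyncio.get_running_loop()
    worst = 0.0
    while not stop.is_set():
        start = loop.time()
        await asyncio.sleep(interval)
        # Anything beyond `interval` is time the loop could not run this task.
        worst = max(worst, loop.time() - start - interval)
    return worst


def blocking_call(seconds: float) -> str:
    # Hypothetical stand-in for a blocking legacy SDK call.
    time.sleep(seconds)
    return "done"


async def run_with_monitor(offload: bool, seconds: float = 0.2) -> float:
    stop = asyncio.Event()
    monitor = asyncio.create_task(measure_loop_lag(stop))
    await asyncio.sleep(0.02)  # let the monitor start ticking
    if offload:
        await asyncio.to_thread(blocking_call, seconds)  # bridged: loop stays free
    else:
        blocking_call(seconds)  # the bug: blocks the loop directly
    stop.set()
    return await monitor


if __name__ == "__main__":
    print("offloaded worst lag:", asyncio.run(run_with_monitor(offload=True)))
    print("inline worst lag:   ", asyncio.run(run_with_monitor(offload=False)))
```

In the inline case the worst lag approaches the full blocking duration; in the offloaded case it should stay near scheduler noise. In production you would export the lag as a metric instead of returning it.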

Step 3: Route dual-path behind a feature flag

Stand up the async handler beside the legacy one and route a configurable fraction of traffic to it. Both paths must share the exact request/response contract so you can compare them and roll back instantly by flipping the flag.

import asyncio
import logging
import zlib
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

log = logging.getLogger(__name__)


@dataclass
class Router:
    async_weight: float  # 0.0 .. 1.0; update at runtime from a live config flag
    legacy: Callable[[dict], Awaitable[Any]]  # legacy handler, bridged via run_legacy
    new: Callable[[dict], Awaitable[Any]]
    _inflight: dict[str, asyncio.Task] = field(default_factory=dict, init=False)

    def _use_new(self, req_id: str) -> bool:
        # Hash the request ID into [0, 1) so the split tracks async_weight
        # uniformly and a retried request with the same ID takes the same path.
        bucket = zlib.crc32(req_id.encode()) / 2**32
        return bucket < self.async_weight

    async def handle(self, req_id: str, payload: dict) -> Any:
        use_new = self._use_new(req_id)
        coro = self.new(payload) if use_new else self.legacy(payload)
        task = asyncio.create_task(coro, name=f"route-{req_id}")
        self._inflight[req_id] = task
        try:
            return await task
        finally:
            self._inflight.pop(req_id, None)

    async def drain(self, timeout: float = 30.0) -> None:
        """Wait up to `timeout` seconds for every in-flight request to finish."""
        if not self._inflight:
            return
        done, pending = await asyncio.wait(list(self._inflight.values()), timeout=timeout)
        for t in done:
            if not t.cancelled() and t.exception():
                log.error("Drained task failed: %s", t.exception())
        if pending:
            log.warning("Drain timed out with %d tasks still running", len(pending))

Verify: at 10% async weight, the async path's latency and error-rate distributions should track the legacy path within a few percent. A >10% divergence in timeout rate almost always means a blocking call slipped onto the loop unbridged; go back to Step 1's audit for that handler. The _inflight registry is what lets the drain in Step 4 wait for every in-flight request, so stop new traffic to the instance first (for example, by deregistering it from the load balancer) or the registry may never empty.
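To turn "track within a few percent" into an explicit check, you can count requests and timeouts per path and compare the rates. A minimal sketch; PathStats is a hypothetical helper, and reading ">10% divergence" as the async timeout rate sitting more than 10% above the legacy rate (a relative difference) is an assumption.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class PathStats:
    """Per-path request and timeout counts for the dual-path comparison."""
    requests: Counter = field(default_factory=Counter)
    timeouts: Counter = field(default_factory=Counter)

    def record(self, path: str, timed_out: bool) -> None:
        self.requests[path] += 1
        if timed_out:
            self.timeouts[path] += 1

    def timeout_rate(self, path: str) -> float:
        total = self.requests[path]
        return self.timeouts[path] / total if total else 0.0

    def relative_divergence(self, baseline: str = "legacy", candidate: str = "async") -> float:
        """Candidate timeout rate relative to the baseline's, as a signed fraction."""
        base, cand = self.timeout_rate(baseline), self.timeout_rate(candidate)
        if base == 0.0:
            return 0.0 if cand == 0.0 else float("inf")
        return (cand - base) / base

    def needs_investigation(self, threshold: float = 0.10) -> bool:
        # Assumed reading of ">10% divergence": async timeouts >10% above legacy.
        return self.relative_divergence() > threshold
```

At a 10% weight the async path sees far fewer requests, so let counts accumulate before acting on the ratio; a handful of timeouts can swing it widely.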

Step 4: Drain and cut over

Ramp the flag (10% to 50% to 100% over a day or two, validating SLOs at each step). Once the async path has soaked at 100%, retire the legacy thread pool: drain in-flight work first, then shut the pool down.
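The ramp itself can be automated so each step is gated and a breach rolls back immediately. A minimal sketch, assuming hypothetical set_weight and slo_ok hooks into your config system and metrics; in production soak_seconds would be hours, not seconds.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical ramp schedule matching the 10% / 50% / 100% steps above.
RAMP_STEPS = (0.10, 0.50, 1.00)


async def ramp(set_weight: Callable[[float], None],
               slo_ok: Callable[[], Awaitable[bool]],
               soak_seconds: float) -> float:
    """Step the async weight up, soaking at each step; roll back to 0.0 on a breach.

    set_weight writes the live config flag; slo_ok checks your latency and
    error gates for the current step. Returns the weight the service ends on.
    """
    for weight in RAMP_STEPS:
        set_weight(weight)
        await asyncio.sleep(soak_seconds)  # let real traffic exercise this step
        if not await slo_ok():
            set_weight(0.0)  # rollback is just a flag flip
            return 0.0
    return RAMP_STEPS[-1]
```

Keeping rollback inside the same loop means a failed gate never leaves the service on a partially ramped weight.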

import asyncio
import concurrent.futures
import logging

log = logging.getLogger(__name__)


async def cutover(router, executor: concurrent.futures.ThreadPoolExecutor,
                  timeout: float = 45.0) -> None:
    """Drain in-flight requests, then retire the legacy thread pool cleanly.

    `executor` is the pool that served the threaded request path. Keep the
    run_legacy bridge pool alive while any async handler still calls through it.
    """
    log.info("Cutover: draining in-flight routed tasks")
    await router.drain(timeout=timeout)

    # shutdown(wait=True) blocks until every worker thread exits, so run it in
    # the default executor (never the pool being shut down) to keep the loop free.
    await asyncio.to_thread(executor.shutdown, wait=True)

    # Call loop.shutdown_asyncgens() only at process exit (asyncio.run() does it
    # for you): the loop keeps serving traffic after cutover, and calling it here
    # would close every live async generator, including ones still in use.
    log.info("Cutover complete: legacy thread pool retired")

Verify: post-cutover, confirm zero dropped connections, RSS stable or lower than the Step 1 baseline (you have shed per-thread stacks), and no ResourceWarning traces. len(threading.enumerate()) should fall to the main thread plus any bridge or default-executor workers still serving un-migrated calls. If it stays near its pre-migration level, the legacy pool was never shut down.
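To make the thread-count check concrete, you can count live threads by the executor's thread_name_prefix before and after shutdown; the legacy_bridge prefix set in Step 2 makes bridge workers easy to pick out. A minimal sketch; bridge_threads is a hypothetical helper, and it relies on ThreadPoolExecutor naming its workers with the prefix you pass.

```python
import concurrent.futures
import threading
import time


def bridge_threads(prefix: str) -> list[str]:
    """Names of live threads created by an executor with this thread_name_prefix."""
    return [t.name for t in threading.enumerate() if t.name.startswith(prefix)]


if __name__ == "__main__":
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4,
                                                 thread_name_prefix="legacy_bridge")
    for _ in range(4):
        pool.submit(time.sleep, 0.05)  # workers are started lazily on submit
    print("while busy:", len(bridge_threads("legacy_bridge")))
    pool.shutdown(wait=True)  # joins every worker thread
    print("after shutdown:", len(bridge_threads("legacy_bridge")))
```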

Verification

The migration is complete and safe when, at 100% async weight:

  • P95/P99 latency and error rate match or beat the Phase 1 baseline recorded in Step 1.
  • Resident memory is flat or reduced — the visible payoff of dropping OS-thread stacks for coroutine state.
  • threading.enumerate() shows only the loop's own threads plus the (now idle, drainable) bridge pool; no ResourceWarning on shutdown.
  • Flipping async_weight back to 0.0 still serves traffic correctly — keep the legacy path until you have soaked at 100% for a full traffic cycle.

Pitfalls & edge cases

  • A blocking call left on the loop. requests.get() or a sync driver inside a coroutine freezes every concurrent task at once. Anything not async-native must go through run_in_executor/to_thread.
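  • Shared mutable state across the thread/async boundary. A bridge thread mutating a dict while an async task iterates it raises RuntimeError: dictionary changed size during iteration. asyncio.Lock only coordinates coroutines on one loop and does not exclude other threads, so guard shared state with a threading.Lock held only for short, non-blocking critical sections, or pass immutable copies across the boundary.
  • asyncio.to_thread for CPU-heavy work. It runs on the loop's default thread pool; CPU work occupies those workers and starves I/O, and pure-Python computation also holds the GIL, slowing the event loop thread itself. Route compute to a ProcessPoolExecutor; see Threading vs Multiprocessing vs Asyncio.
  • Undrained async generators on shutdown. If you manage the loop yourself instead of using asyncio.run() (which calls it for you), skipping loop.shutdown_asyncgens() before loop.close() means the generators' finally blocks never run, so the sockets and file descriptors they hold are not released cleanly.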
  • No connection draining at cutover. Killing workers mid-stream drops TCP connections and triggers client retry storms. Always drain in-flight tasks and handle SIGTERM gracefully.
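The drain-on-SIGTERM advice can be sketched with loop.add_signal_handler. A minimal sketch, assuming a POSIX host (add_signal_handler is unavailable on Windows) and that in-flight tasks are tracked in a set; the Router's _inflight registry from Step 3 could fill that role. serve_until_sigterm is a hypothetical helper.

```python
import asyncio
import signal


async def serve_until_sigterm(inflight: set[asyncio.Task],
                              drain_timeout: float = 30.0) -> int:
    """Block until SIGTERM, then wait up to drain_timeout for in-flight tasks.

    Returns how many tasks were still running and had to be cancelled.
    """
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    try:
        await stop.wait()
    finally:
        loop.remove_signal_handler(signal.SIGTERM)
    # In a real service, stop accepting new connections here before draining.
    pending = [t for t in inflight if not t.done()]
    if not pending:
        return 0
    _, still_running = await asyncio.wait(pending, timeout=drain_timeout)
    for task in still_running:
        task.cancel()  # last resort once the drain window has elapsed
    return len(still_running)
```

Set drain_timeout below your orchestrator's grace period (the gap between SIGTERM and SIGKILL) so cancellation happens on your terms rather than by a hard kill.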

Frequently Asked Questions

Can I migrate to asyncio without rewriting my entire codebase?

Yes. Wrap legacy blocking functions with asyncio.to_thread() or loop.run_in_executor() so they run on a thread pool while coroutines await them. This enables incremental migration that preserves existing business logic until each path can be rewritten to native async I/O.

How do I prevent event loop starvation during migration?

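Never run blocking I/O or CPU work directly on the loop. Offload it to a dedicated ThreadPoolExecutor or ProcessPoolExecutor, and run staging with asyncio debug mode enabled (PYTHONASYNCIODEBUG=1 or loop.set_debug(True)): the loop then logs any callback or task step that runs longer than loop.slow_callback_duration (0.1 s by default), catching accidental blocking early.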
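Slow-callback detection only runs in debug mode, so enable it explicitly. A minimal sketch: the time.sleep inside a coroutine is a deliberately planted bug, and asyncio.run(..., debug=True) stands in for setting PYTHONASYNCIODEBUG=1.

```python
import asyncio
import logging
import time


async def handler_with_hidden_blocking_call() -> None:
    # Hypothetical bug: a sync call that never went through the bridge.
    time.sleep(0.2)


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # seconds; 0.1 is also the default
    await handler_with_hidden_blocking_call()


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    # Debug mode makes the loop time each callback and task step; the one
    # above exceeds slow_callback_duration, so the "asyncio" logger warns.
    asyncio.run(main(), debug=True)
```

The warning names the task and the duration, which usually points straight at the handler that needs a bridge.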

What is the safest way to cut over traffic without dropping requests?

Use a dual-path router controlled by a dynamic feature flag. Send a small percentage of traffic to the async path, validate metrics, then ramp gradually. To retire the thread pool, drain in-flight requests and then call executor.shutdown(wait=True), off the event loop since it blocks, so nothing is dropped.

How do I debug deadlocks that span threads and async tasks?

Tag logs with a unique request ID across both boundaries, enable PYTHONASYNCIODEBUG=1 in staging to surface unawaited coroutines and long callbacks, and cross-reference asyncio.all_tasks() with threading.enumerate() to find cross-boundary lock contention.
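Tagging logs with a request ID across the boundary can be done with contextvars: asyncio.to_thread copies the current context into the worker thread, so a logging filter that reads the variable stamps the same ID on both sides. A minimal sketch; the request_id variable, the "migration" logger, and legacy_lookup are hypothetical. loop.run_in_executor does not copy the context, so calls through a run_in_executor bridge such as run_legacy would need to pass contextvars.copy_context().run as the callable to see the ID.

```python
import asyncio
import contextvars
import logging

# Hypothetical request-ID variable, set once per request at the async edge.
request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    """Stamp every record with the request ID visible in the emitting thread."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id.get()
        return True


log = logging.getLogger("migration")


def legacy_lookup(key: str) -> str:
    # Runs on a worker thread; sees the caller's context because to_thread copies it.
    log.info("legacy lookup for %s", key)
    return key.upper()


async def handle_request(req_id: str, key: str) -> str:
    request_id.set(req_id)
    log.info("async handler start")
    return await asyncio.to_thread(legacy_lookup, key)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(logging.Formatter("%(request_id)s %(threadName)s %(message)s"))
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    asyncio.run(handle_request("req-42", "abc"))
```

Because the thread name is logged next to the ID, one grep for the ID shows which side of the boundary each step ran on.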