Bridging Callback APIs to async with Futures

You have a library that does not speak async. It either hands you a result through a completion callback — client.fetch(url, on_done=...) — or it runs work on its own thread and signals you when finished. Your service is built on async/await, and you want to call this library as if it were a coroutine: result = await client_async.fetch(url). The bridge between the two worlds is a single asyncio.Future: you create one, register the library's callback to resolve it, and await the future. The subtlety is doing this safely — futures are not thread-safe, can only be resolved once, and will hang the caller forever if the callback never fires.

This guide builds that bridge step by step: create the future, resolve it from a callback, await it under a deadline, resolve it safely from another thread, and finally fold it all into a reusable async helper.

Prerequisites

  • Python 3.11+ — this guide uses asyncio.timeout(), the context-manager deadline introduced in 3.11.
  • Familiarity with the low-level mechanics in Future Objects & Callbacks, the overview this page expands on, and with how the loop schedules callbacks described in Asyncio Fundamentals & Event Loop Architecture.
  • A callback- or thread-based library to wrap. The examples use a small fake SDK so they run as-is.

Figure: Bridging a callback to an awaitable Future. A callback or background thread routes its result through call_soon_threadsafe, which resolves the Future on the loop thread and resumes the awaiting coroutine.

The fake SDK used throughout — a callback API and, later, a threaded one:

import threading
import time
from typing import Callable


class CallbackClient:
    """Same-thread callback API: result delivered via on_done(value, error)."""

    def fetch(self, key: str, on_done: Callable[[str | None, Exception | None], None]) -> None:
        if key == "boom":
            on_done(None, RuntimeError("upstream failure"))
        else:
            on_done(f"value-for-{key}", None)


class ThreadedClient:
    """Threaded API: work runs off-thread, then on_done is called from it."""

    def fetch(self, key: str, on_done: Callable[[str | None, Exception | None], None]) -> None:
        def _run() -> None:
            time.sleep(0.1)  # blocking work on a worker thread
            on_done(f"value-for-{key}", None)

        threading.Thread(target=_run, daemon=True).start()

1. Create a Future with loop.create_future()

The bridge starts with one future, created from the running loop. Use loop.create_future() rather than asyncio.Future(): it binds the future to the active loop and honours custom loop implementations like uvloop.

import asyncio


async def make_future() -> None:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[str] = loop.create_future()

    print(fut.done())   # False — PENDING, nobody has resolved it yet
    print(repr(fut))    # <Future pending> — shows state and callbacks

    fut.set_result("manual")
    print(await fut)    # "manual"


asyncio.run(make_future())

Verify: fut.done() is False immediately after creation and True after set_result; await fut returns the value you set. The repr() is your friend — it prints the state and any registered callbacks.

2. Resolve it from a callback

Now wire the library's callback to resolve the future. The callback closes over fut and calls set_result on success or set_exception on failure, mapping the library's (value, error) convention onto the future's state.

import asyncio


async def fetch(client: "CallbackClient", key: str) -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[str] = loop.create_future()

    def _on_done(value: str | None, error: Exception | None) -> None:
        if fut.done():            # never resolve twice
            return
        if error is not None:
            fut.set_exception(error)
        else:
            fut.set_result(value)  # type: ignore[arg-type]

    client.fetch(key, _on_done)
    return await fut


async def main() -> None:
    client = CallbackClient()
    print(await fetch(client, "users"))      # value-for-users
    try:
        await fetch(client, "boom")
    except RuntimeError as exc:
        print(f"raised: {exc}")              # raised: upstream failure


asyncio.run(main())

Verify: a successful key returns the value; the "boom" key surfaces the library's error as a real raised exception at the await. The exception object is passed through unchanged, so any traceback it already carries is preserved. The if fut.done() guard means a misbehaving library that calls back twice cannot crash you with InvalidStateError.

3. Await it with a timeout

A bridge that can hang is a liability — if the library never calls back, your coroutine waits forever. Wrap the await in asyncio.timeout() so a missing callback becomes a TimeoutError you can handle, and so cancellation cleans the future up.

import asyncio


class SilentClient:
    def fetch(self, key, on_done):  # never calls on_done
        pass


async def fetch_with_timeout(client, key: str, *, timeout: float) -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[str] = loop.create_future()

    def _on_done(value, error):
        if fut.done():
            return
        if error is not None:
            fut.set_exception(error)
        else:
            fut.set_result(value)

    client.fetch(key, _on_done)
    async with asyncio.timeout(timeout):
        return await fut


async def main() -> None:
    try:
        await fetch_with_timeout(SilentClient(), "x", timeout=0.2)
    except TimeoutError:
        print("timed out — callback never fired")


asyncio.run(main())

Verify: against SilentClient the call returns after ~0.2s with a TimeoutError instead of hanging. When the timeout fires, asyncio.timeout() cancels the await fut, which leaves the future cancelled — so a late callback hitting the if fut.done() guard is a no-op rather than an error.

4. Resolve safely from another thread

When the library runs its callback on its own thread (the ThreadedClient), you must not touch the future from that thread — futures and the loop are not thread-safe. Capture the running loop on the async side and resolve through loop.call_soon_threadsafe, which enqueues the resolution on the loop thread and wakes the selector. This is the same cross-thread discipline covered in hybrid concurrency models.

import asyncio


async def fetch_threaded(client: "ThreadedClient", key: str, *, timeout: float = 1.0) -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[str] = loop.create_future()

    def _on_done(value: str | None, error: Exception | None) -> None:
        # Runs on the library's thread — hop back to the loop thread.
        def _resolve() -> None:
            if fut.done():
                return
            if error is not None:
                fut.set_exception(error)
            else:
                fut.set_result(value)  # type: ignore[arg-type]

        loop.call_soon_threadsafe(_resolve)

    client.fetch(key, _on_done)
    async with asyncio.timeout(timeout):
        return await fut


async def main() -> None:
    print(await fetch_threaded(ThreadedClient(), "orders"))  # value-for-orders


asyncio.run(main())

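Verify: the threaded fetch returns the correct value in roughly 0.1s. If you (wrongly) call fut.set_result directly inside _on_done, the failure is usually silent rather than loud: the future's wake-up is scheduled with the non-thread-safe call_soon, which does not wake the selector, so the awaiting coroutine may not resume until something else wakes the loop (here, the timeout deadline). With asyncio debug mode enabled, the same mistake raises a RuntimeError about a non-thread-safe operation. Either way, call_soon_threadsafe is doing real work.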

5. Wrap it as a reusable async helper

Fold the plumbing into one helper so callers never see a future. This version handles both same-thread and threaded callbacks by always hopping through call_soon_threadsafe (a negligible cost when already on the loop thread, though the result then lands one loop iteration later), guards against double-resolution, and takes a timeout.

import asyncio
from typing import Any, Callable


async def bridge(
    register: Callable[[Callable[[Any, Exception | None], None]], None],
    *,
    timeout: float = 1.0,
) -> Any:
    """Turn a register(on_done) callback API into an awaitable result."""
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[Any] = loop.create_future()

    def _on_done(value: Any, error: Exception | None) -> None:
        def _resolve() -> None:
            if fut.done():
                return
            if error is not None:
                fut.set_exception(error)
            else:
                fut.set_result(value)

        loop.call_soon_threadsafe(_resolve)

    register(_on_done)
    async with asyncio.timeout(timeout):
        return await fut


async def main() -> None:
    cb = CallbackClient()
    th = ThreadedClient()
    # Adapt each client's fetch into a register(on_done) closure.
    print(await bridge(lambda cb_done: cb.fetch("a", cb_done)))
    print(await bridge(lambda cb_done: th.fetch("b", cb_done), timeout=2.0))


asyncio.run(main())

Verify: both clients return through the same helper. Because the helper is generic over register, you can wrap any callback API by passing a lambda that adapts its signature to on_done(value, error).

Verification

Run any snippet directly once the fake clients defined above are in scope (same file or an import). Expected signals:

  • Success path returns the produced value; error path raises the library's own exception object at the await, with any traceback it already carries intact.
  • Timeout path (SilentClient) returns control after the deadline with TimeoutError, never hanging.
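  • Threaded path resolves promptly (about 0.1s with the fake client); remove the call_soon_threadsafe hop and the result typically arrives late or not at all, and asyncio debug mode reports a RuntimeError about a non-thread-safe operation, confirming the bridge is exercising the cross-thread path.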
  • Add repr(fut) logging and you will see the future move from <Future pending> to <Future finished result=...> exactly once per call.
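One way to capture that state transition without sprinkling print calls is a done-callback that records repr(fut). This is a minimal sketch; the states list is illustrative, and the exact repr text can include extra detail (for example, creation location in debug mode).

```python
import asyncio


async def demo() -> list[str]:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    states = [repr(fut)]  # captured while the future is still pending
    fut.add_done_callback(lambda f: states.append(repr(f)))

    # Resolve on the next loop iteration, as a same-thread callback would.
    loop.call_soon(fut.set_result, "value-for-users")
    await fut
    return states


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

A done-callback fires once per future, so seeing more than two entries for a single call would point to a second future being created rather than a second resolution.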

Pitfalls & edge cases

  • Setting the result twice. If the library can call back more than once (retries, success-then-late-error), the second set_result/set_exception raises InvalidStateError. Always guard with if fut.done(): return as the first line of the resolver.
  • Resolving from the wrong thread without call_soon_threadsafe. Calling fut.set_result directly from a library thread is not thread-safe: the loop is not woken, so the awaiter can stall, and concurrent access can corrupt loop state. Only asyncio debug mode reliably flags it, with a RuntimeError about a non-thread-safe operation. Route every off-thread resolution through call_soon_threadsafe.
  • Never resolving — the silent hang. A library that finishes via an error path without invoking your callback leaves the future PENDING and the awaiter stuck. The asyncio.timeout() wrapper is mandatory, not optional, for any third-party callback you do not fully trust.
  • Ignoring cancellation cleanup. When the await is cancelled (timeout or caller cancellation), the future is cancelled but the library may still call back later. The if fut.done() guard makes that late callback a harmless no-op; without it the late set_result raises InvalidStateError inside the callback scheduled by call_soon_threadsafe, where it surfaces only through the loop's exception handler.
  • Leaking registrations. If the library lets you deregister a callback, do it in a finally after the await; otherwise a callback that never fires keeps the closure (and anything it captured) alive for the process lifetime.

Frequently Asked Questions

Why use loop.create_future() instead of asyncio.Future()?

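loop.create_future() binds the future to the currently running loop and lets a custom loop implementation, such as uvloop, supply its own future class. Constructing asyncio.Future() directly bypasses that hook. If it is constructed while no loop is running, then depending on the Python version it either fails or binds to whatever loop get_event_loop() returns, which may not be the loop that later awaits it; a task on a different loop that awaits such a future gets a RuntimeError.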

How do I stop a wrapped callback API from hanging my coroutine?

Wrap the await on the future in async with asyncio.timeout(timeout). If the library never invokes your callback, the timeout cancels the await and raises TimeoutError instead of leaving the future PENDING forever. Combine it with an if fut.done() guard so a late callback after the timeout is a harmless no-op.

What happens if the library calls my callback twice?

The second call to set_result or set_exception raises InvalidStateError because the future is already done. Guard the resolver with if fut.done(): return as its first line so only the first callback resolves the future and any subsequent calls are ignored.

Do I need call_soon_threadsafe if the callback runs on the same thread?

Strictly no, but routing every resolution through loop.call_soon_threadsafe makes one helper correct for both same-thread and threaded callbacks at negligible cost. If you know the callback always fires on the loop thread you can call set_result directly, but a single safe path avoids subtle bugs when a library later changes its threading behaviour.