When to Use asyncio.run vs loop.run_until_complete

You have a coroutine and you need to run it from synchronous code. Two APIs do this — asyncio.run() and loop.run_until_complete() — and choosing wrong produces concrete failures: RuntimeError: asyncio.run() cannot be called from a running event loop when you nest them, file-descriptor leaks and ResourceWarning when you forget loop.close(), and dropped tasks when you reuse a loop that an earlier run already half-closed. The two are not interchangeable: asyncio.run() owns the loop lifecycle end to end, while run_until_complete() is a single method on a loop you must create, configure, drain, and close. This guide gives a decision rule for which to reach for, shows the correct pattern for each, and covers migrating legacy manual-loop code to the modern entrypoint on Python 3.11+.

Prerequisites

  • Python 3.11+ for asyncio.Runner, which is the modern way to get manual-loop control without the deprecated policy API and is what asyncio.run() is built on.
  • Understanding of loop lifecycle and cleanup — the Event Loop Configuration overview covers the backend, exception-handler, and shutdown stages that asyncio.run() configures for you, all resting on the model in Asyncio Fundamentals & Event Loop Architecture.
  • A CI pipeline able to surface deprecation warnings: python -W default::DeprecationWarning app.py.

The decision in one diagram

asyncio.run() is the answer in nearly every case. loop.run_until_complete() is correct only when something else already owns a long-lived loop you must reuse, or when you are inside a synchronous framework callback that hands you a loop to drive.

Figure: decision tree for choosing asyncio.run vs loop.run_until_complete. Which entrypoint drives the coroutine?

  • Is a loop already running in this thread? If yes, await directly and never nest a runner.
  • If no, does a framework own a long-lived loop you reuse? If yes, use run_until_complete(): the framework owns the loop, you drive one coroutine, and cleanup is manual.
  • If no again, use asyncio.run() or asyncio.Runner: you own the lifecycle, and create, cancel, drain, and close are automatic.

Default to asyncio.run(); run_until_complete is for loops you do not own.

1. Use asyncio.run() for any process you own

asyncio.run() is the entrypoint for a CLI, a worker, or a microservice main: anything where your code owns the loop from start to finish. It creates a fresh loop for the calling thread, runs the coroutine, cancels leftover tasks, drains async generators, and closes the loop, even on exception. On 3.11+ it is a thin wrapper over asyncio.Runner. Runner accepts loop_factory for backend injection without the deprecated policy system, and asyncio.run() itself gained the same loop_factory parameter in 3.12. The one hard constraint: it raises RuntimeError if called from an already-running loop.

import asyncio
import signal


async def worker(task_id: int) -> None:
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print(f"task {task_id} cancelled")
        raise


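async def cancel_on(stop: asyncio.Event, tasks: list[asyncio.Task[None]]) -> None:
    """Cancel the given tasks once a shutdown signal sets the event."""
    await stop.wait()
    for task in tasks:
        task.cancel()


async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)  # Unix only
    async with asyncio.TaskGroup() as tg:
        workers = [tg.create_task(worker(i)) for i in range(3)]
        watcher = tg.create_task(cancel_on(stop, workers))
        await asyncio.wait(workers)  # returns once every worker finished or was cancelled
        watcher.cancel()  # no signal arrived; stop waiting for one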


if __name__ == "__main__":
    asyncio.run(main())  # owns create, run, cancel, drain, close

Verify: after the call returns, the loop is closed and no tasks linger. In a test, asyncio.run(main()) should leave no Task was destroyed but it is pending warnings.
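The check above can be sketched in a few lines. This is a minimal illustration, not a test harness: the coroutine deliberately leaves a task pending and hands the loop and the task back so they can be inspected after asyncio.run() returns. The names leftover and the 60-second sleep are arbitrary choices for the demonstration.

```python
import asyncio


async def main() -> tuple[asyncio.AbstractEventLoop, asyncio.Task[None]]:
    # Deliberately leave a task pending so asyncio.run() has something to cancel.
    leftover = asyncio.create_task(asyncio.sleep(60))
    await asyncio.sleep(0)  # yield once so the leftover task starts running
    return asyncio.get_running_loop(), leftover


loop, leftover = asyncio.run(main())
print(loop.is_closed())      # True: asyncio.run() closed the loop it created
print(leftover.cancelled())  # True: the pending task was cancelled during teardown
```

Returning the loop from main() is only a convenience for inspection; production code should not keep references to a loop that asyncio.run() has already closed.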

2. Use loop.run_until_complete() when you do not own the loop

run_until_complete() is correct when asyncio.run() cannot own the lifecycle because a long-lived loop must be reused and you drive a single coroutine on it: a synchronous framework callback (an older WSGI bridge, a legacy Celery worker), a test fixture sharing one loop across many cases, or a per-thread worker that constructs its own loop once and reuses it. If a framework hands you the loop, call run_until_complete() and leave configuration and closing to the owner. If your code constructs the loop, as in the bridge below, you take on the whole lifecycle: create the loop, optionally configure it, run, then cancel pending tasks and close, every time.

import asyncio
from typing import Any


async def compute() -> dict[str, Any]:
    await asyncio.sleep(0.1)
    return {"status": "ok"}


def sync_bridge() -> dict[str, Any]:
    """Drive one coroutine from synchronous code that owns the loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(compute())
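    finally:
        pending = asyncio.all_tasks(loop)
        for t in pending:
            t.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.run_until_complete(loop.shutdown_asyncgens())  # drain async generators
        asyncio.set_event_loop(None)  # do not leave a closed loop registered as current
        loop.close()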

Verify: assert loop.is_closed() after the finally block has run, and confirm len(asyncio.all_tasks(loop)) == 0 immediately before close. If you skip the cancel-and-gather, leftover tasks produce "Task was destroyed but it is pending" warnings and can leak descriptors.
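For the other half of this section, where the loop belongs to someone else, a hedged sketch follows. Here framework_loop stands in for a loop that a framework or fixture would create and close, and drive_on_borrowed_loop is a hypothetical helper name; the point is only that the borrower runs its coroutine and never closes the loop.

```python
import asyncio


async def compute() -> dict[str, str]:
    await asyncio.sleep(0.01)
    return {"status": "ok"}


def drive_on_borrowed_loop(loop: asyncio.AbstractEventLoop) -> dict[str, str]:
    """Run one coroutine on a loop owned elsewhere; the owner closes it."""
    return loop.run_until_complete(compute())


# Stand-in for the framework: it creates the loop once and closes it at shutdown.
framework_loop = asyncio.new_event_loop()
try:
    first = drive_on_borrowed_loop(framework_loop)
    second = drive_on_borrowed_loop(framework_loop)  # same loop, reused
    still_open = not framework_loop.is_closed()
finally:
    framework_loop.close()
```

Because the borrower never calls close(), the loop stays usable for the next callback, which is exactly what asyncio.run() cannot offer.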

3. Never nest — await directly inside a running loop

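The most common production error is calling asyncio.run() (or run_until_complete()) from code that is already running on a loop: inside a coroutine, an async web handler, or a Jupyter cell. Both raise RuntimeError, with different messages: asyncio.run() reports "asyncio.run() cannot be called from a running event loop", while run_until_complete() reports "This event loop is already running" (or "Cannot run the event loop while another loop is running" when called on a different loop). The fix is to stop trying to start a loop and simply schedule onto the one that exists.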

import asyncio


async def already_async() -> None:
    # WRONG: asyncio.run(do_work())  -> RuntimeError, a loop is already running
    # RIGHT: await the coroutine, or schedule a task on the current loop.
    result = await do_work()
    task = asyncio.create_task(background())  # scheduled on this loop, runs concurrently
    await task  # keep the reference and await it so the task is not lost


async def do_work() -> int:
    await asyncio.sleep(0)
    return 42


async def background() -> None:
    await asyncio.sleep(0.1)

Verify: asyncio.get_running_loop() succeeds (does not raise) wherever you were tempted to call asyncio.run() — that is proof a loop already exists and you must await instead.
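That probe can be wrapped in a small helper. The sketch below assumes nothing beyond the standard library; in_running_loop and probe are hypothetical names introduced for illustration.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True when called from code executing on a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run() is safe to call here.
        return False
    return True


async def probe() -> bool:
    # Inside a coroutine a loop is running by definition, so await instead of nesting.
    return in_running_loop()


outside = in_running_loop()    # called from plain synchronous code
inside = asyncio.run(probe())  # called from within a coroutine
print(outside, inside)
```

A helper like this is useful as a diagnostic or an assertion; branching on it to choose between asyncio.run() and await at runtime usually hides the same design problem that nesting does.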

4. Migrate legacy manual-loop code to asyncio.run / Runner

Code written before asyncio.run() arrived in 3.7 typically reads loop = asyncio.get_event_loop(); loop.run_until_complete(main()). Relying on get_event_loop() with no running loop has been deprecated since 3.10: a loop it creates implicitly is easy to orphan, 3.12 emits a DeprecationWarning when there is no current loop, and 3.14 raises RuntimeError instead. Migrate by deleting the manual lifecycle and letting asyncio.run() own it; if you genuinely need a custom backend or several top-level runs on one loop, switch to asyncio.Runner with loop_factory instead of policy mutation.

import asyncio

# BEFORE (deprecated get_event_loop, manual close):
# loop = asyncio.get_event_loop()
# try:
#     loop.run_until_complete(main())
# finally:
#     loop.close()

# AFTER, simple case:
# asyncio.run(main())

# AFTER, custom backend / multiple top-level runs on one configured loop:
def loop_factory():
    try:
        import uvloop
        return uvloop.new_event_loop()
    except ImportError:
        return asyncio.new_event_loop()


async def main() -> None:
    await asyncio.sleep(0)


with asyncio.Runner(loop_factory=loop_factory) as runner:
    runner.run(main())

Verify: run under python -W default::DeprecationWarning and confirm no DeprecationWarning: There is no current event loop lines remain. The lifecycle and backend-injection mechanics here are detailed in the Event Loop Configuration overview.

Verification

A correctly migrated codebase satisfies all of the following:

  • No nesting errors: no RuntimeError: ... called from a running event loop in logs; every in-loop call site uses await/create_task, confirmed by asyncio.get_running_loop() succeeding there.
  • No deprecation noise: python -W default::DeprecationWarning app.py is clean of get_event_loop() warnings.
  • Deterministic cleanup: after each asyncio.run() or manual run, loop.is_closed() is True and asyncio.all_tasks(loop) is empty.
  • Stable descriptors: on Linux, len(os.listdir('/proc/self/fd')) stays flat across repeated runs, which shows that no loop or transport is leaking.

Concern            asyncio.run()                                       loop.run_until_complete()
-----------------  --------------------------------------------------  ------------------------------------------
Loop creation      automatic, one fresh loop per call                  manual via new_event_loop()
Cleanup            automatic on success or exception                   manual cancel + gather + close()
Signal handling    integrates cleanly inside main()                    you register and tear down handlers
Backend injection  loop_factory (Runner on 3.11+, asyncio.run() 3.12+)  set on the loop you create
Correct when       you own the process lifecycle                       a framework/thread owns a reused loop

Pitfalls & edge cases

  • Calling asyncio.run() twice with shared state. Each call builds and tears down a fresh loop; objects bound to the first loop (locks, queues, connections) are invalid in the second. Create loop-bound primitives inside main().
  • Omitting cancel-and-gather before loop.close(). With run_until_complete(), closing while tasks are pending raises warnings, leaks descriptors, and can corrupt connection pools. Always cancel, gather(..., return_exceptions=True), then close.
  • Assuming run_until_complete() handles signals. It does not. You must call loop.add_signal_handler() yourself, or SIGINT/SIGTERM will terminate the process abruptly with unflushed buffers.
  • Sharing one loop across threads. A loop is single-thread-affine. To hand work to a loop on another thread, use asyncio.run_coroutine_threadsafe(), not a direct run_until_complete() from the foreign thread — otherwise RuntimeError: This event loop is already running.
  • Reaching for nest_asyncio to allow nesting. It monkey-patches the loop to permit re-entrancy and masks the real design error. Restructure to await on the existing loop instead.
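The cross-thread pitfall above has a concrete shape. The sketch below is illustrative only: start_background_loop and add are hypothetical names, and the background thread stands in for whatever component actually owns the loop. Work is submitted with asyncio.run_coroutine_threadsafe(), which returns a concurrent.futures.Future the calling thread can block on.

```python
import asyncio
import threading


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def start_background_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start a loop that runs forever on its own thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


loop, thread = start_background_loop()
try:
    # Submit from the foreign thread; never call loop.run_until_complete() here.
    future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
    result = future.result(timeout=5)
finally:
    loop.call_soon_threadsafe(loop.stop)  # stop must be requested thread-safely
    thread.join(timeout=5)
    loop.close()  # safe now: the loop has stopped and its thread has exited

print(result)  # 5
```

Only the thread-safe entry points (run_coroutine_threadsafe and call_soon_threadsafe) touch the loop from outside its own thread; close() is called after the owning thread has finished.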

Frequently Asked Questions

Is loop.run_until_complete() deprecated in Python 3.12+?

No. The method itself remains fully supported. What is deprecated is relying on asyncio.get_event_loop() to supply a loop when none is running. run_until_complete() remains the standard way to drive a single coroutine on a loop you manage, such as in synchronous-framework bridges.

Can I use asyncio.run() in a multi-threaded application?

Yes, with at most one asyncio.run() active per thread at a time. Each call builds and closes its own loop, so a thread may call it repeatedly in sequence, and concurrent calls in separate threads are safe because each thread gets an independent loop. Never share a loop instance across thread boundaries. To hand work to a loop on another thread, use asyncio.run_coroutine_threadsafe().
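A short sketch of the per-thread behaviour, using only the standard library; run_in_threads and current_loop are hypothetical names. Each thread calls asyncio.run() and reports the loop it ran on, so the loops can be compared afterwards.

```python
import asyncio
import threading


async def current_loop() -> asyncio.AbstractEventLoop:
    await asyncio.sleep(0.01)
    return asyncio.get_running_loop()


def run_in_threads(count: int = 3) -> list[asyncio.AbstractEventLoop]:
    """Call asyncio.run() once in each of `count` threads and collect the loops."""
    loops: list[asyncio.AbstractEventLoop | None] = [None] * count

    def target(index: int) -> None:
        loops[index] = asyncio.run(current_loop())

    threads = [threading.Thread(target=target, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [loop for loop in loops if loop is not None]


loops = run_in_threads()
all_distinct = len({id(loop) for loop in loops}) == len(loops)
all_closed = all(loop.is_closed() for loop in loops)
print(len(loops), all_distinct, all_closed)
```

The loops are held in a list while they are compared, so distinct identities really do mean distinct loop objects rather than reused memory addresses.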

How do I handle graceful shutdown when using run_until_complete()?

Manually register OS signal handlers via loop.add_signal_handler(), then on signal cancel the tasks from asyncio.all_tasks(), await asyncio.gather(*tasks, return_exceptions=True), and call loop.close() to release selectors and transports. asyncio.run() performs the cancel, drain, and close steps automatically, and on 3.11+ it also turns SIGINT into cancellation of the main task; SIGTERM still needs your own handler.
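A minimal sketch of that sequence, assuming a Unix platform and the main thread (loop.add_signal_handler() is not available elsewhere). The function name run_with_graceful_shutdown, its return strings, and the demo coroutine are hypothetical; the demo raises SIGTERM against its own process only to simulate an external kill.

```python
import asyncio
import signal
from collections.abc import Callable, Coroutine
from typing import Any

SIGNALS = (signal.SIGINT, signal.SIGTERM)


def run_with_graceful_shutdown(main: Callable[[], Coroutine[Any, Any, object]]) -> str:
    """Run main() on a manually managed loop and report how it ended."""
    loop = asyncio.new_event_loop()
    main_task = loop.create_task(main())
    for sig in SIGNALS:
        loop.add_signal_handler(sig, main_task.cancel)  # signal -> cancel main
    try:
        loop.run_until_complete(main_task)
        return "completed"
    except asyncio.CancelledError:
        return "cancelled by signal"
    finally:
        for sig in SIGNALS:
            loop.remove_signal_handler(sig)
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def demo() -> None:
    await asyncio.sleep(0.05)
    signal.raise_signal(signal.SIGTERM)  # simulate an external `kill`
    await asyncio.sleep(60)  # interrupted by the cancellation above


if __name__ == "__main__":
    print(run_with_graceful_shutdown(demo))
```

Cancelling the main task, rather than stopping the loop, lets every try/finally and async context manager in main() unwind before the remaining tasks are collected and the loop is closed.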