
Choosing between ThreadPoolExecutor and ProcessPoolExecutor for data pipelines

You have a multi-stage data pipeline (fetch, parse, transform, write), and throughput does not track your worker count. Doubling max_workers on a ThreadPoolExecutor leaves wall-clock time flat because the transform stage is GIL-bound. Or you switch a stage to ProcessPoolExecutor and resident memory triples while the stage runs slower, because every DataFrame is being pickled across the process boundary. The executor choice is not a heuristic; it is a measurement. This guide gives you the measurement, the routing architecture that follows from it, and the recovery path when a worker pool breaks mid-run.

Prerequisites

  • Python 3.11+ for the asyncio integration notes and modern concurrent.futures semantics (ProcessPoolExecutor's max_tasks_per_child needs 3.11+; shutdown(cancel_futures=True) has been available since 3.9).
  • pip install psutil numpy for the memory-delta and shared-memory examples (the routing and recovery code is stdlib-only).
  • Assumed knowledge: you are comfortable with concurrent.futures.Future and the GIL trade-offs in Threading vs Multiprocessing vs Asyncio. For the broader worker-topology context this fits into, see Concurrent Execution & Worker Patterns.

The short answer: route a stage by its CPU-time-to-wall-clock ratio. Below ~0.4 it is I/O-bound — use threads. Above ~0.7 it is CPU-bound — use processes. The steps below make that measurable, then build the pipeline around it.

[Figure: Executor routing for a two-stage pipeline. Fetch (I/O, ratio < 0.4) on a ThreadPoolExecutor, GIL freed on socket waits → bounded queue (maxsize = N) applying backpressure → Transform (CPU, ratio > 0.7) on a ProcessPoolExecutor, which bypasses the GIL. Pickle cost lives on the queue boundary into the process pool; if IPC time exceeds compute time, route a shared-memory view instead.]

Step 1: Classify each stage by its CPU/wall ratio

The single discriminator is how much of a stage's wall-clock time is spent executing Python bytecode versus waiting. Measure time.process_time() (CPU) against time.perf_counter() (wall) per stage on representative data.

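import cProfile
import io
import pstats
import time
from typing import Callable


def classify_stage(func: Callable, *args, profile: bool = False, **kwargs) -> dict:
    """Measure wall vs CPU time to classify a pipeline stage.

    Run it on representative data with no other busy threads in the process:
    time.process_time() counts CPU time across every thread of this process.
    """
    wall_start, cpu_start = time.perf_counter(), time.process_time()
    func(*args, **kwargs)  # timed without cProfile, whose overhead inflates CPU time
    wall = time.perf_counter() - wall_start
    cpu = time.process_time() - cpu_start
    ratio = cpu / wall if wall else 0.0
    if ratio > 0.7:
        route = "ProcessPoolExecutor"
    elif ratio < 0.4:
        route = "ThreadPoolExecutor"
    else:
        route = "mixed: split the stage"
    result = {
        "wall_s": round(wall, 4),
        "cpu_s": round(cpu, 4),
        "cpu_to_wall": round(ratio, 3),
        "route": route,
    }
    if profile:
        # Optional second, profiled pass to show where the CPU time goes.
        pr = cProfile.Profile()
        pr.runcall(func, *args, **kwargs)
        out = io.StringIO()
        pstats.Stats(pr, stream=out).sort_stats("cumulative").print_stats(10)
        result["hotspots"] = out.getvalue()
    return result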

Verify: a ratio below 0.4 means the stage is mostly waiting on I/O — threads will overlap that wait. A ratio above 0.7 means it is CPU-saturated — threads will serialize on the GIL and only processes give a speedup. A ratio of 0.4–0.7 is mixed; decouple it (Step 3) or push the I/O part into asyncio.
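Before trusting the ratio on real stages, it can help to calibrate it against two workloads whose nature is known. This is an illustrative sketch: fake_io() and fake_cpu() are hypothetical stand-ins (a sleep for a socket wait, a pure-Python loop for a GIL-bound transform), and the exact ratios you see depend on the machine and its current load.

```python
import time
from typing import Callable


def cpu_wall_ratio(func: Callable[[], object]) -> float:
    """CPU seconds / wall seconds for one call (process-wide CPU clock)."""
    wall0, cpu0 = time.perf_counter(), time.process_time()
    func()
    wall = time.perf_counter() - wall0
    cpu = time.process_time() - cpu0
    return cpu / wall if wall else 0.0


def fake_io() -> None:
    time.sleep(0.2)  # hypothetical stand-in for a socket or disk wait


def fake_cpu() -> int:
    # Hypothetical transform: pure-Python arithmetic holds the GIL throughout.
    total = 0
    for i in range(3_000_000):
        total += i * i
    return total


if __name__ == "__main__":
    print(f"io-like  ratio: {cpu_wall_ratio(fake_io):.2f}")   # typically far below 0.4
    print(f"cpu-like ratio: {cpu_wall_ratio(fake_cpu):.2f}")  # typically above 0.7
```

If the CPU-like reference does not land above 0.7 on your box, the machine is contended and per-stage ratios measured there will read low as well.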

Step 2: Measure the pickle IPC cost before committing to processes

ProcessPoolExecutor pickles every argument and return value across the boundary. For large arrays or frames, that serialization can cost more than the compute it enables. Measure it explicitly and route around it with multiprocessing.shared_memory when it dominates.

import pickle
import time

import numpy as np
from multiprocessing import shared_memory


def pickle_cost(obj) -> tuple[int, float]:
    """Return (payload bytes, seconds to serialize) for an IPC argument."""
    start = time.perf_counter()
    payload = pickle.dumps(obj)  # default protocol, which multiprocessing also uses
    return len(payload), time.perf_counter() - start


def to_shared(data: np.ndarray) -> tuple[shared_memory.SharedMemory, tuple, str]:
    """Zero-copy hand-off: ship a name, not the array.

    Keep the returned SharedMemory referenced until every worker has finished
    (on Windows the block is released once every handle is closed), then call
    shm.close() and shm.unlink() exactly once. Workers receive shm.name.
    """
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    np.copyto(np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf), data)
    return shm, data.shape, str(data.dtype)

Verify: if pickle_cost() reports a serialization time above ~15% of the stage's compute time, or RSS scales linearly with max_workers, pass a shared-memory name plus shape/dtype instead of the array. Workers attach with SharedMemory(name=...), compute, and close(); the parent, which created the block, calls unlink() once after all workers are done. The bytes never cross the pipe.
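Both sides of that hand-off, as a self-contained sketch: sum_shared() and sum_in_pool() are hypothetical names, the sum stands in for real compute, and the copy into shared memory is written inline rather than calling to_shared() so the block runs on its own. One ordering detail matters: any NumPy view over shm.buf must be released before close(), otherwise close() raises BufferError.

```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory


def sum_shared(name: str, shape: tuple, dtype: str) -> float:
    """Worker side: attach by name, compute, detach. No array bytes are pickled."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        # The temporary view is freed before `finally`, so close() succeeds.
        return float(np.ndarray(shape, dtype=dtype, buffer=shm.buf).sum())
    finally:
        shm.close()  # detach only; the parent owns unlink()


def sum_in_pool(data: np.ndarray, workers: int = 2) -> float:
    """Parent side: copy once into shared memory, ship only (name, shape, dtype)."""
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        np.copyto(np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf), data)
        with ProcessPoolExecutor(max_workers=workers) as pool:
            fut = pool.submit(sum_shared, shm.name, data.shape, str(data.dtype))
            return fut.result()
    finally:
        shm.close()
        shm.unlink()  # exactly once, after every worker has detached
```

Call sum_in_pool() from under an `if __name__ == "__main__":` guard, since the spawn and forkserver start methods re-import the main module in each worker.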

Step 3: Decouple I/O and CPU stages through a bounded queue

Production pipelines are heterogeneous. Pin the I/O stage to threads, the CPU stage to processes, and put a bounded queue.Queue between them so a slow consumer applies backpressure on the producer instead of buffering unboundedly into memory.

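import concurrent.futures
import logging
import queue
import threading
from typing import Callable, Iterator

log = logging.getLogger(__name__)
_DONE = object()  # sentinel: every fetch has finished


class PipelineRouter:
    """Usage: submit_io() per fetch, then close_io(), then iterate run_cpu(transform)."""

    def __init__(self, io_workers=8, cpu_workers=4, queue_size=100):
        self.io_pool = concurrent.futures.ThreadPoolExecutor(max_workers=io_workers)
        self.cpu_pool = concurrent.futures.ProcessPoolExecutor(max_workers=cpu_workers)
        self.q: queue.Queue = queue.Queue(maxsize=queue_size)
        self.window = 2 * cpu_workers  # cap on in-flight process tasks
        self._io_futs: list[concurrent.futures.Future] = []
        self._stop = threading.Event()

    def submit_io(self, fetch: Callable, *args):
        fut = self.io_pool.submit(self._io_wrap, fetch, *args)
        self._io_futs.append(fut)
        return fut

    def close_io(self):
        """Call after the last submit_io(): enqueues _DONE once every fetch has finished."""
        futs = list(self._io_futs)

        def _closer():
            concurrent.futures.wait(futs)
            self.q.put(_DONE)  # run_cpu() keeps draining until it sees this

        threading.Thread(target=_closer, daemon=True).start()

    def _io_wrap(self, fetch, *args):
        try:
            item = fetch(*args)
        except Exception:
            log.exception("IO stage failed")
            self._stop.set()
            return
        while not self._stop.is_set():
            try:
                self.q.put(item, timeout=1)  # blocks while the CPU stage lags
                return
            except queue.Full:
                continue  # backpressure, not failure: keep waiting

    def run_cpu(self, transform: Callable) -> Iterator:
        pending: set = set()
        exhausted = False
        while pending or not exhausted:
            # Pull only as fast as the process pool frees up, so the bounded queue,
            # not the pool's unbounded internal queue, absorbs the backlog.
            while not exhausted and len(pending) < self.window:
                item = self.q.get()
                if item is _DONE:
                    exhausted = True
                elif not self._stop.is_set():  # after a failure, drain without submitting
                    pending.add(self.cpu_pool.submit(transform, item))
            if pending:
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED)
                for f in done:
                    yield f.result()

    def shutdown(self, wait=True):
        self._stop.set()
        self.io_pool.shutdown(wait=wait)
        self.cpu_pool.shutdown(wait=wait)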

Verify: under load the queue should hover near full when the CPU stage is the bottleneck and near empty when the I/O stage is. A queue that is always full, with producers always blocked, means you are CPU-bound and should add process workers; a queue that is always empty means you should add I/O threads. The bounded maxsize is what keeps RSS flat; see async queue management for the same backpressure mechanics in a more general setting.
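"Hovering near full" is easier to judge from numbers than by eye. A minimal sketch, assuming a monitoring thread can reach the pipeline's queue.Queue; sample_depth() and diagnose() are hypothetical helpers, and the 0.8 / 0.2 fill thresholds are illustrative assumptions rather than values from this guide. queue.Queue.qsize() is only approximate under concurrency, which is fine for a trend.

```python
import queue
import statistics
import threading


def sample_depth(q: queue.Queue, stop: threading.Event, interval: float = 0.5) -> list[int]:
    """Poll q.qsize() every `interval` seconds until `stop` is set."""
    samples: list[int] = []
    while not stop.wait(interval):  # wait() returns True once the event is set
        samples.append(q.qsize())
    return samples


def diagnose(samples: list[int], maxsize: int, hi: float = 0.8, lo: float = 0.2) -> str:
    """Map mean queue fill to a sizing hint (hi/lo are illustrative cut-offs)."""
    if not samples:
        return "no samples"
    fill = statistics.fmean(samples) / maxsize
    if fill >= hi:
        return "CPU-bound: add process workers"
    if fill <= lo:
        return "I/O-bound: add I/O threads"
    return "balanced"
```

In practice you would run sample_depth() on a one-thread ThreadPoolExecutor for the length of a representative batch, set the event, and pass the future's result plus the queue's maxsize to diagnose().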

Step 4: Recover from BrokenProcessPool without losing the batch

A worker that segfaults or is OOM-killed poisons the whole ProcessPoolExecutor — every pending future raises BrokenProcessPool. A resilient pipeline catches that, checkpoints progress, and respawns a fresh pool with reduced concurrency.

import concurrent.futures
import logging
import time
from concurrent.futures.process import BrokenProcessPool
from typing import Callable

log = logging.getLogger(__name__)


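def run_with_recovery(factory: Callable[[int], concurrent.futures.Executor],
                      func: Callable, *args, max_workers=4, max_retries=3):
    """Run func(*args), respawning the pool with half the workers after each break."""
    workers = max_workers
    for attempt in range(1, max_retries + 1):
        pool = factory(workers)
        try:
            return pool.submit(func, *args).result(timeout=120)
        except BrokenProcessPool as e:
            log.warning("BrokenProcessPool (attempt %d, %d workers): %s",
                        attempt, workers, e)
            workers = max(1, workers // 2)  # reduced concurrency on respawn
            if attempt < max_retries:
                time.sleep(2 ** attempt)  # backoff before respawn
        except Exception:
            log.error("Non-recoverable error; not retrying", exc_info=True)
            raise
        finally:
            # Runs on success and on every error path, so no pool is leaked.
            pool.shutdown(wait=False, cancel_futures=True)
    raise RuntimeError("Executor recovery exhausted")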

Verify: induce a failure (a worker that calls os._exit(1) or allocates past the cgroup limit) and confirm the run logs the backoff, respawns, and completes rather than crashing. In production, alert on the respawn count — repeated BrokenProcessPool is almost always a memory ceiling, in which case lower max_workers or move large payloads to shared memory rather than retrying blindly.
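run_with_recovery() retries a single call. For a batch, "without losing the batch" means resubmitting only the unfinished items after a respawn. A sketch under stated assumptions: func is idempotent (an item can run twice if the pool breaks after it finished but before its result was collected), and the checkpoint is an in-memory dict, so persisting it for a crash-restart is left out. run_batch_with_recovery() and its parameters are hypothetical names.

```python
import concurrent.futures
import logging
import time
from concurrent.futures.process import BrokenProcessPool
from typing import Callable, Sequence

log = logging.getLogger(__name__)


def run_batch_with_recovery(factory: Callable[[int], concurrent.futures.Executor],
                            func: Callable, items: Sequence, *, max_workers: int = 4,
                            max_retries: int = 3, backoff: float = 1.0) -> list:
    """Map func over items; after a pool break, respawn and resubmit only unfinished work."""
    done: dict[int, object] = {}  # the checkpoint: item index -> result (in memory only)
    workers = max_workers
    for attempt in range(1, max_retries + 1):
        todo = [i for i in range(len(items)) if i not in done]
        if not todo:
            break
        pool = factory(workers)
        futs: dict[concurrent.futures.Future, int] = {}
        try:
            for i in todo:
                futs[pool.submit(func, items[i])] = i
            for f in concurrent.futures.as_completed(futs):
                done[futs[f]] = f.result()  # a broken pool surfaces here
        except BrokenProcessPool as e:
            # Keep results that finished before the break; only the rest are retried.
            for f, i in futs.items():
                if f.done() and not f.cancelled() and f.exception() is None:
                    done[i] = f.result()
            log.warning("pool broke on attempt %d (%d/%d done): %s",
                        attempt, len(done), len(items), e)
            workers = max(1, workers // 2)  # reduced concurrency on respawn
            time.sleep(backoff * 2 ** (attempt - 1))
        finally:
            pool.shutdown(wait=False, cancel_futures=True)
    if len(done) < len(items):
        raise RuntimeError(f"recovery exhausted: {len(done)}/{len(items)} items done")
    return [done[i] for i in range(len(items))]
```

With a real process pool the factory would be something like `lambda n: ProcessPoolExecutor(max_workers=n)`, and func must be a module-level function so it can be pickled.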

Verification

The pipeline is routed correctly when these hold under representative load:

  • The classifier from Step 1 agrees with observed behavior: process-routed stages drive CPU% toward 100 × n_workers, thread-routed stages keep CPU% low while wall-clock drops.
  • psutil RSS, net of each worker's baseline interpreter footprint, is flat (or sub-linear) in max_workers. Linear growth beyond that baseline means pickle payloads are too large; shared memory should have flattened it.
  • Doubling workers on the bottleneck stage moves wall-clock time; doubling them on the non-bottleneck stage does not. That asymmetry confirms the bounded queue is doing its job.
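A stage-level version of the doubling check can be scripted: time one stage at n and 2n workers under its assigned executor and compare. A sketch; wall_time(), doubling_speedup() and fake_fetch() are hypothetical helpers. A speedup near 2 means the executor is actually scaling the stage, while a speedup near 1 with threads on a CPU-bound function is the GIL serialization Step 1 predicts. Measured in isolation this validates the routing; the end-to-end asymmetry above still has to be checked on the full pipeline.

```python
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable


def wall_time(executor_cls: type[Executor], workers: int,
              func: Callable, items: list) -> float:
    """Wall-clock seconds to map func over items with a fresh pool of `workers`."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as pool:
        list(pool.map(func, items))  # consume the iterator so every task runs
    return time.perf_counter() - start


def doubling_speedup(executor_cls: type[Executor], workers: int,
                     func: Callable, items: Iterable) -> float:
    """t(workers) / t(2 * workers) for one stage run in isolation."""
    items = list(items)
    return wall_time(executor_cls, workers, func, items) / wall_time(
        executor_cls, 2 * workers, func, items)


def fake_fetch(_) -> None:
    time.sleep(0.05)  # hypothetical I/O-bound stage


if __name__ == "__main__":
    speedup = doubling_speedup(ThreadPoolExecutor, 2, fake_fetch, range(8))
    print(f"threads on an I/O stage: {speedup:.2f}x")
```

For ProcessPoolExecutor the timing includes worker start-up, so use enough items that start-up cost is small next to the work.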

Pitfalls & edge cases

  • ProcessPoolExecutor on I/O fetchers. Pure waste: you pay pickle and spawn cost for work that threads (or async) do for free. Reserve processes for cpu_to_wall > 0.7 stages only.
  • Sharing connections or file handles across fork. Database connections, sockets, and in-memory caches are not safe to inherit. Open them lazily inside the worker, or use per-process pools.
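  • Defaulting max_workers to os.cpu_count() for I/O stages. That under-provisions I/O concurrency, since threads blocked on I/O use almost no CPU. Size threads for I/O capacity and processes for physical cores (os.cpu_count() counts logical CPUs; psutil.cpu_count(logical=False) reports physical ones); see CPU-bound task offloading.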
  • Skipping shutdown(wait=True). Worker processes that are never joined can outlive the pipeline or linger as unreaped zombies, and across restarts the live ones keep holding memory. Always drain, ideally with cancel_futures=True on abnormal exit.
  • Monotonic tracemalloc growth across batches. Indicates worker-process leakage; isolate the stage in a pool with a strict lifecycle (max_tasks_per_child on ProcessPoolExecutor in Python 3.11+, maxtasksperchild on multiprocessing.Pool, or periodic pool recycling).
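For the connection pitfall, ProcessPoolExecutor's initializer/initargs hooks give each worker its own handle, opened after the process starts, and max_tasks_per_child recycles workers for the leak pitfall. A sketch with sqlite3 standing in for a real database driver; init_worker(), lookup(), make_pool() and the value 500 are illustrative choices, not recommendations.

```python
import sqlite3
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

_conn: Optional[sqlite3.Connection] = None  # one connection per worker process


def init_worker(db_path: str) -> None:
    """Runs once in each worker after it starts; nothing is inherited from the parent."""
    global _conn
    _conn = sqlite3.connect(db_path)


def lookup(x: int) -> int:
    # Uses this process's own connection, opened by init_worker().
    if _conn is None:
        raise RuntimeError("init_worker() did not run in this process")
    (value,) = _conn.execute("SELECT ? * 2", (x,)).fetchone()
    return value


def make_pool(db_path: str, workers: int = 4) -> ProcessPoolExecutor:
    # max_tasks_per_child (3.11+) replaces each worker after N tasks, bounding slow
    # leaks; it switches the pool's default start method to "spawn".
    return ProcessPoolExecutor(max_workers=workers, initializer=init_worker,
                               initargs=(db_path,), max_tasks_per_child=500)
```

Because each worker reconnects, the parent should not open the connection before creating the pool; with spawn, the parent's globals are never copied into workers anyway.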

Frequently Asked Questions

How do I determine the optimal max_workers for a CPU-bound stage?

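Start at os.cpu_count() for pure CPU work and benchmark downward: os.cpu_count() counts logical CPUs, and going past the physical core count tends to degrade throughput through context switching. I/O stages can scale higher, to two to four times os.cpu_count(). Before adding workers to either kind of stage, confirm with the Step 1 ratio (and cProfile for hotspots) that it is bound where you think: CPU-saturated for a process-pool stage, mostly waiting for a thread-pool stage.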

Can I share a large array between ProcessPoolExecutor workers without copying?

Not via standard pickling. Use multiprocessing.shared_memory or a zero-copy format such as pyarrow to pass a memory view, sending only a name plus shape and dtype to workers. This avoids serializing the payload across the process boundary.

What should I do when BrokenProcessPool occurs mid-pipeline?

Catch the exception, shut the pool down with cancel_futures=True, checkpoint processed offsets, back off, and respawn a fresh ProcessPoolExecutor with reduced max_workers. Repeated breakage usually signals a memory ceiling, so lower concurrency or move large payloads to shared memory.

When should I use asyncio instead of concurrent.futures for a pipeline?

Prefer asyncio when orchestrating thousands of concurrent network connections or integrating async-native libraries like aiohttp and asyncpg. Use concurrent.futures for CPU offloading or wrapping synchronous code. A common hybrid runs I/O through asyncio and CPU work through a ProcessPoolExecutor via loop.run_in_executor.
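A minimal sketch of that hybrid, assuming fetch() is a placeholder for an async-native client call (here it only sleeps) and transform() stands in for real CPU work; both names are hypothetical. The executor is passed in, so the same coroutine can run on a process pool in production and on a thread pool in a quick test.

```python
import asyncio
import concurrent.futures
import hashlib


async def fetch(i: int) -> bytes:
    await asyncio.sleep(0.01)  # placeholder for an aiohttp/asyncpg call
    return f"payload-{i}".encode()


def transform(data: bytes) -> str:
    # Stand-in for CPU work; module-level so a process pool can pickle it.
    return hashlib.sha256(data).hexdigest()


async def pipeline(n: int, pool: concurrent.futures.Executor) -> list[str]:
    loop = asyncio.get_running_loop()

    async def one(i: int) -> str:
        data = await fetch(i)  # I/O stays on the event loop
        return await loop.run_in_executor(pool, transform, data)  # CPU goes to the pool

    return await asyncio.gather(*(one(i) for i in range(n)))


def main(n: int = 100) -> list[str]:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        return asyncio.run(pipeline(n, pool))
```

As with any process pool, call main() from under an `if __name__ == "__main__":` guard.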