Choosing between ThreadPoolExecutor and ProcessPoolExecutor for data pipelines
You have a multi-stage data pipeline (fetch, parse, transform, write), and throughput does not scale with your worker count. Doubling max_workers on a ThreadPoolExecutor leaves wall-clock time flat because the transform stage is GIL-bound. Or you switch a stage to ProcessPoolExecutor and resident memory triples while the stage runs slower, because every DataFrame is being pickled across the process boundary. The executor choice is not a heuristic; it is a measurement. This guide gives you the measurement, the routing architecture that follows from it, and the recovery path when a worker pool breaks mid-run.
Prerequisites
- Python 3.11+ for the asyncio integration notes and modern concurrent.futures semantics (cancel_futures on shutdown).
- pip install psutil numpy for the memory-delta and shared-memory examples (the routing and recovery code is stdlib-only).
- Assumed knowledge: you are comfortable with concurrent.futures.Future and the GIL trade-offs in Threading vs Multiprocessing vs Asyncio. For the broader worker-topology context this fits into, see Concurrent Execution & Worker Patterns.
The short answer: route a stage by its CPU-time-to-wall-clock ratio. Below ~0.4 it is I/O-bound — use threads. Above ~0.7 it is CPU-bound — use processes. The steps below make that measurable, then build the pipeline around it.
Step 1: Classify each stage by its CPU/wall ratio
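The single discriminator is how much of a stage's wall-clock time is spent on the CPU versus waiting. Measure time.process_time() (CPU) against time.perf_counter() (wall) per stage on representative data. Run each measurement in isolation: time.process_time() sums CPU time across every thread in the process, so concurrent work would inflate the ratio.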
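Here is a minimal sketch of that measurement. It assumes a stage can be invoked once as a plain callable on representative input. cpu_wall_ratio and the two stand-in stages are illustrative names, not an existing API.

```python
import time
from typing import Any, Callable


def cpu_wall_ratio(stage: Callable[..., Any], *args: Any, **kwargs: Any) -> float:
    """Run one call of `stage` in the current thread; return CPU time / wall time.

    time.process_time() counts CPU used by every thread in this process, so call
    this while nothing else is running concurrently or the ratio is inflated.
    """
    cpu_start = time.process_time()
    wall_start = time.perf_counter()
    stage(*args, **kwargs)
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return cpu / wall if wall > 0 else 0.0


if __name__ == "__main__":
    def waits_on_io() -> None:
        time.sleep(0.2)  # stand-in for a network or disk read

    def burns_cpu() -> int:
        return sum(i * i for i in range(2_000_000))  # pure-Python arithmetic

    print(f"io stage:  {cpu_wall_ratio(waits_on_io):.2f}")  # near 0
    print(f"cpu stage: {cpu_wall_ratio(burns_cpu):.2f}")    # near 1 on an idle machine
```

Measure each stage several times on real inputs and use the median. A single call can be skewed by cache warm-up or a slow first connection.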
Verify: a ratio below 0.4 means the stage is mostly waiting on I/O — threads will overlap that wait. A ratio above 0.7 means it is CPU-saturated — threads will serialize on the GIL and only processes give a speedup. A ratio of 0.4–0.7 is mixed; decouple it (Step 3) or push the I/O part into asyncio.
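The thresholds turn directly into a routing rule. This sketch assumes the 0.4 and 0.7 cut-offs from this guide, and route_stage is a hypothetical helper. Returning None for the mixed band is a deliberate design choice: it forces you to split the stage rather than guess.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional, Type

IO_BOUND_MAX = 0.4   # below this the stage mostly waits
CPU_BOUND_MIN = 0.7  # above this the stage mostly computes


def route_stage(cpu_to_wall: float) -> Optional[Type[Executor]]:
    """Map a measured CPU/wall ratio to an executor class.

    Returns None for the mixed band: such a stage should be split into an
    I/O part and a CPU part (Step 3) instead of being forced into one pool.
    """
    if cpu_to_wall < IO_BOUND_MAX:
        return ThreadPoolExecutor   # threads overlap the waiting
    if cpu_to_wall > CPU_BOUND_MIN:
        return ProcessPoolExecutor  # separate processes sidestep the GIL
    return None


if __name__ == "__main__":
    for ratio in (0.05, 0.55, 0.93):
        executor_cls = route_stage(ratio)
        print(ratio, executor_cls.__name__ if executor_cls else "split the stage")
```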
Step 2: Measure the pickle IPC cost before committing to processes
ProcessPoolExecutor pickles every argument and return value across the boundary. For large arrays or frames, that serialization can cost more than the compute it enables. Measure it explicitly and route around it with multiprocessing.shared_memory when it dominates.
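One way to write the measurement is sketched below. The name pickle_cost follows the helper the text refers to, but this body is an assumption: it times a dumps plus loads round trip, which understates the real per-task cost because it excludes the pipe transfer and the return trip. ipc_dominates is a hypothetical helper that encodes the ~15% rule.

```python
import pickle
import time
from typing import Any, Tuple


def pickle_cost(obj: Any) -> Tuple[float, int]:
    """Return (seconds, payload_bytes) for one dumps+loads round trip of `obj`.

    Approximates the per-argument cost ProcessPoolExecutor adds; the real
    cost is higher because the bytes also cross a pipe, and results pay it again.
    """
    start = time.perf_counter()
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    pickle.loads(payload)
    return time.perf_counter() - start, len(payload)


def ipc_dominates(pickle_seconds: float, compute_seconds: float, limit: float = 0.15) -> bool:
    """True when serialization exceeds `limit` (default 15%) of the compute time."""
    return compute_seconds <= 0 or pickle_seconds / compute_seconds > limit


if __name__ == "__main__":
    rows = [list(range(100)) for _ in range(20_000)]  # stand-in for a large frame
    seconds, nbytes = pickle_cost(rows)
    print(f"{nbytes / 1e6:.1f} MB pickled in {seconds * 1e3:.1f} ms")
    print("use shared memory:", ipc_dominates(seconds, compute_seconds=0.05))
```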
Verify: if pickle_cost() reports a serialization time above ~15% of the stage's compute time, or RSS scales linearly with max_workers, pass a shared-memory name plus shape/dtype instead of the array. Workers attach with SharedMemory(name=...), compute, and close(); the bytes never cross the pipe. The process that created the block calls unlink() once every worker has finished, otherwise the segment can outlive the run.
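Below is a sketch of that hand-off. It uses numpy as listed in the prerequisites and assumes a non-empty array. sum_rows and parallel_sum are hypothetical names, and summing row ranges stands in for real compute.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory

import numpy as np


def sum_rows(name: str, shape: tuple, dtype: str, start: int, stop: int) -> float:
    """Worker side: attach by name, compute on a zero-copy view, detach."""
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    total = float(arr[start:stop].sum())
    del arr      # release the view first: close() raises BufferError while it exists
    shm.close()  # detach only; the creating process owns unlink()
    return total


def parallel_sum(data: np.ndarray, workers: int = 2) -> float:
    """Parent side: copy `data` into shared memory once, fan out row ranges, clean up."""
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        view[:] = data  # the single copy of the payload
        del view
        step = -(-data.shape[0] // workers)  # ceiling division over rows
        with ProcessPoolExecutor(max_workers=workers) as pool:
            # Each task pickles only a short name, a shape tuple, a dtype string and two ints.
            futures = [
                pool.submit(sum_rows, shm.name, data.shape, data.dtype.str, i, i + step)
                for i in range(0, data.shape[0], step)
            ]
            return sum(f.result() for f in futures)
    finally:
        shm.close()
        shm.unlink()  # free the segment after every worker has detached


if __name__ == "__main__":
    frame = np.random.default_rng(0).random((4_000, 500))
    print(parallel_sum(frame, workers=4), float(frame.sum()))
```

The two printed sums should agree up to floating-point rounding, because the partial sums are added in a different order.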
Step 3: Decouple I/O and CPU stages through a bounded queue
Production pipelines are heterogeneous. Pin the I/O stage to threads, the CPU stage to processes, and put a bounded queue.Queue between them so a slow consumer applies backpressure on the producer instead of buffering unboundedly into memory.
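The sketch below shows the decoupled layout under stated assumptions. fetch and transform are hypothetical stand-in stages, and the CPU executor is passed in: a ProcessPoolExecutor in production, though any Executor works for a quick test. Results come back in queue order, not input order.

```python
import queue
import threading
import time
from collections import deque
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import List

_DONE = object()  # end-of-stream marker; only compared inside the parent process


def fetch(item: int) -> bytes:
    """Hypothetical I/O stage: sleeps to stand in for a network or disk read."""
    time.sleep(0.01)
    return str(item).encode() * 1_000


def transform(payload: bytes) -> int:
    """Hypothetical CPU stage: pure-Python work that holds the GIL."""
    return sum(payload) % 1_000_003


def run_pipeline(items: List[int], cpu_pool: Executor, io_threads: int = 4,
                 maxsize: int = 16, max_in_flight: int = 8) -> List[int]:
    q: queue.Queue = queue.Queue(maxsize=maxsize)  # the bound is the backpressure

    def producer(chunk: List[int]) -> None:
        for item in chunk:
            q.put(fetch(item))  # blocks while the queue is full

    producers = [threading.Thread(target=producer, args=(items[i::io_threads],), daemon=True)
                 for i in range(io_threads)]
    for t in producers:
        t.start()

    def mark_done() -> None:
        for t in producers:
            t.join()
        q.put(_DONE)

    threading.Thread(target=mark_done, daemon=True).start()

    results: List[int] = []
    in_flight: deque = deque()
    while (payload := q.get()) is not _DONE:
        in_flight.append(cpu_pool.submit(transform, payload))
        if len(in_flight) >= max_in_flight:
            # The executor's own work queue is unbounded, so cap pending tasks too.
            results.append(in_flight.popleft().result())
    results.extend(f.result() for f in in_flight)
    return results


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        out = run_pipeline(list(range(200)), cpu_pool=pool)
    print(len(out))
```

The max_in_flight cap matters as much as the queue bound. Without it, the consumer would drain the queue straight into the executor's internal queue, and memory would grow there instead.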
Verify: under load the queue should hover near full when the CPU stage is the bottleneck and near empty when the I/O stage is. A queue that is always full and a producer always blocked means you are CPU-bound and should add process workers; always empty means add I/O threads. The bounded maxsize is what keeps RSS flat — see async queue management for the backpressure mechanics generalized.
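To turn "hovers near full" into a number, you can sample the queue's fill fraction over time. This is a sketch: sample_fill and diagnose are hypothetical helpers, and the 0.9 and 0.1 cut-offs are assumptions to tune, not values from this guide.

```python
import queue
import time
from statistics import mean


def sample_fill(q: queue.Queue, duration: float = 5.0, interval: float = 0.1) -> float:
    """Average fraction of `q`'s maxsize in use over roughly `duration` seconds."""
    if q.maxsize <= 0:
        raise ValueError("only meaningful for a bounded queue")
    samples = []
    deadline = time.monotonic() + duration
    while True:
        samples.append(q.qsize() / q.maxsize)  # qsize() is approximate under concurrency
        if time.monotonic() >= deadline:
            break
        time.sleep(interval)
    return mean(samples)


def diagnose(fill: float) -> str:
    if fill > 0.9:
        return "CPU stage is the bottleneck: add process workers"
    if fill < 0.1:
        return "I/O stage is the bottleneck: add I/O threads"
    return "stages are roughly balanced"
```

Run sample_fill from a side thread while the pipeline is under representative load. An idle pipeline reads as empty and will wrongly point at the I/O stage.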
Step 4: Recover from BrokenProcessPool without losing the batch
A worker that segfaults or is OOM-killed poisons the whole ProcessPoolExecutor — every pending future raises BrokenProcessPool. A resilient pipeline catches that, checkpoints progress, and respawns a fresh pool with reduced concurrency.
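Here is a minimal sketch of that loop. It assumes tasks are idempotent (a task that was running when the pool died is simply resubmitted) and that halving max_workers with exponential backoff is acceptable. run_resilient and its executor_factory parameter are hypothetical. The factory exists only so the loop can be exercised without killing real processes.

```python
import logging
import time
from concurrent.futures import Executor, ProcessPoolExecutor, as_completed
from concurrent.futures.process import BrokenProcessPool
from typing import Any, Callable, Dict, List

log = logging.getLogger("pipeline")


def run_resilient(worker: Callable[[Any], Any], items: List[Any], *,
                  max_workers: int = 4, max_respawns: int = 3, base_delay: float = 0.5,
                  executor_factory: Callable[..., Executor] = ProcessPoolExecutor) -> List[Any]:
    done: Dict[int, Any] = {}  # checkpoint: item offset -> result
    workers, respawns = max_workers, 0
    while len(done) < len(items):
        pending = [i for i in range(len(items)) if i not in done]
        pool = executor_factory(max_workers=workers)
        broken = False
        try:
            futures = {pool.submit(worker, items[i]): i for i in pending}
            for fut in as_completed(futures):
                try:
                    done[futures[fut]] = fut.result()  # record every success
                except BrokenProcessPool:
                    broken = True  # keep draining so finished results are kept
        except BrokenProcessPool:
            broken = True  # submit() raises it if the pool died while submitting
        finally:
            pool.shutdown(wait=True, cancel_futures=True)
        if broken:
            respawns += 1
            if respawns > max_respawns:
                raise RuntimeError(f"pool broke {respawns} times; suspect a memory ceiling")
            workers = max(1, workers // 2)
            delay = base_delay * 2 ** (respawns - 1)
            log.warning("BrokenProcessPool: respawn #%d with max_workers=%d in %.1fs",
                        respawns, workers, delay)
            time.sleep(delay)
    return [done[i] for i in range(len(items))]
```

Only BrokenProcessPool triggers a respawn. An ordinary exception raised by a task propagates unchanged after the pool is shut down. To exercise the real path, pass a top-level worker that calls os._exit(1) on one input during its first call only.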
Verify: induce a failure (a worker that calls os._exit(1) or allocates past the cgroup limit) and confirm the run logs the backoff, respawns, and completes rather than crashing. In production, alert on the respawn count — repeated BrokenProcessPool is almost always a memory ceiling, in which case lower max_workers or move large payloads to shared memory rather than retrying blindly.
Verification
The pipeline is routed correctly when these hold under representative load:
- The classifier from Step 1 agrees with observed behavior: process-routed stages drive CPU% toward 100 × n_workers, while thread-routed stages keep CPU% low as wall-clock time drops.
- psutil RSS is flat (or sub-linear) in max_workers. Linear growth means pickle payloads are too large; shared memory should have flattened it.
- Doubling workers on the bottleneck stage moves wall-clock time; doubling them on the non-bottleneck stage does not. That asymmetry confirms the bounded queue is doing its job.
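One way to take the RSS reading, assuming psutil is installed, is to sum resident memory over the parent and its worker processes while the pool is still alive. tree_rss_mb, rss_by_workers and allocate are hypothetical names. Summed RSS double-counts pages the processes share (shared-memory blocks, copy-on-write pages after fork), so compare the trend across max_workers rather than the absolute numbers.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, Dict, Optional, Sequence, Tuple

import psutil


def tree_rss_mb(pid: Optional[int] = None) -> float:
    """Resident memory of a process plus all of its descendants, in MB."""
    root = psutil.Process(pid)  # pid=None means the current process
    total = root.memory_info().rss
    for child in root.children(recursive=True):
        try:
            total += child.memory_info().rss
        except psutil.NoSuchProcess:
            pass  # a worker exited between listing and sampling
    return total / 1e6


def rss_by_workers(fn: Callable, payloads: Sequence,
                   worker_counts: Tuple[int, ...] = (1, 2, 4)) -> Dict[int, float]:
    """Run the same stage at several pool sizes and record tree RSS for each."""
    readings: Dict[int, float] = {}
    for n in worker_counts:
        with ProcessPoolExecutor(max_workers=n) as pool:
            list(pool.map(fn, payloads))
            readings[n] = tree_rss_mb()  # sampled before shutdown, while workers live
    return readings


def allocate(size: int) -> int:
    """Stand-in stage that allocates `size` bytes inside the worker."""
    return len(bytes(size))


if __name__ == "__main__":
    print(rss_by_workers(allocate, [10_000_000] * 8))
```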
Pitfalls & edge cases
- ProcessPoolExecutor on I/O fetchers. Pure waste: you pay pickle and spawn cost for work that threads (or async) do for free. Reserve processes for stages with cpu_to_wall > 0.7.
- Sharing connections or file handles across fork. Database connections and sockets are not safe to inherit, and in-memory caches are copied into each child rather than shared. Open them lazily inside the worker, or use per-process pools.
- Defaulting max_workers to os.cpu_count() for I/O stages. That ties I/O concurrency to core count and under-provisions stages that spend most of their time waiting. Size threads for I/O capacity and processes for physical cores (os.cpu_count() reports logical CPUs); see CPU-bound task offloading.
- Skipping shutdown(wait=True). Orphaned worker processes accumulate across pool restarts and keep holding memory. Always drain, ideally with cancel_futures=True on abnormal exit.
- Monotonic tracemalloc growth across batches. Indicates a leak in the worker processes; isolate the stage in a pool with a strict lifecycle (max_tasks_per_child on ProcessPoolExecutor in 3.11+, maxtasksperchild on multiprocessing.Pool, or periodic pool recycling).
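Opening connections inside the worker and recycling workers both map onto ProcessPoolExecutor's initializer and, on Python 3.11+, max_tasks_per_child. In this sketch an in-memory sqlite3 database stands in for a real connection, and init_worker and lookup are illustrative names.

```python
import os
import sqlite3
from concurrent.futures import ProcessPoolExecutor
from typing import Optional, Tuple

_conn: Optional[sqlite3.Connection] = None  # one per worker process, never inherited


def init_worker(db_path: str) -> None:
    """Runs once in every new worker process, including recycled ones."""
    global _conn
    _conn = sqlite3.connect(db_path)


def lookup(key: int) -> Tuple[int, int]:
    if _conn is None:
        raise RuntimeError("init_worker did not run in this process")
    # Return the key with the pid that served it, to show the per-process connection.
    return _conn.execute("SELECT ?, ?", (key, os.getpid())).fetchone()


if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=2,
        initializer=init_worker,
        initargs=(":memory:",),
        # 3.11+: replace each worker after 50 tasks. Without an explicit
        # mp_context this uses the spawn start method; it cannot be used with fork.
        max_tasks_per_child=50,
    ) as pool:
        print(sorted({pid for _, pid in pool.map(lookup, range(200))}))
```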
Frequently Asked Questions
How do I determine the optimal max_workers for a CPU-bound stage?
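Start at the physical core count for pure CPU work and benchmark from there; os.cpu_count() reports logical CPUs, so on SMT machines it can overshoot, and exceeding physical cores degrades throughput via context switching. Before adding workers, confirm the stage is actually CPU-saturated (a CPU/wall ratio above ~0.7, with cProfile showing the time going to your own compute). I/O stages are sized separately and can scale higher, to two to four times os.cpu_count().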
Can I share a large array between ProcessPoolExecutor workers without copying?
Not via standard pickling. Use multiprocessing.shared_memory, or a zero-copy format such as Apache Arrow (via pyarrow), so that workers read the same memory, sending only a name plus shape and dtype to workers. This avoids serializing the payload across the process boundary.
What should I do when BrokenProcessPool occurs mid-pipeline?
Catch the exception, shut the pool down with cancel_futures=True, checkpoint processed offsets, back off, and respawn a fresh ProcessPoolExecutor with reduced max_workers. Repeated breakage usually signals a memory ceiling, so lower concurrency or move large payloads to shared memory.
When should I use asyncio instead of concurrent.futures for a pipeline?
Prefer asyncio when orchestrating thousands of concurrent network connections or integrating async-native libraries like aiohttp and asyncpg. Use concurrent.futures for CPU offloading or wrapping synchronous code. A common hybrid runs I/O through asyncio and CPU work through a ProcessPoolExecutor via loop.run_in_executor.
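Here is a minimal sketch of that hybrid, assuming Python 3.11+. fetch and digest are hypothetical stand-ins: an asyncio.sleep in place of an aiohttp call, and repeated hashing in place of real CPU work. The key detail is passing the process pool explicitly to run_in_executor. Passing None would use the loop's default executor, which is a ThreadPoolExecutor.

```python
import asyncio
import hashlib
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import List


def digest(payload: bytes, rounds: int = 50_000) -> str:
    """Hypothetical CPU stage: repeated hashing, run in a worker process."""
    for _ in range(rounds):
        payload = hashlib.sha256(payload).digest()
    return payload.hex()


async def fetch(i: int) -> bytes:
    """Hypothetical I/O stage: an async sleep stands in for a network request."""
    await asyncio.sleep(0.01)
    return f"record-{i}".encode()


async def handle(i: int, pool: Executor) -> str:
    payload = await fetch(i)  # I/O stays on the event loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, digest, payload)  # CPU goes to the pool


async def main(n: int = 20) -> List[str]:
    with ProcessPoolExecutor() as pool:
        return await asyncio.gather(*(handle(i, pool) for i in range(n)))


if __name__ == "__main__":
    print(len(asyncio.run(main())))
```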
Related
- Threading vs Multiprocessing vs Asyncio — the parent guide with the full model decision matrix.
- CPU-bound task offloading — sibling reference for routing compute off the main thread or loop.
- Concurrent Execution & Worker Patterns — up to the parent overview for worker lifecycle and dispatch patterns.