Estos casos ilustran patrones habituales en sistemas concurrentes de producción: scraping masivo, simulaciones, pipelines de logs y automatización de tareas de fondo.
La elección de modelo (hilos, procesos, shared memory) depende de si el trabajo es I/O-bound o CPU-bound y del costo de mover datos.
Ideal para miles de solicitudes I/O-bound. Usa hilos con una Queue de URLs, asigna un número razonable de workers (decenas, no miles), y controla respetuosamente el ritmo con sleep o rate limiting.
Loguea errores de red y reintentos; respeta robots.txt y limita concurrencia por host.
Divide el espacio de simulación en regiones y asigna cada una a un proceso. Usa memoria compartida para leer el estado global y, si es posible, escribe en buffers separados para luego componerlos y evitar colisiones.
Balancea la carga: regiones con más partículas necesitan más tiempo; redistribuye o ajusta el tamano de los bloques.
Un enfoque map-reduce simple: leer fragmentos de logs, mapear a métricas (contar eventos, agrupar), y reducir acumulando resultados parciales. ProcessPoolExecutor reparte el parseo pesado entre núcleos.
Lee en bloques fijos, evita cargar todo el archivo en RAM y combina parciales de forma incremental.
Workers persistentes atienden colas de trabajos pequeños: enviar emails, limpiar archivos temporales, ejecutar webhooks. Un ThreadPoolExecutor mantiene el pool y evita crear hilos por cada evento.
Encola trabajos con reintentos y registra las fallas; usa un apagado ordenado para no perder tareas en progreso.
El siguiente ejemplo define una cola de tareas, ejecutores con ThreadPoolExecutor para I/O y un supervisor que puede escalar a ProcessPoolExecutor para pasos CPU-bound.
El diseño separa la decisión (supervisor) de la ejecución (executors) para poder cambiar políticas sin tocar la lógica de negocio.
import concurrent.futures as futures
import queue
import threading
import time
from typing import Callable
class TaskQueue:
def __init__(self, maxsize=0):
self.q = queue.Queue(maxsize=maxsize)
self.shutdown_flag = threading.Event()
def submit(self, fn: Callable, *args, **kwargs):
self.q.put((fn, args, kwargs))
def close(self):
self.shutdown_flag.set()
self.q.put(None) # sentinel
class IOExecutor:
def __init__(self, workers=8):
self.pool = futures.ThreadPoolExecutor(max_workers=workers)
def run(self, fn, *args, **kwargs):
return self.pool.submit(fn, *args, **kwargs)
def shutdown(self):
self.pool.shutdown(wait=True)
class CPUExecutor:
def __init__(self, workers=None):
self.pool = futures.ProcessPoolExecutor(max_workers=workers)
def run(self, fn, *args, **kwargs):
return self.pool.submit(fn, *args, **kwargs)
def shutdown(self):
self.pool.shutdown(wait=True)
def supervisor(task_queue: TaskQueue, io_exec: IOExecutor, cpu_exec: CPUExecutor):
pending = []
while True:
try:
item = task_queue.q.get(timeout=0.2)
except queue.Empty:
if task_queue.shutdown_flag.is_set():
break
continue
if item is None:
break
fn, args, kwargs = item
destino = kwargs.pop("destino", "io")
if destino == "cpu":
pending.append(cpu_exec.run(fn, *args, **kwargs))
else:
pending.append(io_exec.run(fn, *args, **kwargs))
# esperar resultados y reportar errores
for fut in futures.as_completed(pending):
try:
fut.result()
except Exception as exc:
print(f"Tarea falló: {exc}")
# Funciones de ejemplo
def tarea_io(url: str):
time.sleep(0.2)
print(f"Descargado {url}")
def tarea_cpu(n: int):
total = sum(i * i for i in range(n))
print(f"CPU listo para {n}: {total}")
if __name__ == "__main__":
tq = TaskQueue()
io_exec = IOExecutor(workers=5)
cpu_exec = CPUExecutor()
# Encolar trabajos mixtos
for i in range(5):
tq.submit(tarea_io, f"https://api.example.com/item/{i}", destino="io")
for n in (5_000_000, 6_000_000):
tq.submit(tarea_cpu, n, destino="cpu")
supervisor_thread = threading.Thread(target=supervisor, args=(tq, io_exec, cpu_exec))
supervisor_thread.start()
# cerrar la cola una vez encolado todo
tq.close()
supervisor_thread.join()
io_exec.shutdown()
cpu_exec.shutdown()
Este mini framework separa tareas I/O-bound y CPU-bound: las primeras van a hilos, las segundas a procesos. Un hilo supervisor consume la cola y decide a qué ejecutor enviar cada trabajo, registrando errores en los futures.