9 - Ejemplos Reales y Casos Avanzados

Estos casos ilustran patrones habituales en sistemas concurrentes de producción: scraping masivo, simulaciones, pipelines de logs y automatización de tareas de fondo.
La elección de modelo (hilos, procesos, shared memory) depende de si el trabajo es I/O-bound o CPU-bound y del costo de mover datos.

9.1 Web scraping concurrente (threading + queue)

Ideal para miles de solicitudes I/O-bound. Usa hilos con una Queue de URLs, asigna un número razonable de workers (decenas, no miles), y controla respetuosamente el ritmo con sleep o rate limiting.
Loguea errores de red y reintentos; respeta robots.txt y limita concurrencia por host.

9.2 Simulación física paralela (multiprocessing + shared_memory)

Divide el espacio de simulación en regiones y asigna cada una a un proceso. Usa memoria compartida para leer el estado global y, si es posible, escribe en buffers separados para luego componerlos y evitar colisiones.
Balancea la carga: regiones con más partículas necesitan más tiempo; redistribuye o ajusta el tamano de los bloques.

9.3 Procesamiento de logs a gran escala (ProcessPool)

Un enfoque map-reduce simple: leer fragmentos de logs, mapear a métricas (contar eventos, agrupar), y reducir acumulando resultados parciales. ProcessPoolExecutor reparte el parseo pesado entre núcleos.
Lee en bloques fijos, evita cargar todo el archivo en RAM y combina parciales de forma incremental.

9.4 Automatización de tareas en segundo plano (ThreadPool)

Workers persistentes atienden colas de trabajos pequeños: enviar emails, limpiar archivos temporales, ejecutar webhooks. Un ThreadPoolExecutor mantiene el pool y evita crear hilos por cada evento.
Encola trabajos con reintentos y registra las fallas; usa un apagado ordenado para no perder tareas en progreso.

9.5 Aplicación: Mini framework concurrente de tareas

El siguiente ejemplo define una cola de tareas, ejecutores con ThreadPoolExecutor para I/O y un supervisor que puede escalar a ProcessPoolExecutor para pasos CPU-bound.
El diseño separa la decisión (supervisor) de la ejecución (executors) para poder cambiar políticas sin tocar la lógica de negocio.

import concurrent.futures as futures
import queue
import threading
import time
from typing import Callable


class TaskQueue:
    def __init__(self, maxsize=0):
        self.q = queue.Queue(maxsize=maxsize)
        self.shutdown_flag = threading.Event()

    def submit(self, fn: Callable, *args, **kwargs):
        self.q.put((fn, args, kwargs))

    def close(self):
        self.shutdown_flag.set()
        self.q.put(None)  # sentinel


class IOExecutor:
    def __init__(self, workers=8):
        self.pool = futures.ThreadPoolExecutor(max_workers=workers)

    def run(self, fn, *args, **kwargs):
        return self.pool.submit(fn, *args, **kwargs)

    def shutdown(self):
        self.pool.shutdown(wait=True)


class CPUExecutor:
    def __init__(self, workers=None):
        self.pool = futures.ProcessPoolExecutor(max_workers=workers)

    def run(self, fn, *args, **kwargs):
        return self.pool.submit(fn, *args, **kwargs)

    def shutdown(self):
        self.pool.shutdown(wait=True)


def supervisor(task_queue: TaskQueue, io_exec: IOExecutor, cpu_exec: CPUExecutor):
    pending = []
    while True:
        try:
            item = task_queue.q.get(timeout=0.2)
        except queue.Empty:
            if task_queue.shutdown_flag.is_set():
                break
            continue
        if item is None:
            break
        fn, args, kwargs = item
        destino = kwargs.pop("destino", "io")
        if destino == "cpu":
            pending.append(cpu_exec.run(fn, *args, **kwargs))
        else:
            pending.append(io_exec.run(fn, *args, **kwargs))
    # esperar resultados y reportar errores
    for fut in futures.as_completed(pending):
        try:
            fut.result()
        except Exception as exc:
            print(f"Tarea falló: {exc}")


# Funciones de ejemplo
def tarea_io(url: str):
    time.sleep(0.2)
    print(f"Descargado {url}")


def tarea_cpu(n: int):
    total = sum(i * i for i in range(n))
    print(f"CPU listo para {n}: {total}")


if __name__ == "__main__":
    tq = TaskQueue()
    io_exec = IOExecutor(workers=5)
    cpu_exec = CPUExecutor()

    # Encolar trabajos mixtos
    for i in range(5):
        tq.submit(tarea_io, f"https://api.example.com/item/{i}", destino="io")
    for n in (5_000_000, 6_000_000):
        tq.submit(tarea_cpu, n, destino="cpu")

    supervisor_thread = threading.Thread(target=supervisor, args=(tq, io_exec, cpu_exec))
    supervisor_thread.start()

    # cerrar la cola una vez encolado todo
    tq.close()
    supervisor_thread.join()
    io_exec.shutdown()
    cpu_exec.shutdown()

Este mini framework separa tareas I/O-bound y CPU-bound: las primeras van a hilos, las segundas a procesos. Un hilo supervisor consume la cola y decide a qué ejecutor enviar cada trabajo, registrando errores en los futures.