5 - Pools de Procesos y Pools de Hilos

Los pools permiten reutilizar hilos o procesos para ejecutar muchas tareas sin pagar el costo de crearlos y destruirlos cada vez. En Python hay dos familias: multiprocessing.Pool y los ejecutores de concurrent.futures (ThreadPoolExecutor y ProcessPoolExecutor).
Se levantan un número fijo de workers y se reparten trabajos; esto mantiene la latencia baja y evita la sobrecarga de spawn repetido.

5.1 Qué son los pools y por qué se usan

  • Reutilización de workers: se crean pocos hilos o procesos y se mantienen vivos para varias tareas.
    Ideal cuando hay muchas unidades pequeñas de trabajo.
  • Evitar overhead: reduce el tiempo de arranque y la presión de memoria frente a crear un worker por tarea.
    En CPU-bound, el ahorro evita que el costo de creación opaque la ganancia de paralelismo.

5.2 multiprocessing.Pool

  • apply() / apply_async(): ejecutan una función con argumentos; la versión async devuelve un objeto AsyncResult para consultar más tarde.
    Útil si quieres lanzar varias tareas y revisar resultados al final.
  • map() / starmap(): distribuyen una función sobre un iterable (o sobre tuplas en starmap).
    Simplifican el reparto de lotes sin escribir bucles manuales.
  • close(), join(), terminate(): close() deja de aceptar trabajos; join() espera; terminate() aborta de inmediato.
    Aplica close()+join() para cierres limpios y reserva terminate() para emergencias.

5.3 concurrent.futures.ThreadPoolExecutor

  • submit(): agenda una función y devuelve un future (promesa de resultado).
    Puedes guardar la lista de futures y procesar según se completen.
  • result(): bloquea hasta obtener el valor o lanza la excepción ocurrida en el worker.
    Envuelve la llamada en try/except para no perder errores de hilos.
  • Futures: permiten verificar estado (done()), cancelar y componer tareas.
    Usa as_completed() para consumir resultados en orden de finalización.
  • Excepciones: cualquier error en el worker se propaga al llamar result(); siempre captura en el código llamador.
    Sirve para loguear fallos individuales sin detener todo el lote.

5.4 concurrent.futures.ProcessPoolExecutor

  • Uso similar: misma interfaz que ThreadPoolExecutor pero con procesos, ideal para CPU-bound.
    Reparte trabajo pesado sin toparse con el GIL.
  • Serialización automática: los argumentos y retornos se envían por pickle; evita objetos no picklables.
    Funciones anidadas, lambdas o closures que capturan contexto no se pueden enviar: define funciones al nivel del módulo y pasa solo datos serializables (listas, dicts, strings, números). Si necesitas compartir grandes datos, considera memoria compartida.

5.5 Aplicación: Procesamiento paralelo de imágenes con pool de procesos

El ejemplo usa Pillow (pip install pillow) para aplicar filtros a una carpeta. Compara una versión secuencial con otra usando ProcessPoolExecutor. Cambia CARPETA y asegúrate de que exista con imágenes soportadas.
El resultado se guarda en imagenes_procesadas; si la carpeta no existe, se crea al vuelo.

import os
import time
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from PIL import Image, ImageFilter

CARPETA = Path("imagenes")
SALIDA = Path("imagenes_procesadas")
SALIDA.mkdir(exist_ok=True)


def procesar(path: Path):
    with Image.open(path) as img:
        blur = img.filter(ImageFilter.GaussianBlur(radius=2))
        gray = blur.convert("L")
        destino = SALIDA / f"proc_{path.name}"
        gray.save(destino)
    return path.name, destino.name


def version_secuencial(paths):
    inicio = time.perf_counter()
    resultados = [procesar(p) for p in paths]
    return resultados, time.perf_counter() - inicio


def version_pool(paths, workers):
    inicio = time.perf_counter()
    with ProcessPoolExecutor(max_workers=workers) as pool:
        resultados = list(pool.map(procesar, paths))
    return resultados, time.perf_counter() - inicio


if __name__ == "__main__":
    paths = [p for p in CARPETA.iterdir() if p.suffix.lower() in {".png", ".jpg", ".jpeg"}]
    if not paths:
        raise SystemExit("No hay imagenes en la carpeta 'imagenes'")

    sec, t_sec = version_secuencial(paths)
    pool_res, t_pool = version_pool(paths, os.cpu_count())

    print(f"Secuencial: {len(sec)} archivos en {t_sec:.2f}s")
    print(f"Pool:       {len(pool_res)} archivos en {t_pool:.2f}s con {os.cpu_count()} procesos")

Los pools amortizan el costo de creación de procesos: abren unos pocos al inicio y los reutilizan para cada imagen. En carpetas pequeñas el pool puede no ganar por el overhead de IPC; en lotes grandes de imágenes el paralelismo real suele reducir el tiempo total.