11 - Problema resuelto: Procesamiento de imágenes con paralelismo mixto (CPU-bound)

Objetivo: tomar una carpeta con 50 imágenes, aplicar filtros pesados (blur gaussiano, detección de bordes) y guardar los resultados usando procesos para el trabajo CPU-bound y hilos con threading.Queue para coordinar la E/S de disco.
Se evalúa la elección correcta de procesos para CPU, hilos para I/O, la sincronización productor-consumidor y el manejo seguro de resultados.

11.1 Descripción del reto

Hay 50 imágenes en disco. Leerlas y escribirlas es I/O-bound; aplicar filtros convolucionales es CPU-bound. Queremos solapar la lectura/escritura con procesos que ejecutan el filtro en paralelo en varios núcleos.

11.2 Estrategia de solución

  • I/O con hilos: un hilo productor recorre la carpeta y encola rutas en Queue; un hilo despachador toma rutas y las envía al pool de procesos.
  • CPU con procesos: un ProcessPoolExecutor aplica filtros pesados; cada proceso carga la imagen, aplica blur y bordes, y guarda el resultado.
  • Sincronización: la cola tiene un sentinel (None) para detener el despachador; as_completed recolecta resultados y errores.

11.3 Pasos para resolver

  1. Crear carpetas: imagenes/ con las 50 entradas y salida/ para los procesados.
  2. Levantar cola de rutas con un hilo productor.
  3. Despachar rutas al pool de procesos desde un hilo consumidor.
  4. Procesar cada imagen en un proceso (CPU-bound) y guardar en disco.
  5. Esperar a que todos los futuros terminen y manejar errores.

11.4 Codificación

Ejemplo completo con Pillow (pip install pillow) y concurrent.futures. Ajusta MAX_WORKERS a tus núcleos; por defecto usa os.cpu_count(). El procesamiento aplica un filtro drástico tipo neón/solarizado: mezcla solarización, relieve y bordes colorizados para un cambio visual marcado.

import concurrent.futures as futures
import os
import queue
import threading
from pathlib import Path

from PIL import Image, ImageFilter, ImageOps

CARPETA = Path("imagenes")
SALIDA = Path("salida")
SALIDA.mkdir(exist_ok=True)
MAX_WORKERS = os.cpu_count() or 4


def procesar_imagen(path: Path, salida: Path):
    with Image.open(path) as img:
        base = img.convert("RGB")
        solar = ImageOps.solarize(base, threshold=128)
        emboss = base.filter(ImageFilter.EMBOSS)
        edges = base.convert("L").filter(ImageFilter.FIND_EDGES)
        edges = ImageOps.autocontrast(edges)
        edges_color = ImageOps.colorize(edges, black="#0b0c10", white="#39ff14")  # bordes neón
        mix = Image.blend(solar, emboss, alpha=0.5)
        combinado = Image.blend(mix, edges_color, alpha=0.6)
        destino = salida / f"proc_{path.name}"
        combinado.save(destino)
    return destino.name


def productor(rutas_queue: queue.Queue):
    for path in CARPETA.iterdir():
        if path.suffix.lower() in {".png", ".jpg", ".jpeg"}:
            rutas_queue.put(path)
    rutas_queue.put(None)  # sentinel


def despachador(rutas_queue: queue.Queue, pool: futures.ProcessPoolExecutor, salida: Path, futuros):
    while True:
        path = rutas_queue.get()
        if path is None:
            rutas_queue.task_done()
            break
        fut = pool.submit(procesar_imagen, path, salida)
        futuros.append(fut)
        rutas_queue.task_done()


def main():
    rutas_queue = queue.Queue(maxsize=20)
    futuros = []

    prod_thread = threading.Thread(target=productor, args=(rutas_queue,))
    with futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
        disp_thread = threading.Thread(target=despachador, args=(rutas_queue, pool, SALIDA, futuros))

        prod_thread.start()
        disp_thread.start()

        prod_thread.join()
        rutas_queue.join()
        disp_thread.join()

        for fut in futures.as_completed(futuros):
            try:
                nombre = fut.result()
                print(f"OK: {nombre}")
            except Exception as exc:
                print(f"Fallo procesando imagen: {exc}")


if __name__ == "__main__":
    main()
Ejemplo del filtro de efecto neón aplicado a una imagen

11.5 Qué se evalúa

  • Elección correcta: procesos para CPU-bound (filtros pesados), hilos y Queue para I/O de disco.
  • Sincronización: sentinel en la cola y espera con join() para no perder tareas.
  • Manejo de resultados: recolección con as_completed y captura de excepciones para no silenciar errores.