Objetivo: tomar una carpeta con 50 imágenes, aplicar filtros pesados (blur gaussiano, detección de bordes) y guardar los resultados usando procesos para el trabajo CPU-bound y hilos con threading.Queue para coordinar la E/S de disco.
Se evalúa la elección correcta de procesos para CPU, hilos para I/O, la sincronización productor-consumidor y el manejo seguro de resultados.
Hay 50 imágenes en disco. Leerlas y escribirlas es I/O-bound; aplicar filtros convolucionales es CPU-bound. Queremos solapar la lectura/escritura con procesos que ejecutan el filtro en paralelo en varios núcleos.
Queue; un hilo despachador toma rutas y las envía al pool de procesos.ProcessPoolExecutor aplica filtros pesados; cada proceso carga la imagen, aplica blur y bordes, y guarda el resultado.None) para detener el despachador; as_completed recolecta resultados y errores.imagenes/ con las 50 entradas y salida/ para los procesados.Ejemplo completo con Pillow (pip install pillow) y concurrent.futures. Ajusta MAX_WORKERS a tus núcleos; por defecto usa os.cpu_count(). El procesamiento aplica un filtro drástico tipo neón/solarizado: mezcla solarización, relieve y bordes colorizados para un cambio visual marcado.
import concurrent.futures as futures
import os
import queue
import threading
from pathlib import Path
from PIL import Image, ImageFilter, ImageOps
CARPETA = Path("imagenes")
SALIDA = Path("salida")
SALIDA.mkdir(exist_ok=True)
MAX_WORKERS = os.cpu_count() or 4
def procesar_imagen(path: Path, salida: Path):
with Image.open(path) as img:
base = img.convert("RGB")
solar = ImageOps.solarize(base, threshold=128)
emboss = base.filter(ImageFilter.EMBOSS)
edges = base.convert("L").filter(ImageFilter.FIND_EDGES)
edges = ImageOps.autocontrast(edges)
edges_color = ImageOps.colorize(edges, black="#0b0c10", white="#39ff14") # bordes neón
mix = Image.blend(solar, emboss, alpha=0.5)
combinado = Image.blend(mix, edges_color, alpha=0.6)
destino = salida / f"proc_{path.name}"
combinado.save(destino)
return destino.name
def productor(rutas_queue: queue.Queue):
for path in CARPETA.iterdir():
if path.suffix.lower() in {".png", ".jpg", ".jpeg"}:
rutas_queue.put(path)
rutas_queue.put(None) # sentinel
def despachador(rutas_queue: queue.Queue, pool: futures.ProcessPoolExecutor, salida: Path, futuros):
while True:
path = rutas_queue.get()
if path is None:
rutas_queue.task_done()
break
fut = pool.submit(procesar_imagen, path, salida)
futuros.append(fut)
rutas_queue.task_done()
def main():
rutas_queue = queue.Queue(maxsize=20)
futuros = []
prod_thread = threading.Thread(target=productor, args=(rutas_queue,))
with futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
disp_thread = threading.Thread(target=despachador, args=(rutas_queue, pool, SALIDA, futuros))
prod_thread.start()
disp_thread.start()
prod_thread.join()
rutas_queue.join()
disp_thread.join()
for fut in futures.as_completed(futuros):
try:
nombre = fut.result()
print(f"OK: {nombre}")
except Exception as exc:
print(f"Fallo procesando imagen: {exc}")
if __name__ == "__main__":
main()
Queue para I/O de disco.join() para no perder tareas.as_completed y captura de excepciones para no silenciar errores.