10 - Buenas Prácticas y Patrones en Concurrencia Python

Estas pautas ayudan a diseñar código concurrente más simple, seguro y observable. Aplica reglas claras y patrones para evitar que la complejidad crezca sin control.
Prioriza el flujo de datos por mensajes en lugar de estados globales y mide siempre antes de optimizar.

10.1 Preferir colas a memoria compartida

  • Diseño más simple: las colas eliminan la necesidad de locks finos; los datos fluyen en un solo sentido.
    Cada etapa recibe, procesa y pasa el resultado sin compartir memoria mutable.
  • Seguridad: menos riesgo de corrupción y deadlocks; las colas manejan la sincronización interna.
    Reducen el acoplamiento entre productores y consumidores.

10.2 Limitar el número de workers

  • Hilos: no superes demasiado los núcleos; para I/O-bound usa decenas, mide y ajusta.
    Si necesitas cientos de sockets, considera asyncio para reducir cambios de contexto.
  • Procesos: apunta a cpu_count() o un poco menos; evitar oversubscription mejora caché y throughput.
    Para tareas breves usa pools para amortizar el arranque de procesos.

10.3 Usar concurrent.futures como API moderna

  • Abstracción limpia: ejecutores para hilos o procesos con la misma interfaz.
    Cambiar de hilos a procesos implica cambiar solo la clase del executor.
  • Futures para errores: las excepciones se propagan al llamar result(), facilitando el manejo.
    Usa as_completed() para consumir resultados a medida que están listos.

10.4 Logging para depuración concurrente

  • Timestamps: loguea con tiempo para reconstruir secuencias.
    Define un formato consistente y en UTC si el sistema es distribuido.
  • Contexto: incluye nombre de hilo/proceso para identificar origen.
    Agrega IDs de tarea o request para seguir flujos completos.

10.5 Evitar compartir estados mutables

  • Inmutabilidad: pasar datos por mensaje/cola en lugar de compartir estructuras mutables.
    Prefiere copiar estructuras pequeñas a compartir referencias grandes con locks.
  • Menos locks: menos estado mutable significa menos necesidad de sincronización.
    Cuando necesites compartir, encapsula el acceso en una API clara.

10.6 Aplicación: Refactor de un pipeline concurrente mal diseñado

Se parte de un pipeline con locks innecesarios y pasos secuenciales ocultos. Se refactoriza usando colas y futures para mejorar rendimiento y legibilidad.
El objetivo es paralelizar etapas independientes y hacer visible el flujo de datos.

import concurrent.futures as futures
import queue
import threading
import time


# Version original (problematica)
class PipelineMalo:
    def __init__(self):
        self.lock = threading.Lock()
        self.resultados = []

    def paso1(self, item):
        time.sleep(0.1)
        return item * 2

    def paso2(self, item):
        time.sleep(0.1)
        return item + 1

    def ejecutar(self, datos):
        for d in datos:
            a = self.paso1(d)
            b = self.paso2(a)
            with self.lock:
                self.resultados.append(b)
        return self.resultados


# Version refactorizada con colas y futures
class PipelineBueno:
    def __init__(self, workers_io=4):
        self.entrada = queue.Queue()
        self.salida = queue.Queue()
        self.pool = futures.ThreadPoolExecutor(max_workers=workers_io)

    def paso1(self, item):
        time.sleep(0.1)
        return item * 2

    def paso2(self, item):
        time.sleep(0.1)
        return item + 1

    def productor(self, datos):
        for d in datos:
            self.entrada.put(d)
        self.entrada.put(None)  # sentinel

    def worker(self):
        while True:
            item = self.entrada.get()
            if item is None:
                self.entrada.task_done()
                break
            r1 = self.paso1(item)
            r2 = self.paso2(r1)
            self.salida.put(r2)
            self.entrada.task_done()

    def recolector(self):
        resultados = []
        while True:
            try:
                item = self.salida.get(timeout=0.5)
            except queue.Empty:
                if self.entrada.unfinished_tasks == 0:
                    break
                continue
            resultados.append(item)
            self.salida.task_done()
        return resultados

    def ejecutar(self, datos):
        prod = threading.Thread(target=self.productor, args=(datos,))
        workers = [threading.Thread(target=self.worker) for _ in range(4)]
        prod.start()
        for w in workers:
            w.start()

        prod.join()
        self.entrada.join()

        resultados = self.recolector()

        for w in workers:
            w.join()
        return resultados


if __name__ == "__main__":
    datos = list(range(20))
    malo = PipelineMalo()
    inicio = time.perf_counter()
    res_malo = malo.ejecutar(datos)
    t_malo = time.perf_counter() - inicio

    bueno = PipelineBueno()
    inicio = time.perf_counter()
    res_bueno = bueno.ejecutar(datos)
    t_bueno = time.perf_counter() - inicio

    print(f"Malo: {len(res_malo)} items en {t_malo:.2f}s")
    print(f"Bueno: {len(res_bueno)} items en {t_bueno:.2f}s")

El pipeline refactorizado elimina locks globales y solapa pasos usando colas y varios workers. El recolector consume la salida sin bloquear el procesamiento. El resultado es más rápido y legible, con una línea clara entre producción, trabajo y recolección.