Estas pautas ayudan a diseñar código concurrente más simple, seguro y observable. Aplica reglas claras y patrones para evitar que la complejidad crezca sin control.
Prioriza el flujo de datos por mensajes en lugar de estados globales y mide siempre antes de optimizar.
asyncio para reducir cambios de contexto.cpu_count() o un poco menos; evitar oversubscription mejora caché y throughput.concurrent.futures como API modernaresult(), facilitando el manejo.as_completed() para consumir resultados a medida que están listos.Se parte de un pipeline con locks innecesarios y pasos secuenciales ocultos. Se refactoriza usando colas y futures para mejorar rendimiento y legibilidad.
El objetivo es paralelizar etapas independientes y hacer visible el flujo de datos.
import concurrent.futures as futures
import queue
import threading
import time
# Version original (problematica)
class PipelineMalo:
def __init__(self):
self.lock = threading.Lock()
self.resultados = []
def paso1(self, item):
time.sleep(0.1)
return item * 2
def paso2(self, item):
time.sleep(0.1)
return item + 1
def ejecutar(self, datos):
for d in datos:
a = self.paso1(d)
b = self.paso2(a)
with self.lock:
self.resultados.append(b)
return self.resultados
# Version refactorizada con colas y futures
class PipelineBueno:
def __init__(self, workers_io=4):
self.entrada = queue.Queue()
self.salida = queue.Queue()
self.pool = futures.ThreadPoolExecutor(max_workers=workers_io)
def paso1(self, item):
time.sleep(0.1)
return item * 2
def paso2(self, item):
time.sleep(0.1)
return item + 1
def productor(self, datos):
for d in datos:
self.entrada.put(d)
self.entrada.put(None) # sentinel
def worker(self):
while True:
item = self.entrada.get()
if item is None:
self.entrada.task_done()
break
r1 = self.paso1(item)
r2 = self.paso2(r1)
self.salida.put(r2)
self.entrada.task_done()
def recolector(self):
resultados = []
while True:
try:
item = self.salida.get(timeout=0.5)
except queue.Empty:
if self.entrada.unfinished_tasks == 0:
break
continue
resultados.append(item)
self.salida.task_done()
return resultados
def ejecutar(self, datos):
prod = threading.Thread(target=self.productor, args=(datos,))
workers = [threading.Thread(target=self.worker) for _ in range(4)]
prod.start()
for w in workers:
w.start()
prod.join()
self.entrada.join()
resultados = self.recolector()
for w in workers:
w.join()
return resultados
if __name__ == "__main__":
datos = list(range(20))
malo = PipelineMalo()
inicio = time.perf_counter()
res_malo = malo.ejecutar(datos)
t_malo = time.perf_counter() - inicio
bueno = PipelineBueno()
inicio = time.perf_counter()
res_bueno = bueno.ejecutar(datos)
t_bueno = time.perf_counter() - inicio
print(f"Malo: {len(res_malo)} items en {t_malo:.2f}s")
print(f"Bueno: {len(res_bueno)} items en {t_bueno:.2f}s")
El pipeline refactorizado elimina locks globales y solapa pasos usando colas y varios workers. El recolector consume la salida sin bloquear el procesamiento. El resultado es más rápido y legible, con una línea clara entre producción, trabajo y recolección.