3 - Colas Seguras para Comunicación entre Hilos

Las colas de queue son la pieza central para pasar trabajo entre hilos sin exponer datos mutables. Internamente usan locks y condiciones, por lo que las operaciones put() y get() son atómicas y seguras.

3.1 El módulo queue y su importancia

  • Seguridad para threads: la cola serializa acceso concurrente, evitando carreras al encolar o desencolar.
  • Locks internos: usa un Lock y Condition para dormir consumidores hasta que haya datos, garantizando consistencia.

3.2 Tipos de colas

  • Queue: FIFO clásica.
  • LifoQueue: pila (LIFO), útil para recursiones o backtracking.
  • PriorityQueue: basada en heap; sirve para tareas con prioridad (menor valor primero).

3.3 Patrones comunes con colas

  • Productor-consumidor: uno o más productores encolan trabajos, varios consumidores los procesan.
  • Workers con colas infinitas: hilos que esperan indefinidamente, consumiendo trabajos a medida que llegan.
  • Pipelines: varias colas enlazadas donde la salida de una etapa es la entrada de la siguiente.

3.4 Manejo de tareas: put(), get(), join(), task_done()

  • Flujo típico: el productor llama put(); el consumidor hace get(), procesa y luego task_done().
  • Bloqueo vs no bloqueo: por defecto, put()/get() bloquean hasta tener espacio o datos. Usa block=False o timeout para evitar esperas indefinidas.
  • join(): el hilo principal espera a que todos los items encolados hayan sido marcados con task_done().

3.5 Aplicación: Sistema productor-consumidor con queue

El siguiente ejemplo genera tareas simples (cálculo de longitud de cadenas) y las reparte entre varios hilos consumidores. Usa señalizadores (poison pills) para detener los workers de forma limpia.

import threading
import time
from queue import Queue

NUM_WORKERS = 3
POISON = None


def productor(q, items):
    for item in items:
        q.put(item)
    # enviar señalizadores de cierre
    for _ in range(NUM_WORKERS):
        q.put(POISON)


def consumidor(q, resultados, lock):
    while True:
        item = q.get()
        if item is POISON:
            q.task_done()
            break
        inicio = time.perf_counter()
        longitud = len(item)
        time.sleep(0.2)  # simula I/O
        duracion = time.perf_counter() - inicio
        with lock:
            resultados.append((item, longitud, duracion))
        q.task_done()


def main():
    trabajos = ["alpha", "beta", "gamma", "delta", "epsilon"]
    cola = Queue(maxsize=10)
    resultados = []
    lock = threading.Lock()

    prod = threading.Thread(target=productor, args=(cola, trabajos))
    workers = [
        threading.Thread(target=consumidor, args=(cola, resultados, lock), name=f"worker-{i}")
        for i in range(NUM_WORKERS)
    ]

    inicio = time.perf_counter()
    prod.start()
    for w in workers:
        w.start()

    prod.join()
    cola.join()  # espera a que todos los items se marquen como task_done
    tiempo_total = time.perf_counter() - inicio

    for w in workers:
        w.join()

    print("Resultados:")
    for item, longitud, duracion in resultados:
        print(f"{item}: {longitud} chars en {duracion:.2f}s")
    print(f"Tiempo total: {tiempo_total:.2f}s con {NUM_WORKERS} hilos")


if __name__ == "__main__":
    main()

El productor encola tareas y luego envía POISON para indicar a cada worker que debe terminar. Queue.join() garantiza que todo lo encolado fue procesado y marcado con task_done(); así, el hilo principal sabe cuándo cerrar.

Las poison pills son valores de control que viajan por la misma cola que las tareas. Al recibir una pastilla, el consumidor sabe que debe salir ordenadamente: marca task_done() y rompe el bucle sin matar hilos a la fuerza. Se elige None porque es un singleton fácil de detectar con is y raramente es una tarea real; si tu dominio pudiera usar None como dato válido, define un sentinela propio con POISON = object() para evitar colisiones.