10 - Patrones Concurrentes en Java

Los patrones concurrentes permiten componer hilos y colecciones seguras para maximizar throughput sin perder simplicidad. El ecosistema de java.util.concurrent ofrece bloques reutilizables que evitan código frágil con wait/notify.

10.1 Productor-consumidor robusto

  • Base con BlockingQueue: simplifica la espera y señaliza backpressure sin locks manuales.
  • Evitar wait/notify: los métodos put/take ya incluyen la coordinación.
  • Apagado ordenado: usar un mensaje centinela o shutdownNow() del pool para detener workers.
import java.util.List;
import java.util.concurrent.*;

public class ProductorConsumidor {
  private static final String FIN = "__FIN__";
  private static final BlockingQueue<String> cola = new ArrayBlockingQueue<>(3);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);

    Runnable productor = () -> {
      List<String> datos = List.of("a", "b", "c", "d");
      try {
        for (String d : datos) {
          cola.put(d); // bloquea si la cola esta llena
          System.out.println("Produzco " + d);
        }
        cola.put(FIN); // centinela
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    Runnable consumidor = () -> {
      try {
        while (true) {
          String dato = cola.take(); // bloquea si no hay
          if (FIN.equals(dato)) {
            cola.put(FIN); // reinyecta para otros consumidores
            break;
          }
          System.out.println(Thread.currentThread().getName() + " procesa " + dato);
          TimeUnit.MILLISECONDS.sleep(200);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    pool.submit(productor);
    pool.submit(consumidor);
    pool.submit(consumidor);

    pool.shutdown();
  }
}

El centinela __FIN__ permite terminar varios consumidores sin condiciones de carrera: el primero que lo toma lo reinyecta para que los demás también salgan. La cola acotada a 3 aplica backpressure: put se bloquea si el buffer está lleno. El productor recorre una lista fija y deposita el centinela al final. Cada consumidor usa take (espera si está vacío), procesa con una pausa breve y sale al recibir __FIN__. El pool de 3 hilos ejecuta 1 productor y 2 consumidores en paralelo, y luego se cierra con shutdown.

10.2 Fan-out / Fan-in

  • Fan-out: lanzar varias tareas en paralelo (pool o CompletableFuture).
  • Fan-in: recolectar resultados y agregarlos en un solo punto.
  • Clave: fijar un tamaño de pool adecuado y usar timeouts para evitar bloqueos eternos.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class FanOutFanIn {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Callable<Integer>> tareas = Arrays.asList(
        () -> sumar(1, 1_000),
        () -> sumar(1_000, 2_000),
        () -> sumar(2_000, 3_000));

    List<Future<Integer>> resultados = pool.invokeAll(tareas);
    int total = 0;
    for (Future<Integer> f : resultados) {
      total += f.get(1, TimeUnit.SECONDS);
    }
    pool.shutdown();
    System.out.println("Suma total fan-in: " + total);
  }

  private static int sumar(int desde, int hasta) {
    int acc = 0;
    for (int i = desde; i < hasta; i++) acc += i;
    return acc;
  }
}

Fan-out divide el rango de sumas en tres tareas independientes que corren en paralelo dentro de un pool fijo. Fan-in recolecta cada Future con un timeout de 1 segundo para evitar bloqueos prolongados y acumula el total. El patrón sirve para trocear lotes grandes y homogéneos (mismo tipo de trabajo) aprovechando varios hilos sin mezclar la lógica de agregación con el cálculo de cada parte.

10.3 Pipelines

  • Etapas separadas: lectura, procesamiento, escritura.
  • Hilos por etapa: cada paso puede usar su propio pool o hilo dedicado.
  • Buffers entre etapas: colas acotadas (BlockingQueue) para aislar ritmos distintos.
import java.util.concurrent.*;

public class PipelineDemo {
  public static void main(String[] args) throws Exception {
    BlockingQueue<String> qLectura = new ArrayBlockingQueue<>(2);
    BlockingQueue<String> qProc = new ArrayBlockingQueue<>(2);
    ExecutorService pool = Executors.newFixedThreadPool(3);

    pool.submit(() -> { // etapa lectura
      try {
        for (int i = 1; i <= 4; i++) qLectura.put("linea-" + i);
        qLectura.put("FIN");
      } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    });

    pool.submit(() -> { // etapa procesamiento
      try {
        while (true) {
          String linea = qLectura.take();
          if ("FIN".equals(linea)) { qProc.put("FIN"); break; }
          qProc.put(linea.toUpperCase());
        }
      } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    });

    pool.submit(() -> { // etapa escritura
      try {
        while (true) {
          String dato = qProc.take();
          if ("FIN".equals(dato)) break;
          System.out.println("Salida: " + dato);
        }
      } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    });

    pool.shutdown();
  }
}

Cada etapa tiene su cola acotada para absorber diferencias de ritmo y ejercer control de carga entre fases. La señal FIN recorre el pipeline para cerrar sin fugas.

10.4 Control de carga

  • Backpressure natural: BlockingQueue acotada frena a productores si los consumidores se retrasan.
  • Rechazo explícito: offer con timeout o políticas de ThreadPoolExecutor para evitar saturación.
  • Medir y ajustar: usa métricas de tamaño de cola y tiempos de espera para dimensionar pools.

10.5 Evitar locks mediante atomics + concurrent collections

  • Algoritmos sin bloqueo: ConcurrentLinkedQueue, AtomicLong y LongAdder reducen contención.
  • Estructuras amigables para lectores: ConcurrentHashMap y CopyOnWriteArrayList permiten muchas lecturas con pocas esperas.
  • Regla: usar locks solo cuando haya invariantes complejas; preferir primitives concurrentes para contadores, colas y cache.