9 - Estructuras concurrentes (java.util.concurrent)

Las colecciones de java.util.concurrent ofrecen seguridad de memoria y eficiencia bajo contención sin que debas escribir locks manuales. Combinan algoritmos sin bloqueo, divisiones internas y estrategias de copia para escalar con muchos hilos.

9.1 BlockingQueue

  • Interfaz clave: put() y take() bloquean, offer() y poll() permiten timeouts (límites de espera); ideal para backpressure natural (controlar cuándo el productor debe frenar).
  • Implementaciones:
    • LinkedBlockingQueue: enlazada, opcionalmente acotada.
    • ArrayBlockingQueue: acotada y circular, soporta políticas de justicia.
    • SynchronousQueue: no almacena; cada put espera un take (o viceversa).
    • PriorityBlockingQueue: orden natural o Comparator; sin límite fijo, atención a posibles crecimientos.
  • Elementos bloqueantes y timeouts: evita wait/notify; poll(2, TimeUnit.SECONDS) devuelve null si no hay datos a tiempo.
import java.time.LocalTime;
import java.util.concurrent.*;

public class BlockingQueueDemo {
  private static final BlockingQueue<String> cola = new LinkedBlockingQueue<>(2);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();

    Runnable productor = () -> {
      for (int i = 1; i <= 4; i++) {
        try {
          String mensaje = "orden-" + i;
          boolean encolado = cola.offer(mensaje, 500, TimeUnit.MILLISECONDS);
          System.out.println(Thread.currentThread().getName() + " encola " + mensaje + " ok? " + encolado);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    };

    Runnable consumidor = () -> {
      try {
        while (true) {
          String msg = cola.poll(1, TimeUnit.SECONDS);
          if (msg == null) {
            System.out.println("Timeout de consumo, sigo monitoreando...");
            continue;
          }
          System.out.println("[" + LocalTime.now() + "] proc: " + msg);
          TimeUnit.MILLISECONDS.sleep(300);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    pool.submit(productor);
    pool.submit(productor);
    pool.submit(consumidor);

    TimeUnit.SECONDS.sleep(3);
    pool.shutdownNow();
  }
}

La cola está acotada a 2 elementos, por lo que offer con timeout regula la producción. El consumidor usa poll con límite de tiempo para evitar bloqueo indefinido y seguir monitoreando el flujo. El ejemplo incluye un main listo para ejecutar y observar el comportamiento.

9.2 ConcurrentHashMap

  • Segmentación interna: usa divisiones pequeñas con CAS o locks cortos por bucket; evita un único lock grande.
  • Lecturas no bloqueantes: get() y recorridos internos usan visibilidad segura sin frenar a escritores.
  • Operaciones seguras: putIfAbsent(), compute(), computeIfAbsent(), merge() ejecutan la lógica dentro de la región crítica evitando race conditions.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentHashMapDemo {
  private static final Map<String, Integer> inventario = new ConcurrentHashMap<>();

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);

    Runnable lector = () -> System.out.println("stock libro=" + inventario.getOrDefault("libro", 0));
    Runnable escritor = () -> {
      inventario.putIfAbsent("libro", 0);
      inventario.compute("libro", (k, v) -> v + 1);
      inventario.merge("revista", 1, Integer::sum); // suma sin perder datos
    };

    for (int i = 0; i < 3; i++) pool.submit(escritor);
    for (int i = 0; i < 3; i++) pool.submit(lector);

    pool.shutdown();
    while (!pool.isTerminated()) { Thread.onSpinWait(); }
    System.out.println("Inventario final: " + inventario);
  }
}

El programa arma un pool fijo de 4 hilos para simular un inventario: los escritores crean la clave si falta con putIfAbsent, incrementan stock con compute y agregan revistas con merge sin perder sumas concurrentes. Los lectores consultan en paralelo usando getOrDefault. Al final se cierra el pool (shutdown) y se espera a que todos terminen para imprimir el inventario final. Sirve para ver cómo actualizar y leer un mapa compartido sin escribir locks.

9.3 CopyOnWriteArrayList

  • Ideal para lectores frecuentes: iteraciones y lecturas sin locks ni ConcurrentModificationException.
  • Costo: cada modificación clona el arreglo interno; evita si hay escrituras intensivas.
  • Usos típicos: listas de listeners, configuraciones que casi no cambian.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
  public static void main(String[] args) {
    List<String> listeners = new CopyOnWriteArrayList<>();
    listeners.add("audit");
    listeners.add("metrics");

    for (String l : listeners) { // seguro aunque se modifique
      System.out.println("notifico a " + l);
      if ("metrics".equals(l)) {
        listeners.add("alerts"); // no lanza ConcurrentModificationException
      }
    }
    System.out.println("Listeners finales: " + listeners);
  }
}

La lista copia su arreglo interno en cada mutación, permitiendo iteraciones seguras en paralelo a inserciones. Es útil cuando las escrituras son pocas y las lecturas constantes.

9.4 ConcurrentLinkedQueue

  • Estructura sin locks: usa CAS (algoritmo de Michael-Scott) sobre nodos enlazados para evitar bloqueos.
  • Productores y consumidores concurrentes: no bloquea, por lo que es adecuada para colas internas de bajo costo.
  • No tiene capacidad: si necesitas backpressure usa una BlockingQueue.

9.5 DelayQueue

  • Para tareas diferidas: almacena elementos Delayed y solo los libera cuando el delay vence.
  • Sin límite: vigila posibles crecimientos; puedes envolverla con lógica de límites.
  • Combinación típica: workers que consumen y reintentan tareas fallidas luego de cierto tiempo.

9.6 LinkedTransferQueue

  • Transferencias sincronizadas: transfer() bloquea hasta que un consumidor recibe el elemento.
  • Modo no bloqueante: tryTransfer() devuelve false si no hay consumidor listo, sin encolar.
  • Basada en nodos enlazados: escala bien con muchos productores/consumidores.
import java.util.concurrent.*;

public class DelayTransferDemo {
  static class TareaRetrasada implements Delayed {
    private final String nombre;
    private final long triggerNanos;

    TareaRetrasada(String nombre, long delay, TimeUnit unit) {
      this.nombre = nombre;
      this.triggerNanos = System.nanoTime() + unit.toNanos(delay);
    }
    public long getDelay(TimeUnit unit) {
      return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
    public int compareTo(Delayed o) {
      return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }
    public String toString() { return nombre; }
  }

  public static void main(String[] args) throws Exception {
    DelayQueue<TareaRetrasada> delayQueue = new DelayQueue<>();
    LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(2);

    pool.submit(() -> {
      try {
        delayQueue.put(new TareaRetrasada("reintentar-email", 2, TimeUnit.SECONDS));
        delayQueue.put(new TareaRetrasada("limpiar-cache", 1, TimeUnit.SECONDS));
        System.out.println("Transfer bloqueante..."); 
        transferQueue.transfer("config caliente"); // espera consumidor listo
      } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    });

    pool.submit(() -> {
      try {
        System.out.println("DelayQueue entrega: " + delayQueue.take());
        System.out.println("DelayQueue entrega: " + delayQueue.take());
        System.out.println("Recibido via transfer: " + transferQueue.take());
      } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    });

    pool.shutdown();
  }
}

DelayQueue retiene elementos hasta que expira su delay; LinkedTransferQueue permite handoff (entrega mano a mano) directo con transfer() o encolado si no hay consumidor. Son útiles para reintentos diferidos y handshakes rápidos.

9.7 Casos típicos de uso

  • Productor-consumidor profesional: BlockingQueue acotada aporta backpressure y métricas claras.
  • Work-stealing: colas sin bloqueo (p.ej. ForkJoinPool) y ConcurrentLinkedQueue para tareas ligeras.
  • Pub-sub simple: CopyOnWriteArrayList para listeners + ConcurrentLinkedQueue para eventos.
  • Coordinación de pipelines: varias BlockingQueue enlazadas entre etapas; DelayQueue para reintentos.