Las colecciones de java.util.concurrent ofrecen seguridad de memoria y eficiencia bajo contención sin que debas escribir locks manuales. Combinan algoritmos sin bloqueo, divisiones internas y estrategias de copia para escalar con muchos hilos.
put() y take() bloquean, offer() y poll() permiten timeouts (límites de espera); ideal para backpressure natural (controlar cuándo el productor debe frenar).LinkedBlockingQueue: enlazada, opcionalmente acotada.ArrayBlockingQueue: acotada y circular, soporta políticas de justicia.SynchronousQueue: no almacena; cada put espera un take (o viceversa).PriorityBlockingQueue: orden natural o Comparator; sin límite fijo, atención a posibles crecimientos.wait/notify; poll(2, TimeUnit.SECONDS) devuelve null si no hay datos a tiempo.import java.time.LocalTime;
import java.util.concurrent.*;
public class BlockingQueueDemo {
private static final BlockingQueue<String> cola = new LinkedBlockingQueue<>(2);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newCachedThreadPool();
Runnable productor = () -> {
for (int i = 1; i <= 4; i++) {
try {
String mensaje = "orden-" + i;
boolean encolado = cola.offer(mensaje, 500, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + " encola " + mensaje + " ok? " + encolado);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
Runnable consumidor = () -> {
try {
while (true) {
String msg = cola.poll(1, TimeUnit.SECONDS);
if (msg == null) {
System.out.println("Timeout de consumo, sigo monitoreando...");
continue;
}
System.out.println("[" + LocalTime.now() + "] proc: " + msg);
TimeUnit.MILLISECONDS.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
pool.submit(productor);
pool.submit(productor);
pool.submit(consumidor);
TimeUnit.SECONDS.sleep(3);
pool.shutdownNow();
}
}
La cola está acotada a 2 elementos, por lo que offer con timeout regula la producción. El consumidor usa poll con límite de tiempo para evitar bloqueo indefinido y seguir monitoreando el flujo. El ejemplo incluye un main listo para ejecutar y observar el comportamiento.
get() y recorridos internos usan visibilidad segura sin frenar a escritores.putIfAbsent(), compute(), computeIfAbsent(), merge() ejecutan la lógica dentro de la región crítica evitando race conditions.import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentHashMapDemo {
private static final Map<String, Integer> inventario = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(4);
Runnable lector = () -> System.out.println("stock libro=" + inventario.getOrDefault("libro", 0));
Runnable escritor = () -> {
inventario.putIfAbsent("libro", 0);
inventario.compute("libro", (k, v) -> v + 1);
inventario.merge("revista", 1, Integer::sum); // suma sin perder datos
};
for (int i = 0; i < 3; i++) pool.submit(escritor);
for (int i = 0; i < 3; i++) pool.submit(lector);
pool.shutdown();
while (!pool.isTerminated()) { Thread.onSpinWait(); }
System.out.println("Inventario final: " + inventario);
}
}
El programa arma un pool fijo de 4 hilos para simular un inventario: los escritores crean la clave si falta con putIfAbsent, incrementan stock con compute y agregan revistas con merge sin perder sumas concurrentes. Los lectores consultan en paralelo usando getOrDefault. Al final se cierra el pool (shutdown) y se espera a que todos terminen para imprimir el inventario final. Sirve para ver cómo actualizar y leer un mapa compartido sin escribir locks.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteDemo {
public static void main(String[] args) {
List<String> listeners = new CopyOnWriteArrayList<>();
listeners.add("audit");
listeners.add("metrics");
for (String l : listeners) { // seguro aunque se modifique
System.out.println("notifico a " + l);
if ("metrics".equals(l)) {
listeners.add("alerts"); // no lanza ConcurrentModificationException
}
}
System.out.println("Listeners finales: " + listeners);
}
}
La lista copia su arreglo interno en cada mutación, permitiendo iteraciones seguras en paralelo a inserciones. Es útil cuando las escrituras son pocas y las lecturas constantes.
BlockingQueue.Delayed y solo los libera cuando el delay vence.transfer() bloquea hasta que un consumidor recibe el elemento.tryTransfer() devuelve false si no hay consumidor listo, sin encolar.import java.util.concurrent.*;
public class DelayTransferDemo {
static class TareaRetrasada implements Delayed {
private final String nombre;
private final long triggerNanos;
TareaRetrasada(String nombre, long delay, TimeUnit unit) {
this.nombre = nombre;
this.triggerNanos = System.nanoTime() + unit.toNanos(delay);
}
public long getDelay(TimeUnit unit) {
return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}
public String toString() { return nombre; }
}
public static void main(String[] args) throws Exception {
DelayQueue<TareaRetrasada> delayQueue = new DelayQueue<>();
LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(() -> {
try {
delayQueue.put(new TareaRetrasada("reintentar-email", 2, TimeUnit.SECONDS));
delayQueue.put(new TareaRetrasada("limpiar-cache", 1, TimeUnit.SECONDS));
System.out.println("Transfer bloqueante...");
transferQueue.transfer("config caliente"); // espera consumidor listo
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
pool.submit(() -> {
try {
System.out.println("DelayQueue entrega: " + delayQueue.take());
System.out.println("DelayQueue entrega: " + delayQueue.take());
System.out.println("Recibido via transfer: " + transferQueue.take());
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
pool.shutdown();
}
}
DelayQueue retiene elementos hasta que expira su delay; LinkedTransferQueue permite handoff (entrega mano a mano) directo con transfer() o encolado si no hay consumidor. Son útiles para reintentos diferidos y handshakes rápidos.
BlockingQueue acotada aporta backpressure y métricas claras.ForkJoinPool) y ConcurrentLinkedQueue para tareas ligeras.CopyOnWriteArrayList para listeners + ConcurrentLinkedQueue para eventos.BlockingQueue enlazadas entre etapas; DelayQueue para reintentos.