Los patrones concurrentes permiten componer hilos y colecciones seguras para maximizar throughput sin perder simplicidad. El ecosistema de java.util.concurrent ofrece bloques reutilizables que evitan código frágil con wait/notify.
BlockingQueue: simplifica la espera y señaliza backpressure sin locks manuales.wait/notify: los métodos put/take ya incluyen la coordinación.shutdownNow() del pool para detener workers.import java.util.List;
import java.util.concurrent.*;
public class ProductorConsumidor {
private static final String FIN = "__FIN__";
private static final BlockingQueue<String> cola = new ArrayBlockingQueue<>(3);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
Runnable productor = () -> {
List<String> datos = List.of("a", "b", "c", "d");
try {
for (String d : datos) {
cola.put(d); // bloquea si la cola esta llena
System.out.println("Produzco " + d);
}
cola.put(FIN); // centinela
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
Runnable consumidor = () -> {
try {
while (true) {
String dato = cola.take(); // bloquea si no hay
if (FIN.equals(dato)) {
cola.put(FIN); // reinyecta para otros consumidores
break;
}
System.out.println(Thread.currentThread().getName() + " procesa " + dato);
TimeUnit.MILLISECONDS.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
pool.submit(productor);
pool.submit(consumidor);
pool.submit(consumidor);
pool.shutdown();
}
}
El centinela __FIN__ permite terminar varios consumidores sin condiciones de carrera: el primero que lo toma lo reinyecta para que los demás también salgan. La cola acotada a 3 aplica backpressure: put se bloquea si el buffer está lleno. El productor recorre una lista fija y deposita el centinela al final. Cada consumidor usa take (espera si está vacío), procesa con una pausa breve y sale al recibir __FIN__. El pool de 3 hilos ejecuta 1 productor y 2 consumidores en paralelo, y luego se cierra con shutdown.
CompletableFuture).import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class FanOutFanIn {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Callable<Integer>> tareas = Arrays.asList(
() -> sumar(1, 1_000),
() -> sumar(1_000, 2_000),
() -> sumar(2_000, 3_000));
List<Future<Integer>> resultados = pool.invokeAll(tareas);
int total = 0;
for (Future<Integer> f : resultados) {
total += f.get(1, TimeUnit.SECONDS);
}
pool.shutdown();
System.out.println("Suma total fan-in: " + total);
}
private static int sumar(int desde, int hasta) {
int acc = 0;
for (int i = desde; i < hasta; i++) acc += i;
return acc;
}
}
Fan-out divide el rango de sumas en tres tareas independientes que corren en paralelo dentro de un pool fijo. Fan-in recolecta cada Future con un timeout de 1 segundo para evitar bloqueos prolongados y acumula el total. El patrón sirve para trocear lotes grandes y homogéneos (mismo tipo de trabajo) aprovechando varios hilos sin mezclar la lógica de agregación con el cálculo de cada parte.
BlockingQueue) para aislar ritmos distintos.import java.util.concurrent.*;
public class PipelineDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> qLectura = new ArrayBlockingQueue<>(2);
BlockingQueue<String> qProc = new ArrayBlockingQueue<>(2);
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> { // etapa lectura
try {
for (int i = 1; i <= 4; i++) qLectura.put("linea-" + i);
qLectura.put("FIN");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
pool.submit(() -> { // etapa procesamiento
try {
while (true) {
String linea = qLectura.take();
if ("FIN".equals(linea)) { qProc.put("FIN"); break; }
qProc.put(linea.toUpperCase());
}
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
pool.submit(() -> { // etapa escritura
try {
while (true) {
String dato = qProc.take();
if ("FIN".equals(dato)) break;
System.out.println("Salida: " + dato);
}
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
pool.shutdown();
}
}
Cada etapa tiene su cola acotada para absorber diferencias de ritmo y ejercer control de carga entre fases. La señal FIN recorre el pipeline para cerrar sin fugas.
BlockingQueue acotada frena a productores si los consumidores se retrasan.offer con timeout o políticas de ThreadPoolExecutor para evitar saturación.ConcurrentLinkedQueue, AtomicLong y LongAdder reducen contención.ConcurrentHashMap y CopyOnWriteArrayList permiten muchas lecturas con pocas esperas.