Objetivo: construir un mini-servidor con aiohttp que recibe peticiones HTTP, consulta varias APIs públicas en paralelo (bitcoin, clima, USD) y responde combinando los resultados.
Se evalúa el uso correcto de asyncio.gather(), la diferencia entre asincronía y multithreading, y el manejo de timeouts/errores en tareas I/O-bound.
Queremos exponer un endpoint /resumen que devuelva un JSON con datos de precio BTC, clima y tipo de cambio USD. Las llamadas externas deben suceder en paralelo y con límites de tiempo para evitar que un proveedor lento bloquee la respuesta.
asyncio aprovecha el tiempo de espera de red sin sobrecargar al planificador con cientos de hilos.ClientSession compartido por petición para mantener conexiones y encabezados comunes.asyncio.gather() dispara las llamadas en paralelo lógico; si alguna falla, no derriba las demás.aiohttp.web.Application con rutas /resumen y /salud.ClientSession por request con timeout total.asyncio.create_task para cada API y recolectarlas con gather.Ejemplo completo: pip install aiohttp y luego python servidor_async.py. Incluye asyncio.gather(), ClientTimeout, manejo de errores por proveedor y diferencia explícita de asincronía vs hilos.
import asyncio
import time
from aiohttp import ClientSession, ClientTimeout, web
API_CONFIG = {
"btc": "https://api.coinbase.com/v2/prices/BTC-USD/spot",
"clima": "https://api.open-meteo.com/v1/forecast?latitude=-34.61&longitude=-58.38¤t=temperature_2m",
"usd": "https://api.exchangerate.host/latest?base=USD&symbols=EUR,ARS"
}
async def fetch_json(session: ClientSession, nombre: str, url: str):
inicio = time.perf_counter()
try:
async with session.get(url) as resp:
resp.raise_for_status()
data = await resp.json()
return nombre, {"ok": True, "data": data, "ms": (time.perf_counter() - inicio) * 1000}
except Exception as exc:
return nombre, {"ok": False, "error": str(exc), "ms": (time.perf_counter() - inicio) * 1000}
async def resumen_handler(request: web.Request):
timeout_total = float(request.query.get("timeout", "4"))
client_timeout = ClientTimeout(total=timeout_total)
async with ClientSession(timeout=client_timeout) as session:
tareas = [asyncio.create_task(fetch_json(session, nombre, url)) for nombre, url in API_CONFIG.items()]
resultados = await asyncio.gather(*tareas, return_exceptions=True)
combinados = {}
for resultado in resultados:
if isinstance(resultado, Exception):
combinados["desconocido"] = {"ok": False, "error": str(resultado)}
continue
nombre, payload = resultado
combinados[nombre] = payload
payload_final = {
"origen": "aiohttp",
"timeout_s": timeout_total,
"asincronia": "Un solo hilo, muchas corrutinas; ideal para I/O-bound.",
"multithreading": "Hilos no son necesarios aquí; habría sobrecosto sin ganar CPU.",
"resultados": combinados,
}
return web.json_response(payload_final)
async def salud_handler(_request: web.Request):
return web.json_response({"status": "ok"})
def crear_app():
app = web.Application()
app.router.add_get("/resumen", resumen_handler)
app.router.add_get("/salud", salud_handler)
return app
if __name__ == "__main__":
web.run_app(crear_app(), host="0.0.0.0", port=8080)
Notas clave del código:
asyncio.gather(..., return_exceptions=True) evita que una falla detenga el resto; cada resultado se normaliza.ClientTimeout(total=timeout_total) corta toda la reunión de APIs si se excede el límite; puedes ajustar por query string ?timeout=2.5.python servidor_async.py y ejecuta curl http://localhost:8080/resumen.curl \"http://localhost:8080/resumen?timeout=1\" y verifica que los campos con error indiquen timeout/estado HTTP.