Las aplicaciones web modernas requieren APIs rápidas, confiables y escalables para manejar el procesamiento de datos en tiempo real. En esta guía completa, exploraremos cómo aprovechar FastAPI y Pydantic para construir APIs REST de alto rendimiento que puedan servir eficientemente aplicaciones de datos en tiempo real.
¿Por qué FastAPI y Pydantic?
FastAPI ha emergido como uno de los frameworks de Python más poderosos para construir APIs modernas, combinando la velocidad de la programación asíncrona con la generación automática de documentación de la API. Pydantic, su biblioteca complementaria, proporciona capacidades robustas de validación y serialización de datos que hacen que las respuestas de tu API sean tanto confiables como predecibles.
Arquitectura principal
Comencemos examinando la arquitectura principal de una aplicación FastAPI. El siguiente ejemplo demuestra una estructura básica para una API de datos en tiempo real:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import asyncio
from datetime import datetime
app = FastAPI(title="Real-Time Data API", version="1.0.0")
class DataPoint(BaseModel):
id: int
value: float
timestamp: datetime
source: str
class DataResponse(BaseModel):
data: List[DataPoint]
metadata: dict
@app.get("/data", response_model=DataResponse)
async def get_real_time_data():
# Simulate real-time data processing
data_points = [
DataPoint(
id=1,
value=42.5,
timestamp=datetime.now(),
source="sensor_001"
)
]
return DataResponse(
data=data_points,
metadata={"count": len(data_points), "processed_at": datetime.now()}
)Optimización del rendimiento
Una de las principales fortalezas de FastAPI son sus características de optimización del rendimiento. Así es como implementar el procesamiento asíncrono para datos en tiempo real:
from fastapi import BackgroundTasks
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def process_real_time_data(data: List[DataPoint]) -> List[DataPoint]:
"""Simulate heavy processing in background"""
# In real scenarios, this could involve database operations,
# machine learning inference, or external API calls
await asyncio.sleep(0.1) # Simulate async work
return [point for point in data if point.value > 0]
@app.post("/data/batch", response_model=DataResponse)
async def process_batch_data(
data: List[DataPoint],
background_tasks: BackgroundTasks
):
# Process data asynchronously
processed_data = await process_real_time_data(data)
# Add to background task for further processing
background_tasks.add_task(process_background_tasks, processed_data)
return DataResponse(
data=processed_data,
metadata={"count": len(processed_data), "processed_at": datetime.now()}
)
async def process_background_tasks(data: List[DataPoint]):
"""Handle background processing tasks"""
# Database updates, notifications, or other side effects
passValidación de Pydantic para integridad de datos
La validación de esquemas de Pydantic garantiza que tus datos en tiempo real mantengan consistencia e integridad. Aquí tienes un ejemplo avanzado con validación personalizada:
from pydantic import validator, root_validator
from typing import Optional
class AdvancedDataPoint(BaseModel):
id: int
value: float
timestamp: datetime
source: str
category: Optional[str] = None
@validator('value')
def value_must_be_positive(cls, v):
if v < 0:
raise ValueError('Value must be positive')
return v
@validator('source')
def source_must_be_valid(cls, v):
valid_sources = ['sensor_001', 'sensor_002', 'api_client']
if v not in valid_sources:
raise ValueError('Invalid source')
return v
@root_validator
def validate_category_based_on_source(cls, values):
source = values.get('source')
category = values.get('category')
if source == 'sensor_001' and category != 'temperature':
raise ValueError('Temperature sensors must have temperature category')
return values
class EventStream(BaseModel):
events: List[AdvancedDataPoint]
stream_id: str
last_updated: datetime
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}Integración de WebSocket en tiempo real
Para aplicaciones verdaderamente en tiempo real, integra el soporte de WebSocket con FastAPI:
from fastapi import WebSocket, WebSocketDisconnect
import json
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# Process real-time data
response = {"message": f"Echo: {data}", "client": client_id}
await manager.send_personal_message(json.dumps(response), websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)Monitoreo y verificaciones de salud
Implementa un monitoreo completo para tu API en tiempo real:
from fastapi.middleware.tracking import TrackingMiddleware
# Add tracking middleware
app.add_middleware(TrackingMiddleware)
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": {
"database": "connected",
"cache": "connected"
}
}
@app.get("/metrics")
async def get_metrics():
import psutil
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"active_connections": len(manager.active_connections)
}Consideraciones para despliegue en producción
Para despliegues en producción, considera usar Gunicorn con trabajadores uvicorn:
# gunicorn_config.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 30
keepalive = 2
max_requests = 1000
max_requests_jitter = 100
preload = FalseConclusión
Construir APIs REST de alto rendimiento para aplicaciones de datos en tiempo real con FastAPI y Pydantic proporciona a los desarrolladores una combinación poderosa de características. La documentación OpenAPI automática, la robusta validación de datos y las capacidades asíncronas hacen que esta pila sea ideal para aplicaciones modernas. Siguiendo los patrones demostrados en esta guía, podrás construir APIs que no solo tengan un buen rendimiento, sino que también mantengan la integridad de los datos y proporcionen una excelente experiencia de desarrollo a través de una documentación API completa.
Ya sea que estés construyendo plataformas de datos IoT, sistemas de trading financiero o paneles de análisis en tiempo real, FastAPI y Pydantic ofrecen la base necesaria para crear APIs escalables, mantenibles y de alto rendimiento que puedan manejar eficientemente cargas de datos en tiempo real.