Performance
N+1 queries → joins
Bad:
things = await repo.list_paginated()
for t in things:
    print(t.category.name)  # ← one extra query per thing
Good: eager load.
BaseRepository._apply_joins() detects the relationship type:
- uselist=True (1:N) → selectinload
- uselist=False (N:1) → joinedload
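The internals of _apply_joins() are not shown here; a minimal sketch of that dispatch rule, using plain objects standing in for SQLAlchemy relationship properties (pick_loader and the sample objects are hypothetical):

```python
from types import SimpleNamespace

def pick_loader(rel) -> str:
    # In the real _apply_joins the `rel` objects would come from
    # inspect(model).relationships; uselist distinguishes collections
    # (1:N) from scalar references (N:1).
    return "selectinload" if rel.uselist else "joinedload"

tags = SimpleNamespace(uselist=True)       # Thing.tags → collection
category = SimpleNamespace(uselist=False)  # Thing.category → scalar

print(pick_loader(tags), pick_loader(category))  # selectinload joinedload
```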
Joins per action: get_kwargs_query
def get_kwargs_query(self) -> dict:
    if self.action == "list_things":
        return {"joins": ["category", "owner"]}
    if self.action == "retrieve":
        return {"joins": ["category", "owner", "tags", "comments"]}
    return {}
selectinload vs joinedload
- selectinload: issues a second query with WHERE id IN (...). Better for 1:N (no row multiplication).
- joinedload: a single query with a JOIN; joining collections multiplies rows (cartesian product). Better for simple N:1.
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

stmt = (
    select(Thing)
    .options(
        selectinload(Thing.tags),    # 1:N → 2 queries
        joinedload(Thing.category),  # N:1 → 1 query with a JOIN
    )
)
Connection pool
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # default 5
    max_overflow=10,     # default 10
    pool_pre_ping=True,  # ping before using a connection (avoids stale ones)
    pool_recycle=3600,   # recycle every hour
)
NullPool is only for tests (every query opens and closes a connection).
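A test-suite engine under that rule might look like this sketch (the sqlite+aiosqlite DSN is a placeholder for your test database):

```python
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

# No pooling: every checkout opens a fresh connection, closed on release.
test_engine = create_async_engine(
    "sqlite+aiosqlite:///:memory:",  # placeholder test DSN
    poolclass=NullPool,
)
```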
Indexes
Models MUST declare indexes on columns that are filtered, searched, or joined on:
class Thing(BaseModel):
    __tablename__ = "things"

    slug: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    company_id: Mapped[UUID] = mapped_column(GUID(), ForeignKey("companies.id"), index=True)
    status: Mapped[str] = mapped_column(String(20), index=True)
Compound indexes:
from sqlalchemy import Index

class Thing(BaseModel):
    __table_args__ = (
        Index("ix_things_company_status", "company_id", "status"),
    )
Query profiling
With echo=True every SQL query is printed; count your N+1s there.
For production, use SQLAlchemy events:
import logging
import time

from sqlalchemy import event

logger = logging.getLogger(__name__)

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cursor, statement, parameters, context, executemany):
    context._query_start = time.time()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.time() - context._query_start
    if elapsed > 0.5:  # log slow queries
        logger.warning("Slow query (%.2fs): %s", elapsed, statement)
Caching with Redis
import json

import redis.asyncio as aioredis

from app.config.settings import settings

redis_client = aioredis.from_url(settings.redis_url)

async def get_cached(key: str):
    data = await redis_client.get(key)
    return json.loads(data) if data else None

async def set_cached(key: str, value: dict, ttl: int = 60):
    await redis_client.setex(key, ttl, json.dumps(value, default=str))
Aggressively cache read-heavy endpoints (catalogs, configs). Invalidate on mutations.
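The invalidation half is not shown above; a minimal cache-aside sketch, with an in-memory dict standing in for Redis so it is self-contained (all names here are hypothetical):

```python
import json

_store: dict[str, str] = {}  # stands in for Redis in this sketch

def cache_set(key: str, value: dict) -> None:
    _store[key] = json.dumps(value, default=str)

def cache_get(key: str):
    raw = _store.get(key)
    return json.loads(raw) if raw else None

def cache_invalidate(key: str) -> None:
    _store.pop(key, None)

def update_thing(thing_id: str, patch: dict) -> None:
    # ... persist the mutation to the DB ...
    cache_invalidate(f"thing:{thing_id}")  # a stale hit is worse than a miss

cache_set("thing:42", {"name": "old"})
update_thing("42", {"name": "new"})
print(cache_get("thing:42"))  # None → next read repopulates from the DB
```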
Async vs sync
The whole library is async. Do NOT block the event loop with:
- requests.get() → usa httpx.AsyncClient
- time.sleep() → usa asyncio.sleep()
- boto3 → usa aioboto3
For CPU-bound work (image resizing, ML inference) use run_in_executor or ARQ background tasks.
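A minimal run_in_executor sketch (resize_image is a stand-in for real CPU-bound work such as a Pillow resize):

```python
import asyncio

def resize_image(width: int) -> int:
    # stand-in for CPU-bound work
    return width // 2

async def handler() -> int:
    loop = asyncio.get_running_loop()
    # None → the loop's default ThreadPoolExecutor; pass a
    # ProcessPoolExecutor instead to escape the GIL for heavy CPU work.
    return await loop.run_in_executor(None, resize_image, 1024)

print(asyncio.run(handler()))  # 512
```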
Background tasks
# With ARQ
from arq.connections import ArqRedis

async def schedule_task(redis: ArqRedis, thing_id):
    await redis.enqueue_job("process_thing", thing_id)
The worker runs in a separate process, keeping the request handler free.
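The worker side is configuration; a sketch under assumptions (process_thing's body and the Redis defaults are placeholders):

```python
from arq.connections import RedisSettings

async def process_thing(ctx, thing_id):
    # ctx is ARQ's shared worker state (redis connection, job metadata)
    ...

class WorkerSettings:
    functions = [process_thing]
    redis_settings = RedisSettings()  # defaults to localhost:6379
```

Started with the arq CLI, e.g. `arq yourmodule.WorkerSettings`.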