Saltar a contenido

BaseRepository

fastapi_basekit.aio.sqlalchemy.repository.base.BaseRepository

Repositorio base para SQLAlchemy Async.

Define operaciones CRUD, acceso por filtros y carga de relaciones (joins). Establece model en la subclase (modelo ORM de SQLAlchemy).

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
class BaseRepository:
    """
    Repositorio base para SQLAlchemy Async.

    Define operaciones CRUD, acceso por filtros y carga de relaciones (joins).
    Establece `model` en la subclase (modelo ORM de SQLAlchemy).
    """

    model: Type[Any]
    service: Optional[Any] = None

    def __init__(self, db: AsyncSession):
        """Inicializa el repositorio con la sesión a reutilizar."""
        self._session = db
        self.service = None

    @property
    def session(self) -> AsyncSession:
        return self._session

    def _get_field(self, field_name: str):
        """Valida y retorna el atributo (columna) del modelo."""
        if not self.model:
            raise ValueError("El modelo no está definido en el repositorio")
        field = getattr(self.model, field_name, None)
        if not field:
            raise AttributeError(
                f"El campo '{field_name}' no existe en {self.model.__name__}"
            )
        return field

    def _apply_joins(
        self, query: Select[Tuple[Any]], joins: Optional[List[str]]
    ) -> Any:
        """Aplica las opciones de carga (joins) dinámicamente al query."""
        if joins:
            for relation in joins:
                if hasattr(self.model, relation):
                    relationship_attr = getattr(self.model, relation)
                    if getattr(relationship_attr.property, "uselist", False):
                        query = query.options(selectinload(relationship_attr))
                    else:
                        query = query.options(joinedload(relationship_attr))
        return query

    def _resolve_field_path(
        self, path: str
    ) -> Tuple[Optional[Any], Dict[str, Any]]:
        """
        Resuelve una ruta de campo (ej. 'user__role__name') a su atributo
        de SQLAlchemy y el conjunto de JOINs necesarios para el filtrado.

        Args:
            path: Ruta del campo con sintaxis '__'

        Returns:
            Tuple con:
            - attr: Atributo final (columna o relación) o None si no existe
            - joins_to_apply: Dict con las relaciones intermedias (name: attr)
        """
        parts = path.split("__")
        current_model = self.model
        attr = getattr(current_model, parts[0], None)
        joins_to_apply = {}

        if attr is None:
            return None, {}

        # Recorrer la cadena de relaciones intermedias
        for part in parts[1:]:
            # Chequear si el atributo actual es una relación ORM
            if hasattr(attr.property, "entity") and isinstance(
                attr.property, Relationship
            ):
                # Es una relación: Añadir al diccionario de JOINs
                relation_name = attr.key
                joins_to_apply[relation_name] = attr

                # Mover al modelo relacionado y obtener el siguiente atributo
                current_model = attr.property.mapper.class_
                attr = getattr(current_model, part, None)

                if attr is None:
                    return None, {}
            else:
                # No es una relación o es el final del path
                break

        return attr, joins_to_apply

    def _resolve_attribute(
        self, filters: Dict[str, Any]
    ) -> Tuple[Dict[str, Tuple[Any, Any]], Dict[str, Any]]:
        """
        Resuelve los atributos finales y las relaciones para JOINs a partir
        de una ruta con '__'.

        Soporta filtrado avanzado con relaciones usando sintaxis de doble
        guion bajo (__). Por ejemplo:
        - Filtro simple: {"status": "active"} → WHERE users.status = 'active'
        - Filtro con relación: {"user_roles__role__code": "Impulsado"} →
          JOINs automáticos + condición WHERE

        Args:
            filters: Diccionario con filtros que pueden incluir rutas con '__'

        Returns:
            Tuple con:
            - resolved_filters: Dict con (columna/atributo final, valor
              procesado)
            - joins_to_apply: Dict con las relaciones que necesitan JOINs
              (nombre_relacion: atributo_relacion)

        Note:
            Los filtros que no se pueden resolver (campos/relaciones
            inexistentes) se omiten silenciosamente. Si todos los filtros
            son omitidos, `resolved_filters` estará vacío y los métodos
            que usan este resultado deben manejar este caso (retornando
            lista vacía o None según corresponda).
        """
        resolved_filters = {}
        joins_to_apply = {}

        for filter_path, value in filters.items():
            attr, field_joins = self._resolve_field_path(filter_path)

            if attr is None:
                continue

            joins_to_apply.update(field_joins)

            # Procesar valor (manejo de Enum a su valor/lista de valores)
            processed_value = value
            if (
                isinstance(value, (list, tuple))
                and value
                and hasattr(value[0], "value")
            ):
                processed_value = [v.value for v in value]
            elif hasattr(value, "value"):
                processed_value = value.value

            # Verificar que no sea una relación (solo queremos columnas para filtrar)
            is_relationship = isinstance(attr.property, Relationship)
            is_column = hasattr(attr.property, "columns") or hasattr(
                attr, "comparator"
            )

            if not is_relationship and is_column:
                resolved_filters[filter_path] = (attr, processed_value)

        return resolved_filters, joins_to_apply

    def _resolve_order_by(
        self,
        order_by: Optional[Any] = None,
        special_fields: Optional[Dict[str, Any]] = None,
    ) -> Tuple[Optional[Any], Dict[str, Any]]:
        """
        Resuelve ordenamiento y relaciones para JOINs.

        Método general que resuelve rutas de relaciones usando
        sintaxis '__' y soporta prefijo '-' para orden descendente.
        Similar a _resolve_attribute pero adaptado para order_by.

        Soporta ordenamiento avanzado con relaciones usando sintaxis
        de doble guion bajo (__). Por ejemplo:
        - Orden simple: "created_at" → ORDER BY model.created_at ASC
        - Orden descendente: "-created_at" → ORDER BY model.created_at DESC
        - Orden con relación: "user__full_name" →
          JOIN users + ORDER BY users.full_name ASC
        - Orden descendente con relación: "-user__email" →
          JOIN users + ORDER BY users.email DESC
        - Campos especiales (aliases): "borrower_name" →
          ORDER BY Users.full_name ASC (si está en special_fields)

        Args:
            order_by: Campo por el cual ordenar. Puede incluir:
                     - Campos del modelo: "created_at", "status", etc.
                     - Prefijo '-' para descendente: "-created_at", "-status"
                     - Rutas de relaciones: "user__full_name",
                       "user__role__name"
                     - Combinaciones: "-user__email"
                     - Campos especiales: "borrower_name" (si está en
                       special_fields)
                     - También puede ser una expresión SQLAlchemy
            default_order_by: Campo por defecto si order_by es None o
                             no válido.
                             Por defecto: "created_at"
            special_fields: Diccionario opcional que mapea nombres de campos
                          especiales (aliases en el query) a atributos
                          SQLAlchemy. Ejemplo:
                          {"borrower_name": Users.full_name,
                           "borrower_email": Users.email}
                          Estos campos ya están disponibles en el query sin
                          necesidad de JOIN adicional.

        Returns:
            Tuple con:
            - order_expression: Expresión SQLAlchemy para order_by
              (None si no se pudo resolver, o el mismo order_by si ya
              es una expresión SQLAlchemy)
            - joins_to_apply: Dict con las relaciones que necesitan JOINs
              (nombre_relacion: atributo_relacion)

        Note:
            Si el campo no se puede resolver (campo/relación inexistente),
            retorna (None, {}). Los métodos que usan este resultado deben
            manejar este caso aplicando un ordenamiento por defecto.
            Si order_by ya es una expresión SQLAlchemy (no string), la retorna
            sin modificar.
        """
        # Si order_by ya es una expresión SQLAlchemy (no string), retornarla
        # directamente sin procesar
        if order_by is not None and not isinstance(order_by, str):
            return order_by, {}

        if not order_by:
             # Si no hay order_by, no ordenamos (o el caller debería haber pasado un default)
             return None, {}

        joins_to_apply: Dict[str, Any] = {}

        # Detectar si tiene prefijo '-' para invertir el orden
        has_minus_prefix = order_by.startswith("-")
        field_path = order_by.lstrip("-")

        # 1. Verificar primero si es un campo especial (alias en el query)
        # Estos campos ya están disponibles sin necesidad de JOIN
        if special_fields and field_path in special_fields:
            attr = special_fields[field_path]
            # Determinar dirección del ordenamiento
            should_descend = has_minus_prefix
            order_expression = attr.desc() if should_descend else attr.asc()
            return order_expression, {}

        # 2. Si no es un campo especial, procesar con el helper
        attr, field_joins = self._resolve_field_path(field_path)

        if attr is None:
            return None, {}

        joins_to_apply.update(field_joins)

        # 4. Verificar que el atributo final es válido para ordenar
        is_relationship = isinstance(attr.property, Relationship)
        is_column = hasattr(attr.property, "columns") or hasattr(
            attr, "comparator"
        )

        if is_relationship or not is_column:
            return None, {}

        # 5. Determinar la dirección del ordenamiento
        should_descend = has_minus_prefix

        # 6. Crear la expresión de ordenamiento
        order_expression = attr.desc() if should_descend else attr.asc()

        return order_expression, joins_to_apply

    def _build_conditions(
        self,
        filters: Optional[Dict[str, Any]] = None,
        resolved_filters: Optional[Dict[str, Tuple[Any, Any]]] = None,
    ) -> List[Any]:
        """
        Construye condiciones WHERE a partir de los atributos resueltos.

        Soporta dos modos de uso:
        1. Modo legacy (retrocompatibilidad): Pasa filters directamente
        2. Modo avanzado: Pasa resolved_filters con atributos ya resueltos

        Args:
            filters: Diccionario de filtros simples (modo legacy)
            resolved_filters: Diccionario con (atributo, valor_procesado)
                             (modo avanzado)

        Returns:
            Lista de condiciones SQLAlchemy
        """
        conditions: List[Any] = []

        # Modo avanzado: usar resolved_filters si está disponible
        if resolved_filters is not None:
            for _, (field, processed_value) in resolved_filters.items():
                # Aplicar la lógica de IN o ==
                if isinstance(
                    processed_value, (list, tuple)
                ) and not isinstance(processed_value, str):
                    conditions.append(field.in_(processed_value))
                else:
                    conditions.append(field == processed_value)
        # Modo legacy: compatibilidad con código existente
        elif filters is not None:
            for fn, value in filters.items():
                field = self._get_field(fn)
                if isinstance(value, (list, tuple)) and not isinstance(
                    value, str
                ):
                    conditions.append(field.in_(value))
                else:
                    conditions.append(field == value)

        return conditions

    def _build_search_condition(
        self, search: Optional[str], search_fields: Optional[List[str]]
    ) -> Tuple[Optional[Any], Dict[str, Any]]:
        """Construye una condición OR para búsqueda textual en múltiples campos

        Usa ilike (insensible a mayúsculas) si hay término y campos válidos.
        Soporta rutas anidadas con '__'.

        Returns:
            Tuple con:
            - search_condition: Condición OR de SQLAlchemy o None
            - search_joins: Dict con JOINs necesarios para la búsqueda
        """
        if not search or not search_fields:
            return None, {}

        exprs: List[Any] = []
        search_joins: Dict[str, Any] = {}

        for field_path in search_fields:
            attr, field_joins = self._resolve_field_path(field_path)

            if attr is None:
                continue

            search_joins.update(field_joins)

            # Verificar que sea una columna válida para ILIKE
            is_column = hasattr(attr.property, "columns") or hasattr(
                attr, "comparator"
            )
            is_relationship = isinstance(attr.property, Relationship)

            if not is_relationship and is_column:
                exprs.append(attr.ilike(f"%{search}%"))

        if not exprs:
            return None, {}

        return or_(*exprs), search_joins

    async def create(self, obj_in: Any | Dict) -> Any:
        """Crea un nuevo registro en la base de datos."""
        db = self.session
        if isinstance(obj_in, dict):
            obj_in = self.model(**obj_in)
        db.add(obj_in)
        await db.flush()
        await db.refresh(obj_in)
        return obj_in

    async def _get_one(
        self,
        conditions: Optional[List[Any]] = None,
        joins: Optional[List[str]] = None,
        raise_exception: bool = False,
    ) -> Optional[Any]:
        """
        Método interno unificado para obtener un único registro.

        Args:
            conditions: Lista de condiciones WHERE
            joins: Lista de relaciones para carga eager
            raise_exception: Si True, lanza excepción si no se encuentra

        Returns:
            Registro encontrado o None
        """
        if not self.model:
            raise ValueError("El modelo no está definido en el repositorio")

        query = select(self.model)
        if conditions:
            query = query.where(and_(*conditions))

        query = self._apply_joins(query, joins)

        result = await self.session.execute(query)
        record = result.scalars().first()

        if raise_exception and record is None:
            raise NotFoundException(
                message=f"{self.model.__name__} no encontrado"
            )

        return record

    async def get(self, record_id: Union[str, UUID]) -> Optional[Any]:
        """Obtiene un registro por su ID."""
        if not self.model:
            raise ValueError("El modelo no está definido en el repositorio")
        return await self.session.get(self.model, record_id)

    async def get_by_field(self, field_name: str, value: Any) -> Optional[Any]:
        """Obtiene un registro por un campo específico."""
        field = self._get_field(field_name)
        return await self._get_one(conditions=[field == value])

    async def get_with_joins(
        self,
        record_id: Union[str, UUID],
        joins: Optional[List[str]] = None,
    ) -> Optional[Any]:
        """Obtiene un registro por ID y carga relaciones dinámicamente."""
        return await self._get_one(
            conditions=[self.model.id == record_id], joins=joins
        )

    async def get_by_field_with_joins(
        self,
        field_name: str,
        value: Any,
        joins: Optional[List[str]] = None,
    ) -> Optional[Any]:
        """
        Obtiene un registro por un campo y carga relaciones dinámicamente.

        Soporta filtros con relaciones usando sintaxis '__'.
        """
        # Si el campo tiene sintaxis '__', usar resolución de atributos
        if "__" in field_name:
            # Resolver el atributo y aplicar JOINs necesarios
            resolved_filters, joins_to_apply = self._resolve_attribute(
                {field_name: value}
            )

            if not resolved_filters:
                return None

            # Obtener el atributo resuelto
            _, (field, processed_value) = next(iter(resolved_filters.items()))

            # Construir query con JOINs de filtrado
            query = select(self.model)

            # Aplicar JOINs de filtrado
            for relation_name, relation_attr in joins_to_apply.items():
                query = query.join(relation_attr)

            # Aplicar condición
            query = query.where(field == processed_value)

            # Aplicar joins de carga
            query = self._apply_joins(query, joins)

            result = await self.session.execute(query)
            return result.scalars().first()
        else:
            # Campo simple, usar método estándar
            field = self._get_field(field_name)
            return await self._get_one(
                conditions=[field == value], joins=joins
            )

    async def get_by_filters(
        self, filters: Dict[str, Any], use_or: bool = False
    ) -> Sequence[Any]:
        """
        Obtiene registros que coinciden con los filtros especificados.

        Soporta filtros simples y filtros con relaciones usando sintaxis '__'.
        """
        if not self.model:
            raise ValueError("El modelo no está definido en el repositorio")

        # Resolver filtros y JOINs necesarios
        resolved_filters, joins_to_apply = self._resolve_attribute(filters)

        # Inicializar query
        query = select(self.model)

        # Aplicar JOINs de filtrado si hay relaciones
        for relation_name, relation_attr in joins_to_apply.items():
            query = query.join(relation_attr)

        # Construir condiciones
        conditions = self._build_conditions(resolved_filters=resolved_filters)

        # Verificar si hay condiciones antes de combinarlas
        # and_() y or_() requieren al menos un argumento
        if not conditions:
            # Si no hay condiciones válidas después de resolver filtros,
            # retornar lista vacía (comportamiento restrictivo)
            return []

        combined = or_(*conditions) if use_or else and_(*conditions)
        query = query.where(combined)

        result = await self.session.execute(query)
        return result.scalars().all()

    async def get_by_filters_with_joins(
        self,
        filters: Dict[str, Any],
        use_or: bool = False,
        joins: Optional[List[str]] = None,
        one: bool = False,
    ) -> Sequence[Any] | Optional[Any]:
        """
        Obtiene registros por filtros y carga relaciones dinámicamente.

        Soporta filtros simples y filtros con relaciones usando sintaxis '__'.
        Los JOINs de filtrado se aplican automáticamente cuando se detectan
        relaciones en los filtros.
        """
        # Resolver filtros y JOINs necesarios
        resolved_filters, joins_to_apply = self._resolve_attribute(filters)

        # Inicializar query
        query = select(self.model)

        # Aplicar JOINs de filtrado
        for relation_name, relation_attr in joins_to_apply.items():
            query = query.join(relation_attr)

        # Construir condiciones
        conditions = self._build_conditions(resolved_filters=resolved_filters)

        # Verificar si hay condiciones antes de combinarlas
        # and_() y or_() requieren al menos un argumento
        if not conditions:
            # Retornar None si one=True, o lista vacía si one=False
            return None if one else []

        combined = or_(*conditions) if use_or else and_(*conditions)
        query = query.where(combined)

        # Aplicar joins de carga (joinedload/selectinload)
        query = self._apply_joins(query, joins)

        result = await self.session.execute(query)
        return result.scalars().first() if one else result.scalars().all()

    def apply_list_filters(
        self,
        queryset: Select[Tuple[Any]],
        filters: Optional[Dict[str, Any]] = None,
        use_or: bool = False,
        joins: Optional[List[str]] = None,
        order_by: Optional[Any] = None,
        search: Optional[str] = None,
        search_fields: Optional[List[str]] = None,
    ) -> Select[Tuple[Any]]:
        """Aplica filtros estándar, búsqueda y ordenamiento al query."""
        filters = filters or {}

        # 1. RESOLUCIÓN UNIVERSAL DE FILTROS (JOINs de filtrado y atributos)
        resolved_filters, joins_to_apply = self._resolve_attribute(filters)

        # 2. Aplicar JOINs de filtrado
        for relation_name, relation_attr in joins_to_apply.items():
            queryset = queryset.join(relation_attr)

        # 3. Construir las condiciones WHERE
        conditions = self._build_conditions(resolved_filters=resolved_filters)
        combined_filters = (
            or_(*conditions)
            if (use_or and conditions)
            else and_(*conditions) if conditions else None
        )

        # 4. Aplicar búsqueda textual (search)
        search_condition, search_joins = self._build_search_condition(
            search, search_fields
        )

        # 5. Aplicar JOINs de búsqueda
        for relation_name, relation_attr in search_joins.items():
            # Evitar unir dos veces la misma relación si ya se hizo por filtros
            # NOTA: join() en SQLAlchemy es idempotente si es la misma entidad/atributo
            queryset = queryset.join(relation_attr)

        # 6. Combina todos los filtros
        if combined_filters is not None and search_condition is not None:
            queryset = queryset.where(
                and_(combined_filters, search_condition)
            )
        elif combined_filters is not None:
            queryset = queryset.where(combined_filters)
        elif search_condition is not None:
            queryset = queryset.where(search_condition)

        # 6. Resolver y aplicar orden
        order_expression, order_joins = self._resolve_order_by(
            order_by=order_by,
        )

        for relation_name, relation_attr in order_joins.items():
            queryset = queryset.join(relation_attr)

        if order_expression is not None:
            queryset = queryset.order_by(order_expression)

        queryset = self._apply_joins(queryset, joins)

        return queryset

    def build_list_queryset(
        self,
        **kwargs: Any,
    ) -> Select[Tuple[Any]]:
        """
        Construye el query inicial para listado.
        Debe retornar un objeto Select.
        """
        return select(self.model)

    async def list_paginated(
        self,
        page: int = 1,
        count: int = 25,
        filters: Optional[Dict[str, Any]] = None,
        use_or: bool = False,
        joins: Optional[List[str]] = None,
        order_by: Optional[Any] = None,
        search: Optional[str] = None,
        search_fields: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> tuple[List[Any], int]:

        # 1. Construir query base
        query_kwargs = {
            "filters": filters,
            "use_or": use_or,
            "joins": joins,
            "order_by": order_by,
            "search": search,
            "search_fields": search_fields,
        }
        try:
            # Intentar con argumentos (subclases modernas)
            queryset = self.build_list_queryset(**query_kwargs)
        except TypeError:
            # Fallback sin argumentos (subclases legacy)
            queryset = self.build_list_queryset()

        # 2. Aplicar filtros estándar
        queryset = self.apply_list_filters(
            queryset=queryset,
            filters=filters,
            use_or=use_or,
            joins=joins,
            order_by=order_by,
            search=search,
            search_fields=search_fields,
        ) 
        # Count total
        db = self.session

        # Para contar, hacemos un subquery para asegurar que contamos filas únicas
        count_query = select(func.count()).select_from(queryset.subquery())
        total = (await db.execute(count_query)).scalar_one()

        # Page items
        offset = count * (page - 1)
        page_query = queryset.offset(offset).limit(count)
        result = await db.execute(page_query)
        rows = result.unique().all()

        # Procesar filas para soportar "Result Hydration"
        items = []
        for row in rows:
            # Si la fila tiene un solo elemento, tratarlo como escalar normal
            if len(row) == 1:
                items.append(row[0])
            else:
                # Fila compleja: tomar el primer elemento como entidad
                entity = row[0]

                column_keys = list(row._mapping.keys())

                for i in range(1, len(row)):
                    if i < len(column_keys):
                        key = column_keys[i]
                    else:
                        key = f"_extra_{i}"

                    value = row[i]
                    setattr(entity, key, value)

                items.append(entity)

        return items, int(total)

    async def update(
        self,
        record_id: Union[str, UUID],
        update_data: Dict[str, Any],
    ) -> Any:
        """Actualiza un registro por ID con los campos provistos."""
        if not self.model:
            raise ValueError("El modelo no está definido en el repositorio")
        db = self.session
        record = await db.get(self.model, record_id)
        if not record:
            raise NotFoundException(
                message=f"{self.model.__name__} no encontrado"
            )
        for key, value in update_data.items():
            if value is not None and hasattr(record, key):
                setattr(record, key, value)
        await db.flush()
        await db.refresh(record)
        return record

    async def delete(
        self,
        record_id: Union[str, UUID],
        model: Optional[Type[Any]] = None,
    ) -> bool:
        """Elimina un registro por ID."""
        model = model or self.model
        if not model:
            raise ValueError("El modelo no está definido en el repositorio")
        db = self.session
        record = await db.get(model, record_id)
        if not record:
            raise NotFoundException(message=f"{model.__name__} no encontrado")
        await db.delete(record)
        await db.flush()
        return True

Functions

get(record_id) async

Obtiene un registro por su ID.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get(self, record_id: Union[str, UUID]) -> Optional[Any]:
    """Obtiene un registro por su ID."""
    if not self.model:
        raise ValueError("El modelo no está definido en el repositorio")
    return await self.session.get(self.model, record_id)

get_by_field(field_name, value) async

Obtiene un registro por un campo específico.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get_by_field(self, field_name: str, value: Any) -> Optional[Any]:
    """Obtiene un registro por un campo específico."""
    field = self._get_field(field_name)
    return await self._get_one(conditions=[field == value])

get_by_filters(filters, use_or=False) async

Obtiene registros que coinciden con los filtros especificados.

Soporta filtros simples y filtros con relaciones usando sintaxis '__'.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get_by_filters(
    self, filters: Dict[str, Any], use_or: bool = False
) -> Sequence[Any]:
    """
    Obtiene registros que coinciden con los filtros especificados.

    Soporta filtros simples y filtros con relaciones usando sintaxis '__'.
    """
    if not self.model:
        raise ValueError("El modelo no está definido en el repositorio")

    # Resolver filtros y JOINs necesarios
    resolved_filters, joins_to_apply = self._resolve_attribute(filters)

    # Inicializar query
    query = select(self.model)

    # Aplicar JOINs de filtrado si hay relaciones
    for relation_name, relation_attr in joins_to_apply.items():
        query = query.join(relation_attr)

    # Construir condiciones
    conditions = self._build_conditions(resolved_filters=resolved_filters)

    # Verificar si hay condiciones antes de combinarlas
    # and_() y or_() requieren al menos un argumento
    if not conditions:
        # Si no hay condiciones válidas después de resolver filtros,
        # retornar lista vacía (comportamiento restrictivo)
        return []

    combined = or_(*conditions) if use_or else and_(*conditions)
    query = query.where(combined)

    result = await self.session.execute(query)
    return result.scalars().all()

get_with_joins(record_id, joins=None) async

Obtiene un registro por ID y carga relaciones dinámicamente.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get_with_joins(
    self,
    record_id: Union[str, UUID],
    joins: Optional[List[str]] = None,
) -> Optional[Any]:
    """Obtiene un registro por ID y carga relaciones dinámicamente."""
    return await self._get_one(
        conditions=[self.model.id == record_id], joins=joins
    )

get_by_field_with_joins(field_name, value, joins=None) async

Obtiene un registro por un campo y carga relaciones dinámicamente.

Soporta filtros con relaciones usando sintaxis '__'.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get_by_field_with_joins(
    self,
    field_name: str,
    value: Any,
    joins: Optional[List[str]] = None,
) -> Optional[Any]:
    """
    Obtiene un registro por un campo y carga relaciones dinámicamente.

    Soporta filtros con relaciones usando sintaxis '__'.
    """
    # Si el campo tiene sintaxis '__', usar resolución de atributos
    if "__" in field_name:
        # Resolver el atributo y aplicar JOINs necesarios
        resolved_filters, joins_to_apply = self._resolve_attribute(
            {field_name: value}
        )

        if not resolved_filters:
            return None

        # Obtener el atributo resuelto
        _, (field, processed_value) = next(iter(resolved_filters.items()))

        # Construir query con JOINs de filtrado
        query = select(self.model)

        # Aplicar JOINs de filtrado
        for relation_name, relation_attr in joins_to_apply.items():
            query = query.join(relation_attr)

        # Aplicar condición
        query = query.where(field == processed_value)

        # Aplicar joins de carga
        query = self._apply_joins(query, joins)

        result = await self.session.execute(query)
        return result.scalars().first()
    else:
        # Campo simple, usar método estándar
        field = self._get_field(field_name)
        return await self._get_one(
            conditions=[field == value], joins=joins
        )

get_by_filters_with_joins(filters, use_or=False, joins=None, one=False) async

Obtiene registros por filtros y carga relaciones dinámicamente.

Soporta filtros simples y filtros con relaciones usando sintaxis '__'. Los JOINs de filtrado se aplican automáticamente cuando se detectan relaciones en los filtros.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def get_by_filters_with_joins(
    self,
    filters: Dict[str, Any],
    use_or: bool = False,
    joins: Optional[List[str]] = None,
    one: bool = False,
) -> Sequence[Any] | Optional[Any]:
    """
    Obtiene registros por filtros y carga relaciones dinámicamente.

    Soporta filtros simples y filtros con relaciones usando sintaxis '__'.
    Los JOINs de filtrado se aplican automáticamente cuando se detectan
    relaciones en los filtros.
    """
    # Resolver filtros y JOINs necesarios
    resolved_filters, joins_to_apply = self._resolve_attribute(filters)

    # Inicializar query
    query = select(self.model)

    # Aplicar JOINs de filtrado
    for relation_name, relation_attr in joins_to_apply.items():
        query = query.join(relation_attr)

    # Construir condiciones
    conditions = self._build_conditions(resolved_filters=resolved_filters)

    # Verificar si hay condiciones antes de combinarlas
    # and_() y or_() requieren al menos un argumento
    if not conditions:
        # Retornar None si one=True, o lista vacía si one=False
        return None if one else []

    combined = or_(*conditions) if use_or else and_(*conditions)
    query = query.where(combined)

    # Aplicar joins de carga (joinedload/selectinload)
    query = self._apply_joins(query, joins)

    result = await self.session.execute(query)
    return result.scalars().first() if one else result.scalars().all()

list_paginated(page=1, count=25, filters=None, use_or=False, joins=None, order_by=None, search=None, search_fields=None, **kwargs) async

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def list_paginated(
    self,
    page: int = 1,
    count: int = 25,
    filters: Optional[Dict[str, Any]] = None,
    use_or: bool = False,
    joins: Optional[List[str]] = None,
    order_by: Optional[Any] = None,
    search: Optional[str] = None,
    search_fields: Optional[List[str]] = None,
    **kwargs: Any,
) -> tuple[List[Any], int]:

    # 1. Construir query base
    query_kwargs = {
        "filters": filters,
        "use_or": use_or,
        "joins": joins,
        "order_by": order_by,
        "search": search,
        "search_fields": search_fields,
    }
    try:
        # Intentar con argumentos (subclases modernas)
        queryset = self.build_list_queryset(**query_kwargs)
    except TypeError:
        # Fallback sin argumentos (subclases legacy)
        queryset = self.build_list_queryset()

    # 2. Aplicar filtros estándar
    queryset = self.apply_list_filters(
        queryset=queryset,
        filters=filters,
        use_or=use_or,
        joins=joins,
        order_by=order_by,
        search=search,
        search_fields=search_fields,
    ) 
    # Count total
    db = self.session

    # Para contar, hacemos un subquery para asegurar que contamos filas únicas
    count_query = select(func.count()).select_from(queryset.subquery())
    total = (await db.execute(count_query)).scalar_one()

    # Page items
    offset = count * (page - 1)
    page_query = queryset.offset(offset).limit(count)
    result = await db.execute(page_query)
    rows = result.unique().all()

    # Procesar filas para soportar "Result Hydration"
    items = []
    for row in rows:
        # Si la fila tiene un solo elemento, tratarlo como escalar normal
        if len(row) == 1:
            items.append(row[0])
        else:
            # Fila compleja: tomar el primer elemento como entidad
            entity = row[0]

            column_keys = list(row._mapping.keys())

            for i in range(1, len(row)):
                if i < len(column_keys):
                    key = column_keys[i]
                else:
                    key = f"_extra_{i}"

                value = row[i]
                setattr(entity, key, value)

            items.append(entity)

    return items, int(total)

build_list_queryset(**kwargs)

Construye el query inicial para listado. Debe retornar un objeto Select.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
def build_list_queryset(
    self,
    **kwargs: Any,
) -> Select[Tuple[Any]]:
    """
    Construye el query inicial para listado.
    Debe retornar un objeto Select.
    """
    return select(self.model)

apply_list_filters(queryset, filters=None, use_or=False, joins=None, order_by=None, search=None, search_fields=None)

Aplica filtros estándar, búsqueda y ordenamiento al query.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
def apply_list_filters(
    self,
    queryset: Select[Tuple[Any]],
    filters: Optional[Dict[str, Any]] = None,
    use_or: bool = False,
    joins: Optional[List[str]] = None,
    order_by: Optional[Any] = None,
    search: Optional[str] = None,
    search_fields: Optional[List[str]] = None,
) -> Select[Tuple[Any]]:
    """Aplica filtros estándar, búsqueda y ordenamiento al query."""
    filters = filters or {}

    # 1. RESOLUCIÓN UNIVERSAL DE FILTROS (JOINs de filtrado y atributos)
    resolved_filters, joins_to_apply = self._resolve_attribute(filters)

    # 2. Aplicar JOINs de filtrado
    for relation_name, relation_attr in joins_to_apply.items():
        queryset = queryset.join(relation_attr)

    # 3. Construir las condiciones WHERE
    conditions = self._build_conditions(resolved_filters=resolved_filters)
    combined_filters = (
        or_(*conditions)
        if (use_or and conditions)
        else and_(*conditions) if conditions else None
    )

    # 4. Aplicar búsqueda textual (search)
    search_condition, search_joins = self._build_search_condition(
        search, search_fields
    )

    # 5. Aplicar JOINs de búsqueda
    for relation_name, relation_attr in search_joins.items():
        # Evitar unir dos veces la misma relación si ya se hizo por filtros
        # NOTA: join() en SQLAlchemy es idempotente si es la misma entidad/atributo
        queryset = queryset.join(relation_attr)

    # 6. Combina todos los filtros
    if combined_filters is not None and search_condition is not None:
        queryset = queryset.where(
            and_(combined_filters, search_condition)
        )
    elif combined_filters is not None:
        queryset = queryset.where(combined_filters)
    elif search_condition is not None:
        queryset = queryset.where(search_condition)

    # 6. Resolver y aplicar orden
    order_expression, order_joins = self._resolve_order_by(
        order_by=order_by,
    )

    for relation_name, relation_attr in order_joins.items():
        queryset = queryset.join(relation_attr)

    if order_expression is not None:
        queryset = queryset.order_by(order_expression)

    queryset = self._apply_joins(queryset, joins)

    return queryset

create(obj_in) async

Crea un nuevo registro en la base de datos.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def create(self, obj_in: Any | Dict) -> Any:
    """Crea un nuevo registro en la base de datos."""
    db = self.session
    if isinstance(obj_in, dict):
        obj_in = self.model(**obj_in)
    db.add(obj_in)
    await db.flush()
    await db.refresh(obj_in)
    return obj_in

update(record_id, update_data) async

Actualiza un registro por ID con los campos provistos.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def update(
    self,
    record_id: Union[str, UUID],
    update_data: Dict[str, Any],
) -> Any:
    """Actualiza un registro por ID con los campos provistos."""
    if not self.model:
        raise ValueError("El modelo no está definido en el repositorio")
    db = self.session
    record = await db.get(self.model, record_id)
    if not record:
        raise NotFoundException(
            message=f"{self.model.__name__} no encontrado"
        )
    for key, value in update_data.items():
        if value is not None and hasattr(record, key):
            setattr(record, key, value)
    await db.flush()
    await db.refresh(record)
    return record

delete(record_id, model=None) async

Elimina un registro por ID.

Source code in fastapi_basekit/aio/sqlalchemy/repository/base.py
async def delete(
    self,
    record_id: Union[str, UUID],
    model: Optional[Type[Any]] = None,
) -> bool:
    """Elimina un registro por ID."""
    model = model or self.model
    if not model:
        raise ValueError("El modelo no está definido en el repositorio")
    db = self.session
    record = await db.get(model, record_id)
    if not record:
        raise NotFoundException(message=f"{model.__name__} no encontrado")
    await db.delete(record)
    await db.flush()
    return True

Construcción

class ThingRepository(BaseRepository):
    model = Thing                    # ← obligatorio

API

# Por ID
thing = await repo.get(thing_id)
thing = await repo.get_with_joins(thing_id, joins=["category"])

# Por campo
user = await repo.get_by_email("foo@bar.com")  # método custom tuyo
user = await repo.get_by_field("email", "foo@bar.com")  # genérico

# Filtros (con sintaxis __ para relaciones)
items = await repo.get_by_filters({"status": "active"})
admins = await repo.get_by_filters({"user_roles__role__code": "admin"})

# Paginado
items, total = await repo.list_paginated(
    page=1, count=20,
    filters={"status": "active"},
    search="foo",
    search_fields=["name"],
    order_by="-created_at",
    joins=["category"],
)

# Mutaciones
created = await repo.create({"name": "Foo"})
updated = await repo.update(thing_id, {"name": "Bar"})   # dict positional
deleted = await repo.delete(thing_id)                    # hard delete

Override build_list_queryset

Para queries enriquecidos o soft-delete:

def build_list_queryset(self, **kwargs):
    return select(self.model).where(self.model.deleted_at.is_(None))

Variantes

  • fastapi_basekit.aio.sqlalchemy.repository.base.BaseRepository
  • fastapi_basekit.aio.sqlmodel.repository.base.BaseRepository
  • fastapi_basekit.aio.beanie.repository.base.BeanieBaseRepository

Patrón