from abc import ABC, abstractmethod
from typing import Any, Generic, List, TypeVar

import sqlalchemy
from sqlalchemy import asc, desc, func, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession

# Assumed location of the declarative Base class; adjust to the real path.
from .... import Base
from domain.value_objects.ordering import Ordering
from domain.entities.base import EntityBase
from domain.exceptions.common import DatabaseException

Entity = TypeVar("Entity", bound=EntityBase)
SqlAlchemyModel = TypeVar("SqlAlchemyModel", bound=Base)


class SqlAlchemyAbstractRepository(ABC, Generic[Entity, SqlAlchemyModel]):
    """Generic async repository that maps domain entities to ORM models.

    Subclasses provide the concrete ORM ``model`` class, the two conversion
    hooks (``_model_to_entity`` / ``_entity_to_model``) and ``_get_filters``,
    which turns keyword filters into SQLAlchemy conditions.
    """

    # Subclasses must set this to the concrete ORM model class.
    model: type[SqlAlchemyModel]

    def __init__(self, session: AsyncSession) -> None:
        """Store the async session used by every repository operation."""
        self._session = session

    async def save(self, entity: Entity) -> Entity:
        """Persist ``entity`` and return it as reloaded from the database.

        The model is added to the session, flushed and refreshed, so the
        returned entity carries database-generated fields (such as the ID).
        """
        model = self._entity_to_model(entity)
        self._session.add(model)
        await self._session.flush()
        await self._session.refresh(model)
        return self._model_to_entity(model)

    async def update(
        self,
        fields_to_update: dict[str, Any],
        **filters,
    ) -> int:
        """Update the rows matching ``filters`` and return the affected count.

        Args:
            fields_to_update: Mapping passed to ``UPDATE ... VALUES``.
            **filters: Keyword filters translated by ``_get_filters``.

        Returns:
            The ``rowcount`` reported by the executed statement.

        Raises:
            IntegrityError: Re-raised unchanged after the session is rolled
                back.
            DatabaseException: For any other ``SQLAlchemyError``, after the
                session is rolled back.
        """
        try:
            filter_conditions = self._get_filters(**filters)
            query = (
                sqlalchemy.update(self.model)
                .where(*filter_conditions)
                .values(fields_to_update)
            )
            result = await self._session.execute(query)
            await self._session.flush()
            return result.rowcount  # type: ignore[attr-defined]
        except IntegrityError:
            # IntegrityError is itself a SQLAlchemyError, so it must be
            # handled first to reach callers unwrapped.
            await self._session.rollback()
            raise
        except SQLAlchemyError as exception:
            await self._session.rollback()
            raise DatabaseException from exception

    async def list_all(
        self,
        page: int = 1,
        limit: int = 10,
        order_by: str = "created_at",
        ordering: Ordering = Ordering.asc,
        **filters,
    ) -> List[Entity]:
        """Return one page of entities, filtered and sorted.

        Args:
            page: 1-based page number; the offset is ``(page - 1) * limit``.
            limit: Maximum number of rows in the page.
            order_by: Name passed to the ordering expression.
            ordering: Sort direction.
            **filters: Keyword filters translated by ``_get_filters``.

        Returns:
            The matching rows converted to entities.
        """
        filter_conditions = self._get_filters(**filters)
        query = select(self.model).where(*filter_conditions)

        # Sorting
        query = query.order_by(
            self._get_order_expression(order_by=order_by, ordering=ordering)
        )

        # Pagination
        offset = (page - 1) * limit
        query = query.offset(offset).limit(limit)

        result = await self._session.execute(query)
        models = result.scalars().all()
        return [self._model_to_entity(model) for model in models]

    async def get(
        self,
        **filters,
    ) -> Entity | None:
        """Return the first entity matching ``filters``, or ``None``."""
        filter_conditions = self._get_filters(**filters)
        query = select(self.model).where(*filter_conditions)
        model = await self._session.scalar(query)
        # Compare against None explicitly rather than relying on the ORM
        # object's truthiness.
        if model is None:
            return None
        return self._model_to_entity(model)

    async def exists(
        self,
        **filters,
    ) -> bool:
        """Return ``True`` when at least one row matches ``filters``."""
        filter_conditions = self._get_filters(**filters)
        # Only the presence of a row matters, so cap the query at one row.
        query = select(self.model).where(*filter_conditions).limit(1)
        result = await self._session.scalar(query)
        return result is not None

    async def delete(
        self,
        **filters,
    ) -> int:
        """Delete the rows matching ``filters`` and return the deleted count.

        Raises:
            DatabaseException: For any ``SQLAlchemyError``, after the session
                is rolled back.
        """
        try:
            filter_conditions = self._get_filters(**filters)
            query = sqlalchemy.delete(self.model).where(*filter_conditions)
            result = await self._session.execute(query)
            await self._session.flush()
            return result.rowcount  # type: ignore[attr-defined]
        except SQLAlchemyError as e:
            await self._session.rollback()
            raise DatabaseException from e

    async def count(
        self,
        **filters,
    ) -> int:
        """Return the number of rows matching ``filters`` (0 if none)."""
        filter_conditions = self._get_filters(**filters)
        query = (
            select(func.count())
            .select_from(self.model)
            .where(*filter_conditions)
        )
        total = await self._session.scalar(query)
        return total or 0

    @staticmethod
    @abstractmethod
    def _model_to_entity(model: SqlAlchemyModel) -> Entity:
        """Convert an ORM model into a domain entity; subclasses implement."""
        raise NotImplementedError("Subclasses must implement _model_to_entity")

    @staticmethod
    @abstractmethod
    def _entity_to_model(entity: Entity) -> SqlAlchemyModel:
        """Convert a domain entity into an ORM model; subclasses implement."""
        raise NotImplementedError("Subclasses must implement _entity_to_model")

    @abstractmethod
    def _get_filters(self, **filters) -> List[Any]:
        """Translate keyword filters into SQLAlchemy conditions.

        Abstract: subclasses must implement it. This base body returns an
        empty list, i.e. no conditions, for subclasses that call ``super()``.
        """
        return []

    @staticmethod
    def _get_order_expression(
        order_by: str, ordering: Ordering
    ) -> sqlalchemy.UnaryExpression[str]:
        """Build the ORDER BY expression for ``order_by``.

        Returns an ascending expression when ``ordering`` is
        ``Ordering.asc`` and a descending one otherwise.
        """
        if ordering == Ordering.asc:
            return asc(order_by)
        return desc(order_by)