本文介绍通过dmPython包和dmSQLalchemy包连接达梦数据库,使用连接池的方式进行封装配置,其他模块直接调用访问数据库。配合FastAPI在项目启动时初始化数据库连接。pip install dmPythonpip install dmSQLalchemy
二、新建DatabaseService类并初始化数据库服务(文件名称:db_service.py)from sqlalchemy import create_engine, textfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.exc import SQLAlchemyErrorfrom sqlalchemy.pool import QueuePoolfrom typing import List, Dict, Any, Optional, Tupleclass DatabaseService: def __init__(self, connection_string: str, **engine_kwargs): """ 初始化数据库服务 Args: connection_string: 数据库连接字符串 **engine_kwargs: 传递给 create_engine 的额外参数 """ # 默认连接池配置 # 默认数据库连接池配置字典 default_pool_config = { # 指定连接池的类,使用队列实现的连接池 'poolclass': QueuePool, # 连接池中的初始连接数量,保持10个连接 'pool_size': 10, # 连接池满后允许创建的额外连接数,最大可超过pool_size 20个连接 'max_overflow': 20, # 每次从连接池获取连接时,先测试连接是否有效,确保连接可用 'pool_pre_ping': True, # 连接回收时间(秒),超过3600秒的连接将被回收重置 'pool_recycle': 3600, # 从连接池获取连接的超时时间(秒) 'pool_timeout': 30, # 是否开启SQLAlchemy的调试信息输出,False表示不输出 'echo': False, # 使用SQLAlchemy 2.0的API风格 'future': True } # 合并用户提供的配置 default_pool_config.update(engine_kwargs) self.engine = create_engine(connection_string, **default_pool_config) self.sessionmaker = sessionmaker(bind=self.engine)
connection_string:参数为连接数据库的URL链接,如:dm+dmPython://User:Password@Host:Port(示例:dm+dmPython://SYSDBA:123456@localhost:5236)。default_pool_config:该配置项里的值需根据实际需求进行配置。
二、封装查询函数(在db_service.py中添加execute_query函数)def execute_query(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List: """ 执行查询语句 Args: sql: SQL查询语句 params: 参数字典 Returns: 查询结果列表 """ with self.engine.connect() as connection: try: result = connection.execute(text(sql), params or {}) rows = result.fetchall() return rows except SQLAlchemyError as e: raise
说明:这里执行的是原生的SQL语句,有兴趣的朋友可以通过实体类的方式实现数据查询,建议两种方式都实现,简单查询用实体类的方式,复杂查询用原生SQL的方式。三、封装增、改、删函数(在db_service.py中添加execute_command函数)def execute_command(self, sql: str, params: Optional[Dict[str, Any]] = None) -> int: """ 执行插入、更新、删除命令 Args: sql: SQL命令语句 params: 参数字典 Returns: 影响的行数 """ session = self.sessionmaker() try: result = session.execute(text(sql), params or {}) session.commit() affected_rows = result.rowcount return affected_rows except SQLAlchemyError as e: session.rollback() raise finally: session.close()
说明:该函数包含事务处理,如实现批量的数据插入、更新和删除。#创建数据库服务实例db_service = DatabaseService("dm+dmPython://SYSDBA:123456@localhost:5236")#数据查询results = db_service.execute_query("SQL语句")#数据插入、更新、删除results = db_service.execute_command("SQL语句")
五、将封装的DatabaseService融合到FastAPI项目中:# db_servvice.py 末尾# 创建全局实例default_db_service = DatabaseService("dm+dmPython://SYSDBA:123456@localhost:5236")# 如果需要,也可以提供获取实例的函数def get_default_db_service(): return default_db_service
#在需要使用的地方调用,示例如下from app.db.db_service import get_default_db_servicedb_service = get_default_db_service()rows = db_service.execute_query("SELECT * FROM DMHR.CITY")for row in rows: print(row)
说明:app.db.db_service根据实际调整。结语:在实际项目中,还应做好日志记录、数据库连接配置参数应该独立文件中配置、数据查询实现分页等功能,笔者也将根据实际项目的实现思维后续持续更新,如果大家有什么建议或疑问,请积极留言讨论。