异步编程新纪元:FastAPI与达梦数据库的异步完美融合
在当今高并发的互联网时代,异步编程已成为提升应用性能的关键技术。今天,我们将深入探讨如何将FastAPI与达梦数据库的异步驱动相结合,构建高性能的异步数据服务。这不仅是对传统同步模式的升级,更是对国产数据库技术栈的一次重要革新。
异步驱动的核心优势
相比同步版本,异步编程具有以下显著优势:
- 1. 高并发处理:能够处理数千个并发连接而不会阻塞
- 2. 资源高效:减少线程切换开销,提高CPU利用率
环境准备
1. 安装达梦异步数据库驱动
达梦数据库提供了专门的异步Python驱动程序dmAsync,需要从达梦官网下载并安装:
# 进入达梦数据库安装目录下的异步驱动文件夹
cd /opt/dmdbms/drivers/python/dmAsync
# 执行安装命令
python setup.py install
2. 安装SQLAlchemy及其他依赖
pip install fastapi uvicorn sqlalchemy pydantic
pip install sqlalchemy[asyncio] # 安装SQLAlchemy异步支持
核心代码实现
异步数据库连接配置
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, select, FetchedValue
from sqlalchemy.orm import declarative_base
import asyncio
# 创建异步数据库引擎
# 注意:使用dm+dmAsync协议,这是达梦异步驱动的特有格式
engine = create_async_engine(
"dm+dmAsync://SYSDBA:xxxxx@localhost:5236",
echo=True, # 开启SQL语句日志输出
pool_size=20, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_pre_ping=True # 连接池预检,确保连接有效
)
# 创建异步会话工厂
AsyncSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
class_=AsyncSession, # 指定使用异步Session类
expire_on_commit=False # 提交后不使对象过期
)
# 异步依赖注入:获取数据库会话
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
Base = declarative_base()
异步数据模型定义
class Item(Base):
__tablename__ = "items"
# 使用server_default处理自增列
id = Column(Integer, primary_key=True, autoincrement=True, server_default=FetchedValue())
# 明确设置字段长度和约束
name = Column(String(255), nullable=False, comment="商品名称")
description = Column(String(500), comment="商品描述")
price = Column(Integer, comment="商品价格")
# 添加时间戳字段(可选)
created_at = Column(String(20), server_default=FetchedValue(), comment="创建时间")
updated_at = Column(String(20), server_default=FetchedValue(), comment="更新时间")
Pydantic模型定义(支持异步验证)
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Optional
from datetime import datetime
class ItemBase(BaseModel):
name: str = Field(..., min_length=1, max_length=255, description="商品名称")
description: Optional[str] = Field(None, max_length=500, description="商品描述")
price: int = Field(..., ge=0, description="商品价格")
class ItemCreate(ItemBase):
pass
class ItemUpdate(ItemBase):
name: Optional[str] = Field(None, min_length=1, max_length=255)
price: Optional[int] = Field(None, ge=0)
class ItemResponse(ItemBase):
id: int
created_at: Optional[str] = None
updated_at: Optional[str] = None
model_config = ConfigDict(from_attributes=True)
FastAPI异步应用实现
from fastapi import FastAPI, Depends, HTTPException, Query, status
from contextlib import asynccontextmanager
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 使用生命周期管理器的现代方式
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时:创建数据库表
logger.info("Starting up... Creating database tables")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 关闭时:清理资源
logger.info("Shutting down... Closing database connections")
await engine.dispose()
app = FastAPI(
title="达梦异步API服务",
description="基于FastAPI和达梦异步驱动的RESTful API",
version="1.0.0",
lifespan=lifespan # 使用异步生命周期管理器
)
异步API端点实现
@app.post("/items/create_item",
response_model=ItemResponse,
status_code=status.HTTP_201_CREATED,
summary="创建商品",
description="创建一个新的商品记录")
async def create_item(item: ItemCreate, db: AsyncSession = Depends(get_db)):
"""
创建新的商品
- **name**: 商品名称(必填)
- **description**: 商品描述(可选)
- **price**: 商品价格(必须大于等于0)
"""
try:
# 构建新商品对象
db_item = Item(
name=item.name,
description=item.description,
price=item.price
)
# 添加到会话
db.add(db_item)
# 提交事务
await db.commit()
# 刷新获取自增ID
await db.refresh(db_item)
logger.info(f"商品创建成功: ID={db_item.id}, Name={db_item.name}")
return db_item
except Exception as e:
await db.rollback()
logger.error(f"商品创建失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"创建商品失败: {str(e)}"
)
@app.get("/items/read_items",
response_model=List[ItemResponse],
summary="获取商品列表",
description="分页获取所有商品列表")
async def read_items(
skip: int = Query(0, ge=0, description="跳过记录数"),
limit: int = Query(100, ge=1, le=1000, description="每页记录数"),
db: AsyncSession = Depends(get_db)
):
"""
获取商品列表,支持分页
- **skip**: 跳过的记录数(默认0)
- **limit**: 每页记录数(默认100,最大1000)
"""
try:
# 执行异步查询
result = await db.execute(
select(Item)
.order_by(Item.id)
.offset(skip)
.limit(limit)
)
items = result.scalars().all()
# 获取总数(可选)
count_result = await db.execute(select(func.count()).select_from(Item))
total = count_result.scalar()
logger.info(f"获取商品列表成功: 总数={total}, 本次返回={len(items)}")
return items
except Exception as e:
logger.error(f"获取商品列表失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"读取商品列表失败: {str(e)}"
)
@app.get("/items/read_item/{item_id}",
response_model=ItemResponse,
summary="获取单个商品",
description="根据ID获取单个商品的详细信息")
async def read_item(
item_id: int = Query(..., ge=1, description="商品ID"),
db: AsyncSession = Depends(get_db)
):
"""
根据ID获取单个商品详情
"""
try:
# 使用异步get方法
item = await db.get(Item, item_id)
if item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)
logger.info(f"获取商品成功: ID={item_id}")
return item
except HTTPException:
raise
except Exception as e:
logger.error(f"获取商品失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"读取商品失败: {str(e)}"
)
@app.put("/items/update_item/{item_id}",
response_model=ItemResponse,
summary="更新商品",
description="根据ID更新商品信息")
async def update_item(
item_id: int,
item: ItemUpdate,
db: AsyncSession = Depends(get_db)
):
"""
更新商品信息
支持部分更新,只更新提供的字段
"""
try:
# 获取现有商品
db_item = await db.get(Item, item_id)
if db_item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)
# 只更新提供的字段
update_data = item.dict(exclude_unset=True)
for field, value in update_data.items():
if value is not None:
setattr(db_item, field, value)
# 标记为已修改
db.add(db_item)
await db.commit()
await db.refresh(db_item)
logger.info(f"商品更新成功: ID={item_id}")
return db_item
except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"商品更新失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新商品失败: {str(e)}"
)
@app.delete("/items/delete_item/{item_id}",
summary="删除商品",
description="根据ID删除商品")
async def delete_item(
item_id: int,
db: AsyncSession = Depends(get_db)
):
"""
删除指定ID的商品
"""
try:
db_item = await db.get(Item, item_id)
if db_item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)
await db.delete(db_item)
await db.commit()
logger.info(f"商品删除成功: ID={item_id}")
return {
"message": "商品删除成功",
"item_id": item_id,
"timestamp": datetime.now().isoformat()
}
except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"商品删除失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除商品失败: {str(e)}"
)
高级功能:批量操作与事务管理
from sqlalchemy import insert, update, delete
from fastapi import BackgroundTasks
@app.post("/items/batch_create",
summary="批量创建商品",
description="一次性创建多个商品")
async def batch_create_items(
items: List[ItemCreate],
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
批量创建商品,提高效率
"""
try:
db_items = [
Item(**item.dict())
for item in items
]
db.add_all(db_items)
await db.commit()
# 为每个新商品刷新
for db_item in db_items:
await db.refresh(db_item)
# 异步后处理(示例)
background_tasks.add_task(process_items_async, db_items)
logger.info(f"批量创建成功: 数量={len(db_items)}")
return db_items
except Exception as e:
await db.rollback()
logger.error(f"批量创建失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量创建失败: {str(e)}"
)
async def process_items_async(items: List[Item]):
"""异步后处理任务"""
# 这里可以添加索引更新、缓存刷新等异步操作
logger.info(f"开始异步处理 {len(items)} 个商品")
await asyncio.sleep(1) # 模拟异步操作
logger.info(f"异步处理完成")
# 使用事务装饰器的高级用法
from sqlalchemy.ext.asyncio import async_transaction
@app.post("/items/transaction_example")
async def transaction_example(db: AsyncSession = Depends(get_db)):
"""
复杂事务操作示例
"""
try:
# 开始事务
async with async_transaction(db):
# 操作1
item1 = Item(name="商品1", price=100)
db.add(item1)
# 操作2
item2 = Item(name="商品2", price=200)
db.add(item2)
# 如果所有操作成功,事务自动提交
# 如果出现异常,事务自动回滚
return {"message": "事务执行成功"}
except Exception as e:
logger.error(f"事务执行失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"事务执行失败: {str(e)}"
)
运行异步应用
# 启动异步FastAPI应用
# --workers 参数可以指定工作进程数
uvicorn main:app --reload --host 0.0.0.0 --port 8000 --workers 4
# 生产环境推荐使用
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop asyncio
性能测试与对比
同步 vs 异步性能对比
# 性能测试脚本示例
import asyncio
import aiohttp
import time
import httpx
async def async_test():
"""异步请求测试"""
async with httpx.AsyncClient() as client:
tasks = []
for i in range(100):
task = client.post(
"http://localhost:8000/items/create_item",
json={"name": f"测试商品{i}", "price": i * 10}
)
tasks.append(task)
start = time.time()
responses = await asyncio.gather(*tasks)
end = time.time()
print(f"异步100次请求耗时: {end - start:.2f}秒")
def sync_test():
"""同步请求测试"""
import requests
start = time.time()
for i in range(100):
requests.post(
"http://localhost:8000/items/create_item",
json={"name": f"测试商品{i}", "price": i * 10}
)
end = time.time()
print(f"同步100次请求耗时: {end - start:.2f}秒")
最佳实践建议
1. 连接池优化
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True
)
2. 错误重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def safe_db_operation():
# 数据库操作
pass
3. 监控与日志
# 添加SQL查询日志
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
4. 健康检查端点
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""数据库健康检查"""
try:
# 执行简单查询检查连接
result = await db.execute(select(1))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"数据库连接失败: {str(e)}"
)
常见问题与解决方案
问题1:连接超时
解决方案:调整连接池参数和超时设置
问题2:自增ID获取
解决方案:使用server_default=FetchedValue()配合refresh()
问题3:事务管理
解决方案:使用async_transaction上下文管理器
问题4:性能瓶颈
解决方案:
总结
通过本文的异步实现,我们展示了FastAPI与达梦数据库异步驱动的完美结合。这种架构不仅提升了系统的并发处理能力,还为构建高性能、高可用的现代应用提供了坚实的基础。
核心价值:
异步编程是现代Web开发的必然趋势,而FastAPI与达梦数据库的结合为国产技术栈的发展开辟了新的道路。无论你是构建微服务、数据中台还是企业级应用,这套技术组合都能提供强大的支持。
赶快尝试这个异步方案,体验前所未有的性能和开发效率吧!