大家好,我是木木。
今天给大家分享一个优雅的 Python 库,python-cqrs。
python-cqrs 是一个事件驱动架构框架,帮你把命令和查询分离,轻松构建可扩展的分布式系统。
项目地址:https://github.com/pypatterns/python-cqrs官方文档:https://mkdocs.python-cqrs.dev/
读写分离
核心就是 CQRS 模式——Command 负责写,Query 负责读,各走各的路,互不干扰。适合读写比例悬殊的业务场景,比如订单系统、库存管理。
事件驱动
支持领域事件自动派发,Handler 处理完命令后可以产出事件,框架负责把事件推送到对应 Handler,实现真正的解耦。
内置中间件
提供 Middleware 机制,可以在请求前后插入日志、鉴权、限流等逻辑,统一管理横切关注点,不侵入业务代码。
安装
pipinstallpython-cqrs
可选依赖:pip install python-cqrs[kafka](Kafka 支持)、pip install python-cqrs[examples](含 FastAPI 集成)
基础用法:发送命令
这段代码演示最简单的 CQRS 命令发送——定义一个创建订单命令,绑定 Handler,通过 Mediator 发送执行。
importasyncioimportdiimportcqrsfromcqrs.requestsimportbootstrapclassCreateOrderCommand(cqrs.Request):order_id:stramount:floatclassCreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand,None]):asyncdefhandle(self,request:CreateOrderCommand)->None:print(f"[Order Created] id={request.order_id}, amount={request.amount}")defcommands_mapper(mapper:cqrs.RequestMap)->None:mapper.bind(CreateOrderCommand,CreateOrderHandler)container=di.Container()mediator=bootstrap.bootstrap(di_container=container,commands_mapper=commands_mapper)asyncdefmain():awaitmediator.send(CreateOrderCommand(order_id="ORD-001",amount=128.50))awaitmediator.send(CreateOrderCommand(order_id="ORD-002",amount=59.99))asyncio.run(main())
基础用法:Command + Query 读写分离
这段代码演示如何同时注册命令和查询 Handler,命令负责写数据,查询负责读数据,体现 CQRS 读写分离的核心思想。
importasyncioimportdiimportcqrsfromcqrs.requestsimportbootstrap_orders={}classCreateOrderCommand(cqrs.Request):order_id:stramount:floatclassCreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand,None]):asyncdefhandle(self,request:CreateOrderCommand)->None:_orders[request.order_id]={"amount":request.amount}print(f"[Created] {request.order_id}: {request.amount}")classGetOrderQuery(cqrs.Request):order_id:strclassOrderInfo(cqrs.Response):order_id:stramount:floatclassGetOrderHandler(cqrs.RequestHandler[GetOrderQuery,OrderInfo]):asyncdefhandle(self,request:GetOrderQuery)->OrderInfo:data=_orders.get(request.order_id)ifdata:returnOrderInfo(order_id=request.order_id,amount=data["amount"])raiseKeyError(f"Order {request.order_id} not found")defcmd_mapper(m:cqrs.RequestMap)->None:m.bind(CreateOrderCommand,CreateOrderHandler)defqry_mapper(m:cqrs.RequestMap)->None:m.bind(GetOrderQuery,GetOrderHandler)container=di.Container()mediator=bootstrap.bootstrap(di_container=container,commands_mapper=cmd_mapper,queries_mapper=qry_mapper)asyncdefmain():awaitmediator.send(CreateOrderCommand(order_id="A001",amount=200.0))result=awaitmediator.send(GetOrderQuery(order_id="A001"))print(f"[Query Result] id={result.order_id}, amount={result.amount}")asyncio.run(main())

进阶:中间件机制
这段代码展示如何通过 Middleware 在命令执行前后添加日志,实现非侵入式的横切逻辑,是构建生产级应用的关键能力。
importasyncioimportdiimportcqrsfromcqrs.requestsimportbootstrapclassCreateOrderCommand(cqrs.Request):order_id:stramount:floatclassCreateOrderHandler(cqrs.RequestHandler[CreateOrderCommand,None]):asyncdefhandle(self,request:CreateOrderCommand)->None:print(f"[Command] Creating order {request.order_id}, amount={request.amount}")classLoggingMiddleware:asyncdef__call__(self,request,handle):print(f"[Middleware] >>> {type(request).__name__}")result=awaithandle(request)print(f"[Middleware] <<< {type(request).__name__} done")returnresultdefcmd_mapper(m:cqrs.RequestMap)->None:m.bind(CreateOrderCommand,CreateOrderHandler)container=di.Container()mediator=bootstrap.bootstrap(di_container=container,commands_mapper=cmd_mapper,middlewares=[LoggingMiddleware()],)asyncdefmain():awaitmediator.send(CreateOrderCommand(order_id="M-001",amount=88.0))awaitmediator.send(CreateOrderCommand(order_id="M-002",amount=256.0))asyncio.run(main())
环境与版本信息
适用
不适用
- 团队对 CQRS 模式不熟悉且无学习成本的短期项目
上线检查
- 确认 Handler 的依赖注入已正确配置,避免运行时解析失败
总结
python-cqrs 提供了一套完整的 CQRS + 事件驱动实现,从基础的命令查询分离到中间件、Saga 编排都有覆盖。如果你正在设计一个需要长期演进的业务系统,这个框架值得纳入技术选型清单。