pgstream,一个轻量级的 Python CDC 库,无需轮询,几行代码监听数据库变更,事件驱动你的业务逻辑!
什么是 pgstream?
pgstream 是一个基于 Python 的逻辑复制(CDC)库,专门用于监听 PostgreSQL 数据库中表的 INSERT、UPDATE、DELETE 操作,并实时触发你自定义的回调函数。
它底层依赖 PostgreSQL 原生的逻辑复制功能,通过解析 WAL 日志获取变更事件,对源数据库性能影响极小,非常适合构建事件驱动架构、实时数据管道、Webhook 触发器等场景。
快速开始
环境要求
Python 3.13 及以上(因为用到了最新的 asyncio 特性)PostgreSQL 10+(需开启逻辑复制)
安装
pip install pgstream asyncpg
修改 PostgreSQL 配置
编辑 postgresql.conf(常见路径:/var/lib/postgresql/data/postgresql.conf):
wal_level = logicalmax_replication_slots = 10max_wal_senders = 10
修改后重启 PostgreSQL。
准备表与复制权限
确保要监听的表已存在。若需清理旧的复制槽或 publication:
DROP PUBLICATION pgstream CASCADE;
示例代码
下面是一个完整的监听示例,当 users 或 orders 表发生变更时,自动打印事件内容。
pythonimport asynciofrom pgstream import PGStreamasync def main(): dsn = "postgresql://postgres:123456@192.168.1.1/multi_source?sslmode=disable" stream = PGStream(dsn=dsn) # 注册要监听的表 stream.watch("users") stream.watch("orders") # 初始化(自动创建复制槽和publication) await stream.setup() # 定义回调函数 @stream.on_change async def handle(event, sink): print(f"表: {event.schema}.{event.table}") print(f"操作: {event.operation}") # INSERT / UPDATE / DELETE print(f"数据: {event.row}") # 启动监听(会阻塞运行) await stream.start()asyncio.run(main())
当你在 PostgreSQL 中执行:
INSERT INTO users (name) VALUES ('张三');
控制台会立即输出:
表: public.users操作: INSERT数据: {'id': 1, 'name': '张三'}
应用场景
Webhook 事件驱动
将数据库变更转化为 HTTP 请求,实时通知外部系统:
@stream.on_changeasync def send_webhook(event, sink): await httpx.post("https://your-api.com/webhook", json=event.row)
无需轮询,数据库一变化,下游立即感知。
Kafka 数据管道解耦
把变更事件写入 Kafka,多个消费者按各自节奏处理,避免 replication slot 堆积:
@stream.on_changeasync def to_kafka(event, sink): await kafka_producer.send("db_topic", value=event.row)
上游 DDL / DML 变更可以同时分发到数仓、缓存、搜索引擎等。
实时缓存同步
当数据库更新时,自动更新 Redis 或 Memcached,保持缓存最终一致性。
注意事项
复制槽会持久化 WAL:如果消费者长时间宕机,WAL 会堆积,请监控 pg_replication_slots。DDL 变更:pgstream 主要关注行数据变更,对 ALTER TABLE 等 DDL 不会产生事件。性能:建议监听必要的表,不要监听全库。
总结
pgstream 极大降低了 Python 项目中接入 PostgreSQL CDC 的门槛。几行代码就能构建实时响应数据库变更的应用,非常适合微服务、事件溯源、实时数仓等现代架构。