数据是驱动业务决策、产品创新和技术迭代的核心资产。随着物联网、人工智能、实时交互应用的爆发式增长,数据呈现出:海量、高速、异构的鲜明特征,传统批处理模式难以满足实时分析、即时响应的业务需求。从电商平台的实时推荐、金融系统的风险监控,到智能运维的异常告警、LLM 应用的实时知识库更新,各行各业对低延迟、高可靠、易扩展的数据流处理框架需求日益迫切。然而,现有解决方案往往面临易用性与性能不可兼得的困境 ——Python 生态工具灵活便捷但处理速度有限,而高性能的流式计算框架(如 Flink、Spark Streaming)则依赖复杂的 JVM 环境,开发门槛高且部署成本高。
而 Pathway 作为一款开源的 Python 数据处理框架,它将 Python 的易用性与 Rust 的高性能完美融合,为实时数据处理提供了全新的解决方案。这款基于 Differential Dataflow 技术的框架,不仅支持批处理与流处理一体化,还内置了丰富的 LLM 工具链和 350 + 数据连接器,让开发者无需在开发效率和运行性能之间做取舍。无论是数据工程师搭建实时 ETL 管道,还是算法工程师开发 RAG 应用,Pathway 都能以极简的代码实现复杂的数据处理逻辑,极大降低了实时数据应用的开发门槛,推动流式计算技术在更广泛的场景中落地。
◆ 简介
Pathway 是由 Pathway 团队开源的一款面向实时分析和 AI 管道的 Python 数据处理框架,项目托管于 GitHub(
https://github.com/pathwaycom/pathway),采用 BSL 1.1 开源许可证。作为一款全场景适配的数据处理框架,Pathway 的核心定位是打通批处理与流处理的界限,让开发者用同一套代码应对静态数据集和实时数据流。其底层由高性能 Rust 引擎驱动,摆脱了 Python 的 GIL 限制,支持多线程、多进程和分布式计算,同时通过 Differential Dataflow 技术实现增量计算,能够高效处理乱序数据和迟到数据,确保计算结果的一致性。与传统流式计算框架相比,Pathway 的显著优势在于:零学习成本的 Python 体验—— 开发者无需掌握复杂的分布式编程知识,仅需使用熟悉的 Python 语法,就能调用框架提供的丰富 API,轻松集成 Pandas、Scikit-learn、LangChain 等主流 Python 库。
Pathway 项目的核心特性可概括为六大亮点:
- 一是丰富的连接器生态,内置 350 + 数据连接器,涵盖 Kafka、GDrive、PostgreSQL、SharePoint 等常见数据源,还支持通过 Python 自定义连接器;
- 二是无状态与有状态转换兼具,不仅提供 Join、窗口、排序等 Rust 实现的高性能转换操作,还支持调用任意 Python 函数或库进行数据处理;
- 三是可靠的持久化与回溯能力,可保存计算状态,支持故障后快速恢复和管道更新后的回溯处理;
- 四是严格的一致性保障,社区版提供 “至少一次” 一致性,企业版支持 “恰好一次” 一致性,确保批处理与流处理结果一致;
- 五是专为 LLM 优化的工具链,简化 RAG 应用开发流程,支持实时知识库更新;
- 六是便捷的部署方式,原生支持 Docker 和 Kubernetes 部署,兼容 OpenTelemetry,还提供监控仪表板实时查看系统运行状态。
Pathway 的应用场景覆盖实时数据处理和 AI 管道两大核心领域:
- 在实时数据处理方面,它可用于构建实时 ETL 管道,将 Kafka、PostgreSQL、SharePoint 等多种数据源的数据实时同步、清洗、转换后加载至目标系统;支持事件驱动型应用开发,通过窗口函数、聚合计算等功能实现实时告警、行为分析;还能实现实时数据分析,为业务决策提供秒级响应的数据支撑。
- 在 AI 管道方面,Pathway 内置了完整的 LLM 工具链,包括 LLM 封装器、解析器、嵌入模型、文本分割器等组件,支持构建私有 RAG、自适应 RAG、多模态 RAG 等复杂应用,还提供内存实时向量索引,可与 LlamaIndex、LangChain 等生态工具无缝集成,让 AI 应用能够基于最新数据提供服务。
◆ 使用
Pathway 的安装部署简便,目前支持 macOS 和 Linux 系统,Windows 用户可通过虚拟机运行。其核心依赖 Python 3.10 及以上版本,推荐使用虚拟环境隔离项目依赖,避免版本冲突。直接通过 pip 命令安装 Pathway 最新版本:
pip install -U pathway
部分高级功能(如监控仪表板、SharePoint 连接器)需要免费许可证密钥,可通过 Pathway 官方注册页面获取,注册后将密钥配置到环境变量中即可使用:
export PATHWAY_LICENSE_KEY="你的许可证密钥"
◆【入门例子:实时计算正数值总和】
下面通过一个简单的示例,演示如何使用 Pathway 处理实时数据流:计算输入文件中所有正整数的总和,并将结果实时写入输出文件。首先在本地创建项目目录,并新建输入文件夹和主程序文件:
# 创建项目目录
mkdirpathway-demo && cd pathway-demo
# 创建输入文件夹(用于存放待处理的CSV文件)
mkdirinput
# 创建主程序文件
touchmain.py
打开 main.py 文件,写入以下代码:
import pathway as pw
# 定义输入数据的Schema(可选,用于数据类型约束和字段映射)
classInputSchema(pw.Schema):
value: int # 定义输入数据仅包含一个int类型字段value
# 连接输入数据源:读取input文件夹中的CSV文件,支持实时监测文件新增/更新
# 若CSV文件包含表头,可添加header=True参数;无表头则无需设置
input_table = pw.io.csv.read(
path="./input/", # 数据文件所在目录
schema=InputSchema, # 绑定数据Schema
mode="streaming"# 流式模式,实时处理新增数据
)
# 数据处理:筛选出value大于等于0的正数值
filtered_table = input_table.filter(input_table.value >= 0)
# 数据聚合:计算筛选后所有value的总和
result_table = filtered_table.reduce(
sum_value=pw.reducers.sum(filtered_table.value) # 定义聚合结果字段sum_value
)
# 输出结果:将计算结果实时写入JSON Lines格式文件
pw.io.jsonlines.write(
result_table,
path="output.jsonl"# 输出文件路径
)
# 启动计算引擎
pw.run()
代码中定义了输入数据的Schema,然后连接数据源,并定义数据筛选和数据聚合规则,最终输出结果。在终端中执行主程序:
pythonmain.py
程序启动后,会自动监测./input 目录下的 CSV 文件。此时,新建一个 CSV 文件(如 data1.csv),写入多行数字,文件保存后,Pathway 会实时检测到新增数据并进行处理。此时查看项目目录,会生成 output.jsonl 文件,打开后可看到计算结果。
若继续向./input 目录添加新的 CSV 文件(如 data2.csv),Pathway 会自动增量计算,output.jsonl 文件会实时更新,整个过程无需重启程序,充分体现了 Pathway 的实时处理能力。
◆【LLM应用:构建私有 RAG 应用】
Pathway 内置了完善的 LLM 工具链,可快速搭建私有 RAG(检索增强生成)应用,以下是基于 Ollama 本地大模型和 Mistral AI 的实现示例。创建 rag-demo.py 文件,核心代码如下:
文档分割:将长文档分割为适合嵌入的短片段
document_chunks = documents.select(
content=pw.reducers.split_text(
documents.content,
chunk_size=500, # 每个片段500字符
chunk_overlap=50# 片段重叠50字符
),
source=documents.source
)
生成嵌入向量:使用Ollama提供的Mistral嵌入模型
embedder = embedders.OllamaEmbedder(
model="mistral",
base_url="http://localhost:11434"# Ollama服务地址
)
...
定义查询处理逻辑:检索相关文档并生成回答
def rag_pipeline(query: str) -> str:
# 生成查询嵌入向量
query_embedding = embedder.embed(query)
# 检索Top3相关文档片段
relevant_chunks = vector_index.search(
query_embedding,
k=3,
distance_measure="cosine"# 余弦相似度计算
)
# 构建提示词
context = "\n".join([chunk.content for chunk in relevant_chunks])
prompt = f"""
基于以下上下文回答用户问题:
{context}
用户问题:{query}
要求:回答必须基于上下文,简洁准确,不添加额外信息。
"""
# 调用Mistral模型生成回答
llm = llms.OllamaLLM(
model="mistral",
base_url="http://localhost:11434"
)
return llm.generate(prompt)
完整的代码构建了一个:文档分割、嵌入向量生成、构建向量索引、检索相关文档并生成回答 的实时数据流过程。完成后,创建文档目录并添加示例文档,然后运行 RAG 应用,即可通过 HTTP 请求测试 RAG 功能:
curl"http://localhost:8080/query?q=Pathway支持哪些数据处理模式?"
此时,应用会从文档中检索相关信息,并返回基于 Mistral AI 的回答,实现私有知识库的实时问答。
◆ 总结
Pathway 作为一款开源数据处理框架,结合了 Python 的易用性 和 Rust 的高性能,成功解决了传统流式计算框架开发门槛高、批流处理割裂、部署复杂等痛点。其核心价值在于构建了一个统一的批流处理引擎,让开发者无需切换技术栈,即可用同一套代码应对静态数据和实时数据流,极大提升了开发效率;底层 Rust 引擎提供的多线程、分布式计算能力,以及 Differential Dataflow 技术支持的增量计算和乱序数据处理,确保了框架在高吞吐场景下的低延迟表现;而丰富的连接器生态、完整的 LLM 工具链和灵活的部署方式,则进一步拓展了其应用边界,使其能够适配从个人开发到企业级部署的全场景需求。
Pathway 为不同领域的开发者提供了强大的工具支撑:数据工程师可以快速搭建实时 ETL 管道,实现多源数据的实时同步与转换;算法工程师能够便捷地构建基于实时数据的 AI 应用,尤其是 RAG 系统,让大模型始终基于最新知识库提供服务;业务开发者无需深入掌握分布式计算原理,即可通过 Python API 实现复杂的实时数据处理逻辑,加速产品迭代。
来源:
https://www.toutiao.com/article/7592269131823530515/?log_from=5e68aff06e073_1767920376102
“开源大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com
开源大咖说 | 关于版权
由“开源大咖说(ID:kaiyuandakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!
感谢您对开源大咖说的热心支持!