BumbleBee DB 是一个基于 Datalog 的高性能分析引擎,它同时也支持 SQL,特别适合处理递归查询和图分析这类在传统 SQL 中很麻烦的任务。它的核心引擎用 C++ 编写,支持列式存储和多线程,能够高效处理 CSV、Parquet 和 pandas DataFrame 数据。
快速上手
先安装,BumbleBee DB 为 Linux x86_64 和 macOS ARM 提供了预编译的 wheel 包:
安装后,来看第一个 SQL 查询示例。这段代码从一个 CSV 文件中读取员工数据,按部门分组计算人数和薪资总和:
import bumblebeedb as bbdb = bb.db()# 用 SQL 查询 CSV 文件,alias 参数给结果命名db.sql(""" SELECT DEPARTMENT_ID, COUNT(*) AS CNT, SUM(SALARY) AS TOTAL FROM "examples/data/employees.csv" GROUP BY DEPARTMENT_ID""", alias="dept_stats")# 获取结果并转为 pandas DataFramedf = db.get_table("dept_stats", 3).to_df( col_names=["dept_id", "count", "total_salary"])print(df)
递归 Datalog 查询
这是 BumbleBee DB 最突出的能力。假设有一个员工上下级关系表,想找出所有人的所有层级下属,用 SQL 写递归查询会很复杂,但用 Datalog 就清晰多了:
import bumblebeedb as bbimport pandas as pd# 准备一份组织架构数据hierarchy = pd.DataFrame({ "manager": ["alice", "alice", "bob", "bob", "carol", "dave"], "report": ["bob", "carol", "dave", "eve", "frank", "grace"],})db = bb.db()# 把 DataFrame 加载为 predicate,predicate 名称必须小写开头db.load_df(hierarchy, "manages")# 递归 Datalog 规则# 第一条:直接下属# 第二条:间接下属(传递闭包)db.run(""" reports_to(M, R) :- manages(M, R). reports_to(M, R) :- manages(M, X), reports_to(X, R). reports_to(X, Y)?""")# 查看结果df = db.get_table("reports_to", 2).to_df(col_names=["manager", "report"])print("完整汇报关系:")print(df.sort_values(["manager", "report"]).to_string(index=False))
输出会显示类似这样的结果:
manager reportalice bobalice carolalice davealice evealice frankalice gracebob davebob evecarol frankdave grace
还可以把 Datalog 查询结果作为 SQL 的数据源继续分析:
# 在 Datalog 结果上跑 SQL,统计每个管理者的下属数量db.sql(""" SELECT COL_0, COUNT(*) AS CNT FROM reports_to GROUP BY COL_0""", alias="report_count")df2 = db.get_table("report_count", 2).to_df(col_names=["manager", "num_reports"])print("\n下属数量统计:")print(df2.sort_values("num_reports", ascending=False).to_string(index=False))
输出:
manager num_reportsalice 6bob 2carol 1dave 1
从多种数据源加载
BumbleBee DB 可以从 CSV 和 Parquet 文件加载数据,也可以直接加载 pandas DataFrame,这意味着你可以先连接任何数据源(数据库、API、Excel 等),再交给 BumbleBee 处理:
# 从 CSV 文件加载db.run(''' employee(id, name, dept_id) :- load_csv("employees.csv").''')# 从 Parquet 文件加载db.run(''' sales(id, amount, region) :- load_parquet("sales.parquet").''')# 从 pandas DataFrame 加载import pandas as pddf = pd.read_sql("SELECT * FROM orders", your_db_connection)db.load_df(df, "orders")
Python API 常用操作
获取所有输出 predicate 列表:
# 列出当前所有的 predicate 及其元数predicates = db.get_output_predicates()for name, arity in predicates: print(f"{name}/{arity}")
从结果表中获取数据的不同方式:
# 方式1:获取为 tuple 列表table = db.get_table("reports_to", 2)tuples = table.tuples()print(tuples) # 类似于 [('alice', 'bob'), ('alice', 'carol'), ...]# 方式2:转为 DataFrame 并指定列名df = table.to_df(col_names=["manager", "report"])
删除不再需要的 predicate:
db.remove_table("reports_to", 2)
命令行工具
除了 Python API,BumbleBee 还提供了命令行工具,可以直接执行 SQL 或 Datalog 文件:
# 运行 SQL 文件BumbleBee -i examples/sql/01_import/basic_csv_import.sql# 运行 Datalog 文件,-a 参数打印所有 predicateBumbleBee -i examples/dl/01_import/basic_csv_import.dl -a# 指定线程数BumbleBee -i query.sql -t 4# 打印性能分析数据BumbleBee -i query.sql -r
性能特点
BumbleBee 采用列式存储和基于推送的执行模型,查询时只会读取需要的列,而不是整行数据。当前版本包含基于规则的优化器,支持谓词下推和列裁剪,但连接顺序遵循查询中的书写顺序——成本优化的连接重排功能在路线图中。预编译的 wheel 包针对 Linux 和 macOS ARM 优化,可以直接获得这些性能收益。