一、Ibis 是什么?
Ibis 是一个纯 Python 编写的便携式数据框(dataframe)库。它的核心理念简单而强大:让你用一套 Python API,操作 20 多种不同的数据后端——从本地的 DuckDB、Polars,到云端的 Snowflake、BigQuery,再到分布式的 Spark,全部通吃。
传统的 Pandas 分析流程存在两个痛点:数据必须能装入内存,而且分析逻辑与执行引擎紧密耦合。Ibis 优雅地解决了这两个问题:它采用延迟执行策略——在你调用 execute() 之前,所有操作都只是构建一个符号化的查询表达式,真正的计算完全在数据库或分析引擎中完成。
二、Ibis 的工作流程
下图展示了 Ibis 的核心工作流程:从 Python 代码到最终执行结果的完整路径。
Ibis 本质上是一个查询编译器,它自己不做计算,而是将用户的意图翻译成各后端最擅长的语言。当你切换后端时,核心业务逻辑一行都不用改。
三、安装与第一个程序
# 安装核心库(默认包含 DuckDB 后端)pip install 'ibis-framework[duckdb]'# 如果需要连接特定数据库,按需安装pip install 'ibis-framework[postgres]'pip install 'ibis-framework[snowflake]'
开启交互模式后,查询会自动预览结果,非常适合探索性分析:
import ibis# 开启交互模式(推荐)ibis.options.interactive = True# 加载内置示例数据集(企鹅数据)penguins = ibis.examples.penguins.fetch()penguins
输出会展示一个美观的表格预览:
┏━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓┃ species ┃ island ┃ bill_length_mm ┃ bill_depth_mm ┃ flipper_length_mm ┃ body_mass_g ┃ sex ┃ year ┃┡━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩│ Adelie │ Torgersen │ 39.1 │ 18.7 │ 181 │ 3750 │ male │ 2007 ││ Adelie │ Torgersen │ 39.5 │ 17.4 │ 186 │ 3800 │ female │ 2007 ││ Adelie │ Torgersen │ 40.3 │ 18.0 │ 195 │ 3250 │ female │ 2007 ││ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │└─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴───────┘
这就是 Ibis 的魔法:你用 Python 写代码,但数据从来没进入 Python 内存,一切都在数据库内部完成。
四、核心操作详解
4.1 连接数据源
Ibis 统一的连接方式极大降低了跨数据源操作的复杂度:
import ibis# 连接 DuckDB(默认后端,内存模式)con = ibis.connect("duckdb://")# 连接 DuckDB 持久化文件con = ibis.connect("duckdb://my_database.ddb")# 连接 PostgreSQLcon = ibis.postgres.connect( host="localhost", port=5432, database="analytics", user="username", password="password")# 连接 SQLitecon = ibis.sqlite.connect("path/to/database.db")# 连接 Snowflakecon = ibis.snowflake.connect( account="your_account", user="your_user", password="your_password", database="analytics", schema="public")
4.2 读取数据与创建表格
# 从 CSV 读取数据t = con.read_csv("sales_data.csv")# 从 Parquet 读取t = con.read_parquet("data.parquet")# 从 Pandas DataFrame 导入import pandas as pddf_pd = pd.read_csv("local_data.csv")t = con.create_table("my_table", df_pd, overwrite=True)# 列出所有表格con.list_tables()
4.3 基本数据操作(过滤、选择、排序)
# 获取表格引用sales = con.table("sales")# 过滤数据high_value = sales.filter(sales.amount > 1000)# 选择特定列selected = sales.select(["order_id", "customer_name", "amount", "category"])# 排序(升序/降序)sorted_data = sales.order_by([ibis.desc(sales.amount), sales.order_date])# 链式组合操作result = ( sales .filter(sales.amount > 500) .select(["order_id", "amount", "category"]) .order_by(ibis.desc(sales.amount)) .limit(10))# 执行查询,返回 Pandas DataFramedf = result.execute()
4.4 聚合与分组
# 基本聚合summary = ( sales .group_by("category") .agg( total_revenue=sales.amount.sum(), avg_order=sales.amount.mean(), order_count=sales.order_id.count(), max_order=sales.amount.max(), min_order=sales.amount.min() ) .order_by(ibis.desc("total_revenue")))# 多维度分组multi_group = ( sales .group_by(["category", "region"]) .agg(revenue=sales.amount.sum()))# 查看结果summary.execute()
4.5 新增计算列
mutate() 方法用于添加或覆盖列:
enriched = sales.mutate( # 计算含税金额 amount_with_tax=sales.amount * 1.13, # 提取年份 year=sales.order_date.year(), # 提取月份 month=sales.order_date.month(), # 条件分类 order_size=( ibis.case() .when(sales.amount < 100, "Small") .when(sales.amount < 1000, "Medium") .else_("Large") .end() ), # 布尔判断 is_premium=sales.amount > 5000)
4.6 窗口函数
窗口函数是进行高级分析(如排名、累计求和、移动平均)的利器:
# 计算排名和累计指标windowed = sales.mutate( # 按金额降序排名 rank=ibis.row_number().over(order_by=ibis.desc(sales.amount)), # 每个分类内的排名 rank_in_category=( ibis.row_number() .over(group_by=sales.category, order_by=ibis.desc(sales.amount)) ), # 累计销售额 cumulative_sales=( sales.amount.sum() .over(order_by=sales.order_date) ), # 7天移动平均 moving_avg_7d=( sales.amount.mean() .over(order_by=sales.order_date, preceding=6) ), # 与分类平均值的差值 diff_from_avg=( sales.amount - sales.amount.mean().over(group_by=sales.category) ))
4.7 表连接操作
# 假设有两张表orders = con.table("orders")customers = con.table("customers")products = con.table("products")# 内连接joined = orders.join( customers, orders.customer_id == customers.id).select( orders.order_id, customers.name, orders.amount, orders.order_date)# 左连接left_joined = orders.left_join( products, orders.product_id == products.id).mutate( product_name=ibis.coalesce(products.name, "Unknown Product"))# 多表链式连接full_data = ( orders .join(customers, orders.customer_id == customers.id) .join(products, orders.product_id == products.id) .select([ orders.order_id, customers.name.name("customer_name"), products.name.name("product_name"), orders.amount, orders.order_date ]))
4.8 集成原生 SQL
当你需要执行特别复杂的 SQL 逻辑时,Ibis 允许无缝嵌入原生 SQL:
# 执行原始 SQL 查询result = con.sql(""" SELECT category, SUM(amount) as total_sales, COUNT(*) as order_count, AVG(amount) as avg_order_value FROM sales WHERE order_date >= '2024-01-01' GROUP BY category HAVING SUM(amount) > 10000 ORDER BY total_sales DESC""").execute()# 将 SQL 结果转换为 Ibis 表格,继续用 Python 操作sales_2024 = con.sql("SELECT * FROM sales WHERE year = 2024")high_value = sales_2024.filter(sales_2024.amount > 5000)
五、高级特性
5.1 延迟执行与查询编译
Ibis 的核心是延迟执行——在调用 execute() 之前,没有任何计算真正发生:
import ibiscon = ibis.connect("duckdb://")t = con.read_csv("large_data.csv")# 以下操作只构建表达式树,不执行计算expr = ( t .filter(t.value > 100) .group_by(t.category) .agg(total=t.value.sum()) .order_by(ibis.desc("total")))# 查看生成的 SQL(无需执行)print(ibis.to_sql(expr))# 输出:# SELECT# "category",# SUM("value") AS "total"# FROM "large_data"# WHERE "value" > 100# GROUP BY "category"# ORDER BY "total" DESC# 真正执行查询df = expr.execute()
5.2 后端无缝切换
这是 Ibis 最亮眼的能力:一行代码切换后端,业务逻辑完全不变。
import ibis# 定义通用的分析逻辑def analyze_sales(con): sales = con.table("sales") return ( sales .group_by(["region", "category"]) .agg( total_revenue=sales.amount.sum(), order_count=sales.order_id.count() ) .filter(ibis._.total_revenue > 100000) .order_by(ibis.desc("total_revenue")) )# 本地开发:使用 DuckDBdev_con = ibis.duckdb.connect()dev_result = analyze_sales(dev_con)# 生产环境:切换到 Snowflakeprod_con = ibis.snowflake.connect( account="prod_account", user="prod_user", password="prod_password", database="analytics")# 同样的函数,一行代码切换后端,其余逻辑完全不变prod_result = analyze_sales(prod_con)
这种能力极大地降低了从开发环境到生产环境的迁移成本——你可以在本地用 DuckDB 快速迭代,然后无缝部署到云数据仓库。
5.3 用户自定义函数(UDF)
Ibis 支持将 Python 函数编译到后端执行,借助 Numba JIT 编译器,性能优异:
import ibisimport ibis.expr.datatypes as dt@ibis.udf.scalar.pythondef categorize_value(v: float) -> str: """根据数值返回分类标签""" if v < 100: return "Low" elif v < 1000: return "Medium" else: return "High"# 在查询中使用 UDFresult = t.mutate( value_category=categorize_value(t.amount))@ibis.udf.agg.pythondef trimmed_mean(values: list) -> float: """计算去除最大最小值后的均值""" if len(values) <= 2: return sum(values) / len(values) sorted_vals = sorted(values) trimmed = sorted_vals[1:-1] return sum(trimmed) / len(trimmed)# 作为聚合函数使用agg_result = t.group_by("category").agg( avg_trimmed=trimmed_mean(t.amount))
六、实战案例:电商销售分析
import ibisibis.options.interactive = Truecon = ibis.duckdb.connect("ecommerce.ddb")# 1. 创建并导入数据orders = con.create_table("orders", pd.read_csv("orders.csv"), overwrite=True)customers = con.create_table("customers", pd.read_csv("customers.csv"), overwrite=True)# 2. 关联并丰富数据enriched = ( orders .join(customers, orders.customer_id == customers.id) .mutate( year=orders.order_date.year(), month=orders.order_date.month(), # 客户价值分段 tier=( ibis.case() .when(customers.total_spent > 10000, "VIP") .when(customers.total_spent > 5000, "Gold") .when(customers.total_spent > 1000, "Silver") .else_("Bronze") .end() ), # 订单金额等级 order_value=( ibis.case() .when(orders.amount > 1000, "High") .when(orders.amount > 500, "Medium") .else_("Low") .end() ) ))# 3. 多维度聚合分析monthly_report = ( enriched .group_by(["year", "month", "tier"]) .agg( total_revenue=orders.amount.sum(), avg_order=orders.amount.mean(), order_count=orders.order_id.count(), unique_customers=customers.id.nunique() ) .mutate( # 计算同比增长(使用窗口函数) prev_month_revenue=ibis.lag("total_revenue").over( group_by="tier", order_by=["year", "month"] ), yoy_growth=( (ibis._.total_revenue - ibis._.prev_month_revenue) / ibis._.prev_month_revenue * 100 ) ) .order_by(["year", "month", "tier"]))# 4. 客户留存分析first_purchase = ( orders .group_by("customer_id") .agg(first_order_date=orders.order_date.min()))retention = ( orders .join(first_purchase, orders.customer_id == first_purchase.customer_id) .mutate( months_since_first=( ibis.date_diff("month", first_purchase.first_order_date, orders.order_date) ) ) .group_by(["months_since_first"]) .agg(active_customers=orders.customer_id.nunique()) .order_by("months_since_first"))# 5. 导出结果monthly_report.execute().to_csv("monthly_sales_report.csv")retention.execute().to_csv("customer_retention.csv")
七、Ibis 的优势与适用场景
核心优势:
极致的便携性:一套代码,20+ 后端通用。本地用 DuckDB 快速开发,生产环境切换到 Snowflake 或 Spark,代码一行不改。
延迟执行与查询优化:Ibis 在调用 execute() 前只构建表达式树,后端自动进行 SQL 优化和谓词下推,处理亿级数据也游刃有余。
Python 原生体验:Pandas 用户零门槛上手,无需学习 SQL。复杂的链式操作、窗口函数、UDF 都能用 Python 表达。
与现有生态无缝集成:结果直接导出为 Pandas/Polars DataFrame 或 Arrow 格式,可立即接入 scikit-learn、PyTorch 等 ML 框架。
适用场景:
数据团队需要跨多个数据库进行分析,不想为每种数据库学习不同的 SQL 方言
本地开发和云端生产环境之间的无缝迁移
需要处理超出 Pandas 内存限制的大型数据集,但希望保持 Python 编程体验
构建可复用的数据管道,需要在不同基础设施间移植
八、总结
Ibis 为 Python 数据分析带来了一种全新的范式:将分析意图与执行引擎彻底解耦。你只需要掌握一套 Python API,就能在任何地方运行你的分析代码——无论是在笔记本上快速探索,还是在云端处理海量数据。
用 Ibis 的核心理念来概括:Write once, run anywhere with everything.
# 一个简单的例子,展示 Ibis 的精髓import ibisdef analyze_any_data(con): """这个函数可以在任何 Ibis 支持的后端上运行,无需修改""" t = con.table("sales") return ( t .filter(t.amount > 100) .group_by(t.category) .agg( total=t.amount.sum(), avg=t.amount.mean(), count=t.order_id.count() ) .order_by(ibis.desc("total")) )# 本地 DuckDBduck_con = ibis.duckdb.connect()result1 = analyze_any_data(duck_con)# 生产环境 BigQuerybq_con = ibis.bigquery.connect(project_id="my-project")result2 = analyze_any_data(bq_con) # 完全相同的代码!
这就是 Ibis 的价值:让数据分析师和工程师不再被底层数据基础设施绑架,专注于真正重要的分析逻辑。
编辑:余雨馨
审校:余文彬