1. DuckDB - the top recommendation
2. Polars - high-performance data processing
3. pandasql - lightweight SQL queries
4. DataFusion - the power of Apache Arrow
5. dask-sql - distributed big-data processing
| Library    | CSV | Excel | JSON | Parquet | TXT |
|------------|-----|-------|------|---------|-----|
| DuckDB     | ✅  | ✅*   | ✅   | ✅      | ✅  |
| Polars     | ✅  | ✅    | ✅   | ✅      | ✅  |
| pandasql   | ✅  | ✅    | ✅   | ✅      | ✅  |
| DataFusion | ✅  | ❌    | ✅   | ✅      | ✅  |
| dask-sql   | ✅  | ✅**  | ✅   | ✅      | ✅  |

\* DuckDB requires the excel extension; \*\* dask-sql reads Excel through dask/pandas and needs extra dependencies
| Library    | Performance | SQL Features | Ease of Use | Ecosystem |
|------------|-------------|--------------|-------------|-----------|
| DuckDB     | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Polars     | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| pandasql   | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| DataFusion | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| dask-sql   | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
DuckDB is an embedded analytical database designed specifically for data analysis. It needs no server: it runs entirely inside the local process, which has earned it the nickname "the SQLite of data analytics".
For a query tool built on top of DuckDB, see this article:
Zero-deployment local SQL power tool: a DuckDB-based GUI that queries files directly and analyzes tens of millions of rows in seconds
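Before diving into file formats, a minimal sketch (the CSV path is a placeholder) of just how little setup the embedded engine needs:

```python
import duckdb

# In-process and in-memory: no server, no configuration
print(duckdb.sql("SELECT 42 AS answer").fetchall())  # [(42,)]

# The same one-liner style works against a local file (placeholder path)
# print(duckdb.sql("SELECT COUNT(*) FROM 'data.csv'").fetchone())
```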
Installation:

```bash
pip install duckdb
```

Basic queries:

```python
import duckdb

# Query a file directly
result = duckdb.sql("SELECT * FROM 'data.csv' LIMIT 10").fetchall()

# Use a connection object
con = duckdb.connect()
df = con.execute("SELECT * FROM 'data.csv' WHERE age > 25").fetchdf()

# Read the result as a Pandas DataFrame
result = duckdb.sql("SELECT * FROM 'data.csv'").df()
```

Reading Excel:

```python
import duckdb

con = duckdb.connect()
con.install_extension("excel")  # install once per environment
con.load_extension("excel")     # load once per connection

# Read an Excel file
df = con.execute("SELECT * FROM 'data.xlsx'").fetchdf()

# Specify a worksheet
df = con.execute("SELECT * FROM read_xlsx('data.xlsx', sheet = 'Sheet1')").fetchdf()
```

Reading JSON:

```python
import duckdb

# Read a JSON file
df = duckdb.sql("SELECT * FROM 'data.json'").df()

# Read newline-delimited JSON
df = duckdb.sql("SELECT * FROM read_json_auto('data.ndjson')").df()
```

Reading Parquet:

```python
import duckdb

# Query a Parquet file directly
df = duckdb.sql("SELECT * FROM 'data.parquet'").df()

# Read multiple Parquet files at once
df = duckdb.sql("SELECT * FROM read_parquet('*.parquet')").df()
```

Reading TXT:

```python
import duckdb

# Read a delimited txt file
df = duckdb.sql("""
    SELECT * FROM read_csv('data.txt', delim='|', header=True)
""").df()
```

Exporting CSV:

```python
import duckdb

con = duckdb.connect()

# Export with the COPY command
con.execute("""
    COPY (SELECT * FROM 'data.parquet')
    TO 'output.csv' (HEADER, DELIMITER ',')
""")
```

Exporting Excel:

```python
import duckdb

con = duckdb.connect()
con.install_extension("excel")  # install once per environment
con.load_extension("excel")     # load once per connection

# Export to Excel
con.execute("COPY (SELECT * FROM 'data.csv') TO 'output.xlsx' (FORMAT xlsx)")
```

Exporting JSON:

```python
import duckdb

con = duckdb.connect()

# Export to JSON
con.sql("COPY (SELECT * FROM 'data.parquet') TO 'output.json'")
```

Exporting Parquet:

```python
import duckdb

con = duckdb.connect()

# Export to Parquet (recommended; best performance)
con.sql("""
    COPY (SELECT * FROM 'data.csv')
    TO 'output.parquet' (FORMAT 'parquet')
""")
```

Exporting TXT:

```python
import duckdb

con = duckdb.connect()

# Export txt with a custom delimiter
con.sql("""
    COPY (SELECT * FROM 'data.csv')
    TO 'output.txt' (HEADER, DELIMITER '|')
""")
```

Interop with Pandas:

```python
import duckdb
import pandas as pd

# Create a Pandas DataFrame
df_pd = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'city': ['NYC', 'LA', 'SF']
})

# Query the Pandas DataFrame directly
result = duckdb.sql("SELECT * FROM df_pd WHERE age > 25").df()

# Or register it as a table
con = duckdb.connect()
con.register('users', df_pd)
result = con.execute("SELECT * FROM users").fetchdf()
```

Polars is a high-performance DataFrame library written in Rust. It uses vectorized execution and lazy evaluation, which lets it far outperform Pandas on large datasets, and its built-in SQLContext class lets you query data directly with SQL statements.
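Since the intro above leans on lazy evaluation, here is a minimal sketch (file and column names are placeholders) of how a lazy scan records a plan and defers all work until collect(), letting Polars push the filter down into the file read:

```python
import polars as pl

# Build a lazy query: nothing is read yet, only a plan is recorded
lazy = (
    pl.scan_csv("data.csv")        # placeholder file
    .filter(pl.col("age") > 25)    # predicate gets pushed into the scan
    .select(["name", "age"])
)

print(lazy.explain())  # inspect the optimized plan
df = lazy.collect()    # execution happens only here
```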
Installation:

```bash
# Basic installation
pip install polars

# Full installation (includes Excel support)
pip install 'polars[excel]'

# Or install all optional dependencies
pip install 'polars[all]'
```

Reading CSV:

```python
import polars as pl

# Eager read: loads the whole file into memory at once
df = pl.read_csv("data.csv")

# Lazy scan (recommended for large files)
df_lazy = pl.scan_csv("data.csv")
```

Reading Excel:

```python
import polars as pl

# Read Excel (uses the fastexcel engine by default)
df = pl.read_excel("data.xlsx")

# Specify a worksheet
df = pl.read_excel("data.xlsx", sheet_name="Sheet1")

# Specify an engine
df = pl.read_excel("data.xlsx", engine="openpyxl")
```

Reading JSON:

```python
import polars as pl

# Read JSON
df = pl.read_json("data.json")

# Read newline-delimited JSON
df = pl.read_ndjson("data.ndjson")
```

Reading Parquet:

```python
import polars as pl

# Read Parquet
df = pl.read_parquet("data.parquet")

# Lazy scan of Parquet (recommended)
df_lazy = pl.scan_parquet("data.parquet")
```

Reading TXT:

```python
import polars as pl

# Read a delimited txt file
df = pl.read_csv("data.txt", separator="|", has_header=True)

# Read a txt file without a header row
df = pl.read_csv(
    "data.txt",
    separator="\t",
    has_header=False,
    new_columns=["col1", "col2", "col3"],
)
```

SQL queries:

```python
import polars as pl

# Read the data
df = pl.read_csv("data.csv")

# Create a SQL context
ctx = pl.SQLContext(register_globals=True)

# Run a SQL query
result = ctx.execute("""
    SELECT * FROM df
    WHERE age > 25
    ORDER BY name DESC
    LIMIT 10
""").collect()

# Multi-table JOIN
df2 = pl.read_csv("orders.csv")
ctx = pl.SQLContext(users=df, orders=df2)
result = ctx.execute("""
    SELECT u.name, o.amount
    FROM users u
    INNER JOIN orders o ON u.id = o.user_id
    WHERE o.amount > 100
""").collect()

# Aggregation
result = ctx.execute("""
    SELECT city, COUNT(*) AS user_count, AVG(age) AS avg_age
    FROM df
    GROUP BY city
    HAVING COUNT(*) > 5
""").collect()
```

Exporting CSV:

```python
import polars as pl

df = pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Basic export
df.write_csv("output.csv")

# Export with options
df.write_csv("output.csv", separator=";", include_header=True)
```

Exporting Excel:

```python
import polars as pl

df = pl.DataFrame({"name": ["Alice", "Bob"], "age": [25, 30]})

# Export to Excel
df.write_excel("output.xlsx")

# Name the worksheet
df.write_excel("output.xlsx", worksheet="Users")
```

Exporting JSON:

```python
# Export to JSON
df.write_json("output.json")

# Export to newline-delimited JSON
df.write_ndjson("output.ndjson")
```

Exporting Parquet:

```python
# Export to Parquet (recommended)
df.write_parquet("output.parquet")

# With compression
df.write_parquet("output.parquet", compression="snappy")
```

Exporting TXT:

```python
# txt is just CSV with a custom separator
df.write_csv("output.txt", separator="|")
```

Interop with Pandas:

```python
import polars as pl
import pandas as pd

# Pandas → Polars
df_pd = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
df_pl = pl.from_pandas(df_pd)

# Polars → Pandas
df_pd_back = df_pl.to_pandas()

# Query the converted data with SQL
ctx = pl.SQLContext(data=df_pl)
result = ctx.execute("SELECT * FROM data WHERE a > 1").collect()
```

pandasql is a lightweight library that lets you run SQL queries against Pandas DataFrames. It works by loading the DataFrames into a SQLite database and executing the SQL there.
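To make that "loads into SQLite" mechanism concrete, here is a rough sketch of what pandasql does for you under the hood, using only pandas and the standard-library sqlite3 (the data and names are illustrative, not pandasql's actual internals):

```python
import sqlite3
import pandas as pd

df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [25, 30]})

# Roughly what a sqldf() call does behind the scenes:
con = sqlite3.connect(":memory:")   # 1. open an in-memory SQLite database
df.to_sql("df", con, index=False)   # 2. copy the DataFrame into it
result = pd.read_sql("SELECT * FROM df WHERE age > 25", con)  # 3. run the SQL
con.close()

print(result)  # comes back as a regular Pandas DataFrame
```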
Installation:

```bash
pip install pandasql
```

pandasql does not read files itself; load the data with Pandas first:
Reading files (via Pandas):

```python
import pandas as pd
from pandasql import sqldf

# CSV
df = pd.read_csv("data.csv")

# Excel
df = pd.read_excel("data.xlsx")

# JSON
df = pd.read_json("data.json")

# Parquet
df = pd.read_parquet("data.parquet")

# txt
df = pd.read_csv("data.txt", sep="|", header=0)
```

SQL queries:

```python
from pandasql import sqldf

# Define a query helper
pysqldf = lambda q: sqldf(q, globals())

# Run a SQL query
result = pysqldf("SELECT * FROM df WHERE age > 25")

# A more complex query
query = """
SELECT city, AVG(age) AS avg_age, COUNT(*) AS count
FROM df
GROUP BY city
HAVING COUNT(*) > 1
"""
result = pysqldf(query)
```

Exporting results:

```python
import pandas as pd
from pandasql import sqldf

# Query, then export
result = sqldf("SELECT * FROM df WHERE age > 25", globals())

# CSV
result.to_csv("output.csv", index=False)

# Excel
result.to_excel("output.xlsx", index=False)

# JSON
result.to_json("output.json", orient="records")

# Parquet
result.to_parquet("output.parquet")

# txt
result.to_csv("output.txt", sep="|", index=False)
```

Interop with Pandas:

```python
import pandas as pd
from pandasql import sqldf

# Create DataFrames
df1 = pd.DataFrame({"id": [1, 2, 3], "name": ["A", "B", "C"]})
df2 = pd.DataFrame({"id": [1, 2, 4], "value": [100, 200, 400]})

# JOIN query
query = """
SELECT df1.id, df1.name, df2.value
FROM df1
LEFT JOIN df2 ON df1.id = df2.id
"""
result = sqldf(query, globals())

# The result is still a Pandas DataFrame
print(type(result))  # <class 'pandas.core.frame.DataFrame'>
```

DataFusion is part of the Apache Arrow project: a query engine written in Rust with Python bindings, focused on high-performance data analysis.
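Besides SQL strings, DataFusion also exposes a lazy DataFrame API; a minimal sketch (placeholder file, assuming a recent datafusion release that exports col and lit at the top level):

```python
from datafusion import SessionContext, col, lit

ctx = SessionContext()

# The same kind of query as SQL, expressed with the DataFrame API
df = (
    ctx.read_csv("data.csv")          # placeholder file
    .filter(col("age") > lit(25))     # lazy: builds a plan, runs nothing yet
    .select(col("name"), col("age"))
)

df.show()  # execution happens when results are requested
```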
Installation:

```bash
pip install datafusion
```

Reading CSV:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_csv("data.csv")
```

Reading Parquet:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("data.parquet")
```

Reading JSON:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_json("data.json")
```

Reading TXT:

```python
from datafusion import SessionContext

ctx = SessionContext()

# txt files are read via read_csv with a custom delimiter
df = ctx.read_csv("data.txt", delimiter="|", has_header=True)
```

SQL queries:

```python
from datafusion import SessionContext

ctx = SessionContext()

# Register files as tables
ctx.register_csv("users", "data.csv")
ctx.register_parquet("sales", "sales.parquet")

# Run a SQL query
ctx.sql("""
    SELECT u.*, s.amount
    FROM users u
    LEFT JOIN sales s ON u.id = s.user_id
    WHERE u.age > 25
""").show()
```

Exporting CSV:

```python
from datafusion import SessionContext
import pyarrow.csv as csv

ctx = SessionContext()
df = ctx.read_csv("data.csv")

# Convert to an Arrow Table, then export
table = df.to_arrow_table()
csv.write_csv(table, "output.csv")
```

Exporting Parquet:

```python
from datafusion import SessionContext
import pyarrow.parquet as pq

ctx = SessionContext()
df = ctx.read_csv("data.csv")
table = df.to_arrow_table()
pq.write_table(table, "output.parquet")
```

Exporting JSON:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_csv("data.csv")

# Convert to Pandas, then export as JSON
df.to_pandas().to_json("output.json", orient="records")
```

Exporting TXT:

```python
from datafusion import SessionContext
import pyarrow.csv as csv

ctx = SessionContext()
df = ctx.read_csv("data.csv")
table = df.to_arrow_table()

# Export txt with a custom delimiter
csv.write_csv(table, "output.txt", write_options=csv.WriteOptions(delimiter="|"))
```

Interop with Pandas:

```python
from datafusion import SessionContext
import pandas as pd
import pyarrow as pa

# Create a Pandas DataFrame
df_pd = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Pandas → DataFusion (via an Arrow table)
ctx = SessionContext()
table = pa.Table.from_pandas(df_pd)
ctx.register_record_batches("my_table", [table.to_batches()])

# SQL query, then back to Pandas
df_result = ctx.sql("SELECT * FROM my_table WHERE a > 1").to_pandas()
```

dask-sql is a distributed SQL query engine that combines Dask's distributed computing capabilities with the ease of use of SQL.
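dask-sql only pays off when a Dask cluster is attached; a minimal sketch using a local cluster (swap in a real scheduler address in production, and note the file path is a placeholder):

```python
from dask.distributed import Client
from dask_sql import Context

# Local cluster for testing; for a real cluster pass the scheduler
# address instead, e.g. Client("tcp://scheduler:8786")
client = Client()

c = Context()
c.create_table("users", "data.csv")  # placeholder file

# The SQL is compiled to a Dask task graph and runs on the cluster
result = c.sql("SELECT COUNT(*) FROM users").compute()
print(result)
```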
Installation:

```bash
pip install dask-sql
```

Reading CSV:

```python
from dask_sql import Context

c = Context()
c.create_table("users", "data.csv")
```

Reading Parquet:

```python
from dask_sql import Context

c = Context()
c.create_table("sales", "sales.parquet")
```

Reading JSON:

```python
from dask_sql import Context
import dask.dataframe as dd

c = Context()
df = dd.read_json("data.json")
c.create_table("data", df)
```

Reading Excel:

```python
from dask_sql import Context
import dask.dataframe as dd
import pandas as pd

c = Context()

# dask has no native Excel reader: read with pandas, then wrap in a dask DataFrame
df = dd.from_pandas(pd.read_excel("data.xlsx"), npartitions=1)
c.create_table("data", df)
```

Reading TXT:

```python
from dask_sql import Context
import dask.dataframe as dd

c = Context()

# txt files are read via read_csv
df = dd.read_csv("data.txt", sep="|")
c.create_table("data", df)
```

SQL queries:

```python
from dask_sql import Context
import dask.dataframe as dd

c = Context()

# Register a table
df = dd.read_csv("data.csv")
c.create_table("users", df)

# Run a SQL query. Do not wrap identifiers in backticks;
# use double quotes instead, e.g. "销售额" > 100
result = c.sql("""
    SELECT category, AVG(value) AS avg_value, COUNT(*) AS count
    FROM users
    GROUP BY category
""")

# Materialize the result
result_df = result.compute()
```

Exporting results:

```python
from dask_sql import Context
import dask.dataframe as dd

c = Context()
df = dd.read_csv("data.csv")
c.create_table("data", df)

result = c.sql("SELECT * FROM data WHERE value > 100")

# compute() returns a Pandas DataFrame, which can then be written to any format
out = result.compute()
out.to_csv("output.csv", index=False)
out.to_parquet("output.parquet")
out.to_excel("output.xlsx", index=False)
out.to_json("output.json", orient="records")
out.to_csv("output.txt", sep="|", index=False)  # txt with a custom delimiter
```

Interop with Pandas:

```python
from dask_sql import Context
import pandas as pd

# Create a Pandas DataFrame
df_pd = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Register it with dask-sql
c = Context()
c.create_table("my_table", df_pd)

# SQL query
result = c.sql("SELECT * FROM my_table WHERE a > 1")

# Back to Pandas
result_df = result.compute()
```

Where each library fits best:

| Library    | Recommended Scenario |
|------------|----------------------|
| DuckDB     | General-purpose local analysis; the default choice |
| Polars     | High-performance single-machine data processing |
| pandasql   | Lightweight SQL on existing Pandas workflows |
| DataFusion | Arrow-native, Rust-powered query pipelines |
| dask-sql   | Distributed processing of very large datasets |
Resources:

DuckDB
Official docs: https://duckdb.org/docs/
Python API docs: https://duckdb.org/docs/stable/clients/python/overview
GitHub repository: https://github.com/duckdb/duckdb
Polars
Official docs: https://docs.pola.rs/
SQL interface guide: https://docs.pola.rs/user-guide/sql/intro/
GitHub repository: https://github.com/pola-rs/polars
pandasql
GitHub repository: https://github.com/yhat/pandasql
PyPI page: https://pypi.org/project/pandasql/
DataFusion
Official docs: https://datafusion.apache.org/
Python docs: https://datafusion.apache.org/python/user-guide/
GitHub repository: https://github.com/apache/datafusion-python
dask-sql
Official docs: https://dask-sql.readthedocs.io/
GitHub repository: https://github.com/dask-contrib/dask-sql