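InfluxDB is an open-source time-series database developed by InfluxData. It can be deployed on its own without any external dependencies. It is designed for timestamped data (time-series data), ranks first among time-series databases on DB-Engines, and is widely used for DevOps monitoring, IoT data collection, real-time analytics, and financial market data. Versions 1.x and 2.x are written in Go. InfluxDB 3.0 was rewritten in Rust, and its core is built on three open-source technologies: Apache Parquet, Apache Arrow, and Arrow Flight. Arrow manages data as arrays. Each array holds one column of tabular data, and several arrays can be combined into a table. Arrow can also read and write a variety of formats for on-disk storage and network transfer, Parquet being the most common.

Today we will focus on PyArrow, Arrow's Python library. Arrow has two core data structures: Array and Table.

```python
import pyarrow as pa

# Create Arrow arrays from Python lists
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Combine the arrays (one per column) into a table
table = pa.Table.from_arrays(
    [animal, count, year],
    names=['animal', 'count', 'year']
)
print(table)
```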
The example above creates arrays and a table. The persistence format most commonly used with Arrow is Parquet, which is also the core storage format of InfluxDB 3.0:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# 1. Create the table (same as above)
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# 2. Write it to a Parquet file
pq.write_table(table, 'example.parquet')

# 3. Read the Parquet file back into an Arrow table
table2 = pq.read_table('example.parquet')
print(table2)
```
Next, let's look at PyArrow's built-in compute functions, using `value_counts` for a simple frequency count:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Build the data
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# Count how often each animal appears
count_result = pc.value_counts(table['animal'])
print(count_result)
```
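Now let's walk through a practical example: processing time-series data with InfluxDB 3.0 and PyArrow. First, write the query result to a partitioned Parquet dataset (create_parquet.py):

```python
from influxdb_client_3 import InfluxDBClient3
import pyarrow as pa
import pyarrow.dataset as ds

# Connection details (fill in your own token)
token = ''
host = 'eu-central-1-1.aws.cloud2.influxdata.com'
org = 'Jay-IOx'
db = 'factory'

# Create the client
client = InfluxDBClient3(
    token=token,
    host=host,
    org=org,
    database=db
)

# Query the last hour of vibration data, grouped by machine (InfluxQL);
# the result comes back as a pyarrow.Table
query = "SELECT vibration FROM machine_data WHERE time >= now() - 1h GROUP BY machineID"
table = client.query(query=query, language="influxql")

# Save as Parquet, partitioned by measurement (the iox::measurement column)
ds.write_dataset(
    table,
    "machine_data",
    format="parquet",
    partitioning=ds.partitioning(
        pa.schema([table.schema.field("iox::measurement")])
    )
)
```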
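Partitioning splits a large dataset into smaller files based on a column's values. A query that needs only some partitions can then skip the remaining files, which speeds it up. Next, read the data back and aggregate it:

```python
import pyarrow.dataset as ds

# Read the partitioned dataset
machine_data = ds.dataset(
    "machine_data",
    format="parquet",
    partitioning=["iox::measurement"]
)

# Load it into an Arrow table
table = machine_data.to_table()

# Group by machine ID and compute the mean/max/min vibration
agg_table = table.group_by("machineID").aggregate([
    ("vibration", "mean"),
    ("vibration", "max"),
    ("vibration", "min")
])

# Convert to pandas for easier viewing (requires pandas)
print(agg_table.to_pandas())
```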
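We have now covered the basic ideas behind Apache Arrow, creating and using PyArrow arrays and tables, reading and writing Parquet files, and basic statistics. If you are interested, try it out yourself.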