polars学习系列文章,第11篇 用户自定义函数,python 自定义函数如何与 polars 结合使用
该库目前已更新到 1.37.1 版本,近一年版本更新迭代的速度非常快,之前分享的前10篇文章的版本是 1.2.1
该系列文章会分享到github,大家可以去下载jupyter文件,进行参考学习仓库地址:https://github.com/DataShare-duo/polars_learn
import sysprint('python版本:',sys.version.split('|')[0])#python 版本: 3.11.11 import polars as plprint("polars版本:",pl.__version__)#polars 版本: 1.37.1map_elementsmap_batchesdf = pl.DataFrame( {"keys":["a", "a", "b", "b"],"values":[10, 7, 1, 23], })print(df)shape: (4, 2)┌──────┬────────┐│ keys ┆ values ││ --- ┆ --- ││ str ┆ i64 │╞══════╪════════╡│ a ┆ 10 ││ a ┆ 7 ││ b ┆ 1 ││ b ┆ 23 │└──────┴────────┘import math defmy_log(value):return math.log(value) # math.log 应用于每个值out=df.select(pl.col("values").map_elements(my_log, return_dtype=pl.Float64))print(out)shape: (4, 1)┌──────────┐│ values ││ --- ││ f64 │╞══════════╡│ 2.302585 ││ 1.94591 ││ 0.0 ││ 3.135494 │└──────────┘存在问题:
defdiff_from_mean(series): total = 0for value in series: total += value mean = total / len(series)return pl.Series([value - mean for value in series])out=df.select(pl.col("values").map_batches(diff_from_mean,return_dtype=pl.Float64))print("== select() with UDF ==")print(out)== select() with UDF ==shape: (4, 1)┌────────┐│ values ││ --- ││ f64 │╞════════╡│ -0.25 ││ -3.25 ││ -9.25 ││ 12.75 │└────────┘print("== group_by() with UDF ==")out=df.group_by("keys").agg( pl.col("values").map_batches(diff_from_mean, return_dtype=pl.Float64))print(out)== group_by() with UDF ==shape: (2, 2)┌──────┬───────────────┐│ keys ┆ values ││ --- ┆ --- ││ str ┆ list[f64] │╞══════╪═══════════════╡│ a ┆ [1.5, -1.5] ││ b ┆ [-11.0, 11.0] │└──────┴───────────────┘纯python实现的自定义函数一般速度都比较慢,要尽量减少使用python实现的方法,可以调用 numpy 中的实现的通用函数/算子,来加速,实际是通过调用C语言的轮子来加速
import numpy as npout=df.select(pl.col("values").map_batches(np.log, return_dtype=pl.Float64))print(out)如果 numpy 中没有可用的函数,那么自定义函数可以通过 Numba 来提速,即时编译
from numba import guvectorize, int64, float64@guvectorize([(int64[:], float64[:])], "(n)->(n)")defdiff_from_mean_numba(arr, result): total = 0for value in arr: total += value mean = total / len(arr)for i, value in enumerate(arr): result[i] = value - meanout=df.select(pl.col("values").map_batches(diff_from_mean_numba, return_dtype=pl.Float64))print("== select() with UDF ==")print(out)out=df.group_by("keys").agg(pl.col("values").map_batches(diff_from_mean_numba, return_dtype=pl.Float64))print("== group_by() with UDF ==")print(out)加速时,数据缺失是不行的,在利用numba装饰器@guvectorize加速时,要么填充缺失值,要么删除缺失值,否则polars会报错
@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")defadd(arr, arr2, result):for i in range(len(arr)): result[i] = arr[i] + arr2[i]df3 = pl.DataFrame({"values_1": [1, 2, 3], "values_2": [10, 20, 30]})out = df3.select( pl.struct(["values_1", "values_2"]) .map_batches(lambda combined: add( combined.struct.field("values_1"), combined.struct.field("values_2") ), return_dtype=pl.Float64, ).alias("add_columns"))print(out)可以使用 map_batches 的 is_elementwise=True 参数将结果流式传输到函数中
设置流式计算,需要确保是针对每个值进行计算,更节省内存
返回数据类型是自动推断的,第一个非空值类型,作为结果类型
python 与 polars 数据类型映射:
可以将 return_dtype 参数传递给 map_batches
以上是自己实践中遇到的一些问题,分享出来供大家参考学习,欢迎关注微信公众号:DataShare ,不定期分享干货
整理的一个资源共享库:
https://github.com/DataShare-duo/datashare_data