在上篇内容,我们对分组聚合与窗口函数做了详解,今天我们将继续来完成剩余内容,对于上次的模拟数据,通过滚动平均平滑短期波动,来观测长期趋势。# 先算每日销售daily_sales = ( df .group_by(pl.col("order_date").dt.date().alias("date")) .agg(pl.sum("amount").alias("daily_amount")) .sort("date"))# 加滚动指标trend_analysis = ( daily_sales .with_columns( # 7日移动平均(包含今天往前6天) pl.col("daily_amount").rolling_mean(window_size=7, min_periods=1) .alias("MA7"), # 7日移动总和 pl.col("daily_amount").rolling_sum(window_size=7, min_periods=1) .alias("SUM7"), # 指数加权移动平均(更重视近期) pl.col("daily_amount").ewm_mean(span=7).alias("EWMA7"), # 环比(今天 vs 昨天) pl.col("daily_amount").pct_change().alias("日环比") ) .with_columns( # 趋势判断 pl.when(pl.col("daily_amount") > pl.col("MA7") * 1.1) .then(pl.lit("🔥 高于平均10%")) .when(pl.col("daily_amount") < pl.col("MA7") * 0.9) .then(pl.lit("❄️ 低于平均10%")) .otherwise(pl.lit("➡️ 正常波动")) .alias("趋势判断") ))print("最近10天趋势:")print(trend_analysis.tail(10).select(["date", "daily_amount", "MA7", "趋势判断"]))
分组滚动,每个品类看自己的趋势,如果要想每个品类单独算趋势,加over()即可:category_trend = ( df .sort(["category", "order_date"]) .with_columns( pl.col("amount").rolling_mean(window_size=7, min_periods=1) .over("category") # 每个品类独立计算 .alias("品类MA7") ) .with_columns( # 偏离度:今天 vs 品类近期平均 ((pl.col("amount") - pl.col("品类MA7")) / pl.col("品类MA7") * 100) .round(2) .alias("偏离度%") ))print("手机品类偏离度分析:")print( category_trend .filter(pl.col("category") == "手机") .select(["order_date", "amount", "品类MA7", "偏离度%"]) .head(10))
# 慢:逐行 Python 函数df.group_by("region").agg( pl.col("amount").map_elements(lambda x: x.sum()))# 快:原生表达式df.group_by("region").agg(pl.sum("amount"))# 慢:多次 group_bydf1 = df.group_by("region").agg(pl.sum("x"))df2 = df.group_by("region").agg(pl.mean("x"))# 快:一次聚合多个df.group_by("region").agg([pl.sum("x"), pl.mean("x")])
# 10万行以上推荐result = ( pl.scan_csv("big_orders.csv") # 不加载内存 .group_by(["region", "category"]) .agg([ pl.sum("amount"), pl.col("user_id").n_approx_unique() # 近似去重,更快 ]) .sort("amount_sum", descending=True) .limit(100) .collect(streaming=True) # 流式执行)
对于这两篇的内容,我们可以自己来尝试使用模拟数据,对比三种取Top N 方法的性能A:sort().group_by().heaad()C:group_by().agg(pl.col().top_k()) 再explode 测试1万、10万、100万行的差异,深入理解Polars的优化逻辑。