from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, count, when, descspark = SparkSession.builder.appName("BikeDataAnalysis").getOrCreate()# 假设df是一个已加载的Spark DataFrame,包含订单数据# 核心功能1: 全天各时段订单量分析def analyze_hourly_orders(df): # 按开始小时分组,统计每个小时的订单数量 hourly_counts = df.groupBy("start_hour").agg(count("orderid").alias("order_count")) # 按小时排序,确保时间序列正确 sorted_hourly_counts = hourly_counts.orderBy(col("start_hour").asc()) # 将结果转换为Pandas DataFrame以便后续处理或展示 result_pdf = sorted_hourly_counts.toPandas() return result_pdf# 核心功能2: 城市核心骑行流向分析def analyze_core_flow(df, top_n=20): # 筛选出有效的起点和终点区域 flow_df = df.filter((col("geohash_start_block") != col("geohash_end_block")) & (col("geohash_start_block").isNotNull()) & (col("geohash_end_block").isNotNull())) # 按起点和终点区域分组,统计流向频次 flow_counts = flow_df.groupBy("geohash_start_block", "geohash_end_block") \ .agg(count("orderid").alias("flow_count")) # 按流向频次降序排序,并取前N条 top_flows = flow_counts.orderBy(desc("flow_count")).limit(top_n) # 转换为Pandas DataFrame result_pdf = top_flows.toPandas() return result_pdf# 核心功能3: 高频、中频、低频用户分群对比分析def analyze_user_segmentation(df): # 按用户ID分组,计算每个用户的总订单数 user_order_counts = df.groupBy("userid").agg(count("orderid").alias("order_count")) # 定义用户分群逻辑 segmented_users = user_order_counts.withColumn("user_group", when(col("order_count") <= 5, "低频用户") .when((col("order_count") > 5) & (col("order_count") <= 20), "中频用户") .otherwise("高频用户") ) # 按用户分群进行分组,统计每个群组的用户数量 group_summary = segmented_users.groupBy("user_group").agg(count("userid").alias("user_number")) # 转换为Pandas DataFrame result_pdf = group_summary.toPandas() return result_pdffrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, count, when, descspark = SparkSession.builder.appName("BikeDataAnalysis").getOrCreate()# 假设df是一个已加载的Spark DataFrame,包含订单数据# 核心功能1: 全天各时段订单量分析def analyze_hourly_orders(df): # 按开始小时分组,统计每个小时的订单数量 hourly_counts = df.groupBy("start_hour").agg(count("orderid").alias("order_count")) # 按小时排序,确保时间序列正确 sorted_hourly_counts = hourly_counts.orderBy(col("start_hour").asc()) # 将结果转换为Pandas DataFrame以便后续处理或展示 result_pdf = sorted_hourly_counts.toPandas() return result_pdf# 核心功能2: 城市核心骑行流向分析def analyze_core_flow(df, top_n=20): # 筛选出有效的起点和终点区域 flow_df = df.filter((col("geohash_start_block") != col("geohash_end_block")) & (col("geohash_start_block").isNotNull()) & (col("geohash_end_block").isNotNull())) # 按起点和终点区域分组,统计流向频次 flow_counts = flow_df.groupBy("geohash_start_block", "geohash_end_block") \ .agg(count("orderid").alias("flow_count")) # 按流向频次降序排序,并取前N条 top_flows = flow_counts.orderBy(desc("flow_count")).limit(top_n) # 转换为Pandas DataFrame result_pdf = top_flows.toPandas() return result_pdf# 核心功能3: 高频、中频、低频用户分群对比分析def analyze_user_segmentation(df): # 按用户ID分组,计算每个用户的总订单数 user_order_counts = df.groupBy("userid").agg(count("orderid").alias("order_count")) # 定义用户分群逻辑 segmented_users = user_order_counts.withColumn("user_group", when(col("order_count") <= 5, "低频用户") .when((col("order_count") > 5) & (col("order_count") <= 20), "中频用户") .otherwise("高频用户") ) # 按用户分群进行分组,统计每个群组的用户数量 group_summary = segmented_users.groupBy("user_group").agg(count("userid").alias("user_number")) # 转换为Pandas DataFrame result_pdf = group_summary.toPandas() return result_pdf