from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, when, count, avg, sum, corrfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.stat import Correlationspark = SparkSession.builder.appName("BikeAnalysisCore").getOrCreate()# 功能1: 用户价值分层分析 (RFM模型简化版)def user_value_segmentation(df): # 假设df包含: user_id, 消费等级, 每周平均使用次数, last_ride_days (距末次骑行天数) r_score = when(col('last_ride_days') <= 7, 5).when((col('last_ride_days') > 7) & (col('last_ride_days') <= 14), 4).when((col('last_ride_days') > 14) & (col('last_ride_days') <= 21), 3).when((col('last_ride_days') > 21) & (col('last_ride_days') <= 30), 2).otherwise(1) f_score = when(col('每周平均使用次数') >= 5, 5).when((col('每周平均使用次数') >= 3) & (col('每周平均使用次数') < 5), 4).when((col('每周平均使用次数') >= 1) & (col('每周平均使用次数') < 3), 3).when(col('每周平均使用次数') >= 0.5, 2).otherwise(1) m_score = when(col('消费等级') == '高', 5).when(col('消费等级') == '中', 3).otherwise(1) rfm_df = df.withColumn('R', r_score).withColumn('F', f_score).withColumn('M', m_score) rfm_df = rfm_df.withColumn('RFM_Score', col('R') + col('F') + col('M')) segmented_df = rfm_df.withColumn('用户分层', when(col('RFM_Score') >= 13, '高价值用户').when((col('RFM_Score') >= 9) & (col('RFM_Score') < 13), '潜力用户').when((col('RFM_Score') >= 5) & (col('RFM_Score') < 9), '一般用户').otherwise('低价值用户')) return segmented_df.select('user_id', 'RFM_Score', '用户分层')# 功能2: 区域市场潜力评估def regional_market_potential(df): # 假设df包含: 省份, 每周平均使用次数, 消费等级 province_metrics = df.groupBy('省份').agg(count('*').alias('用户数量'), avg('每周平均使用次数').alias('平均使用频率'), avg(when(col('消费等级') == '高', 5).when(col('消费等级') == '中', 3).otherwise(1)).alias('平均消费力')) potential_df = province_metrics.withColumn('市场潜力分', col('用户数量') * 0.4 + col('平均使用频率') * 0.3 + col('平均消费力') * 0.3) return potential_df.orderBy(col('市场潜力分').desc())# 功能3: 用户活跃度影响因素分析def user_activity_correlation(df): # 假设df包含: 活跃类型, 年龄, 每周平均使用次数, 驾驶距离 analysis_df = df.withColumn('is_active', when(col('活跃类型') == '活跃', 1).otherwise(0)) analysis_df = analysis_df.na.fill({'年龄': 0, '每周平均使用次数': 0, '驾驶距离': 0}) assembler = VectorAssembler(inputCols=['年龄', '每周平均使用次数', '驾驶距离'], outputCol='features') vec_df = assembler.transform(analysis_df) matrix = Correlation.corr(vec_df, 'features').collect()[0][0] corr_matrix = matrix.toArray().tolist() # 提取与活跃度的相关性 age_corr = corr_matrix[0][1] usage_corr = corr_matrix[0][2] distance_corr = corr_matrix[0][3] print(f"年龄与活跃度相关性: {age_corr}, 使用频率与活跃度相关性: {usage_corr}, 骑行距离与活跃度相关性: {distance_corr}") return corr_matrix