# Initialize the SparkSession
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("HealthRiskAnalysis").getOrCreate()

# Assume df is an already-loaded Spark DataFrame containing all patient records.

# Core feature 1: average vital signs per risk level
def analyze_avg_vitals_by_risk(df):
    result_df = df.groupBy("Risk_Level").agg(
        F.avg("Heart_Rate").alias("Avg_Heart_Rate"),
        F.avg("Systolic_BP").alias("Avg_Systolic_BP"),
        F.avg("Oxygen_Saturation").alias("Avg_Oxygen_Saturation"),
        F.avg("Respiratory_Rate").alias("Avg_Respiratory_Rate"),
        F.avg("Temperature").alias("Avg_Temperature")
    ).orderBy("Risk_Level")
    return result_df

# Core feature 2: k-means clustering of patients on the core vital signs
def perform_patient_clustering(df):
    feature_cols = ["Heart_Rate", "Systolic_BP", "Oxygen_Saturation",
                    "Respiratory_Rate", "Temperature"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    df_with_features = assembler.transform(df)
    kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=4, seed=42)
    model = kmeans.fit(df_with_features)
    clustered_df = model.transform(df_with_features)
    # Extract the cluster centers so each cluster can be interpreted later
    cluster_centers = model.clusterCenters()
    return clustered_df, cluster_centers

# Core feature 3: joint effect of heart rate and blood pressure on risk level
def analyze_heart_rate_bp_joint_effect(df):
    # Use the dataset-wide means as the split thresholds for both vitals
    avg_hr = df.select(F.avg("Heart_Rate")).collect()[0][0]
    avg_bp = df.select(F.avg("Systolic_BP")).collect()[0][0]
    # Label each patient by heart-rate and blood-pressure category
    df_categorized = df.withColumn(
        "HR_Category",
        F.when(F.col("Heart_Rate") > avg_hr, "High HR").otherwise("Normal HR"))
    df_categorized = df_categorized.withColumn(
        "BP_Category",
        F.when(F.col("Systolic_BP") > avg_bp, "High BP").otherwise("Normal BP"))
    # Count patients in each (HR category, BP category, risk level) cell
    joint_effect_df = (df_categorized
                       .groupBy("HR_Category", "BP_Category", "Risk_Level")
                       .count()
                       .orderBy("HR_Category", "BP_Category", "Risk_Level"))
    return joint_effect_df
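
# A minimal usage sketch, assuming the patient data lives in a CSV file with
# the column names used above ("Heart_Rate", "Systolic_BP", "Oxygen_Saturation",
# "Respiratory_Rate", "Temperature", "Risk_Level"). The path
# "patient_vitals.csv" is a hypothetical placeholder, not part of the
# original script.
if __name__ == "__main__":
    # inferSchema is needed so the vitals load as numeric types, not strings
    df = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv("patient_vitals.csv"))

    # Feature 1: average vitals per risk level
    analyze_avg_vitals_by_risk(df).show()

    # Feature 2: cluster sizes plus the raw centers for interpretation
    clustered_df, centers = perform_patient_clustering(df)
    clustered_df.groupBy("cluster").count().show()
    for i, center in enumerate(centers):
        print(f"Cluster {i} center: {center}")

    # Feature 3: joint heart-rate / blood-pressure breakdown by risk level
    analyze_heart_rate_bp_joint_effect(df).show()

    spark.stop()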