from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, avg, count, when, floorimport pandas as pdspark = SparkSession.builder.appName("OvarianCancerAnalysis").getOrCreate()# 假设df是一个已经加载好的Spark DataFrame,包含所有字段# df = spark.read.csv("hdfs://path/to/data", header=True, inferSchema=True)# 核心功能1: 不同风险等级的人群年龄结构分析def analyze_age_by_risk(df): age_stats_by_risk = df.groupBy("RiskLabel").agg( avg("Age").alias("平均年龄"), count("Age").alias("人数统计") ).orderBy(col("平均年龄")) return age_stats_by_risk# 核心功能2: 不同癌症分期的CA-125标志物水平分析def analyze_ca125_by_stage(df): ca125_stats_by_stage = df.groupBy("CancerStage").agg( avg("CA125").alias("CA125平均水平"), count("CA125").alias("病例数") ).orderBy(col("CancerStage")) return ca125_stats_by_stage# 核心功能3: 年龄与CA-125水平组合下的风险矩阵分析def create_age_ca125_risk_matrix(df): # 将年龄和CA125水平分箱 df_binned = df.withColumn("AgeGroup", when(col("Age") < 40, "40岁以下") .when((col("Age") >= 40) & (col("Age") < 60), "40-59岁") .otherwise("60岁及以上") ).withColumn("CA125Group", when(col("CA125") < 35, "正常范围") .when((col("CA125") >= 35) & (col("CA125") < 200), "轻度升高") .otherwise("显著升高") ) # 计算每个组合中的高风险人数 risk_matrix = df_binned.filter(col("RiskLabel") == "High")\ .groupBy("AgeGroup", "CA125Group")\ .agg(count("RiskLabel").alias("高风险人数"))\ .orderBy("AgeGroup", "CA125Group") return risk_matrix