from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import IntegerType, FloatType

spark = SparkSession.builder.appName("GallstoneAnalysis").getOrCreate()


def _with_group_share(counts, keys, count_col, total_col, share_col):
    """Attach per-group totals and a percentage share to an aggregated frame.

    Adds ``total_col`` (sum of ``count_col`` over all rows sharing the same
    ``keys`` values) and ``share_col`` (``count_col / total_col * 100``,
    rounded to 2 decimals).

    The totals must be computed from the already-aggregated ``counts`` frame:
    the raw input has no count column to sum. A window is used rather than a
    groupBy + equi-join so that groups whose key is null (rows with a missing
    measurement) still receive a total instead of a null one.
    """
    group = Window.partitionBy(*keys)
    return (
        counts
        .withColumn(total_col, F.sum(count_col).over(group))
        .withColumn(share_col, F.round(F.col(count_col) / F.col(total_col) * 100, 2))
    )


def analyze_age_gender_bmi_risk(df):
    """Gallstone incidence by gender, age band and BMI category.

    Args:
        df: DataFrame with "Age", "Gender", "Body Mass Index (BMI)" and
            "Gallstone Status" columns.

    Returns:
        DataFrame with columns Gender, age_group, bmi_category, count (rows with
        "Gallstone Status" == "Yes" in that group) and incidence_rate (that
        count as a percentage of all rows in the group, rounded to 2 decimals).
        Only groups with at least one "Yes" row appear.
        NOTE(review): assumes positives are encoded as the string "Yes" — confirm
        against the input data.
    """
    age = F.col("Age")
    # Upper bounds are exclusive (< 31 instead of <= 30) so fractional ages land
    # in the right band; integer ages bin exactly as before. Ages under 20 get
    # their own band rather than falling through to "71+", and a null Age yields
    # a null age_group (no `otherwise`).
    df = df.withColumn(
        "age_group",
        F.when(age < 20, "<20")
        .when(age < 31, "20-30")
        .when(age < 41, "31-40")
        .when(age < 51, "41-50")
        .when(age < 61, "51-60")
        .when(age < 71, "61-70")
        .when(age >= 71, "71+"),
    )

    bmi = F.col("Body Mass Index (BMI)")
    # Labels: underweight / normal / overweight / obese. A null BMI stays null
    # instead of being counted as obese.
    df = df.withColumn(
        "bmi_category",
        F.when(bmi < 18.5, "低体重")
        .when(bmi < 25, "正常")
        .when(bmi < 30, "超重")
        .when(bmi >= 30, "肥胖"),
    )

    keys = ["Gender", "age_group", "bmi_category"]
    counts = df.groupBy(*keys, "Gallstone Status").count()
    result = _with_group_share(counts, keys, "count", "total_in_group", "incidence_rate")
    return (
        result
        .filter(F.col("Gallstone Status") == "Yes")
        .select(*keys, "count", "incidence_rate")
    )


def analyze_body_fat_risk(df):
    """Gallstone prevalence and fat statistics by total-body-fat-ratio band.

    Also prints the Pearson correlation between "Total Body Fat Ratio (TBFR) (%)"
    and "Visceral Fat Area (VFA)" over the whole input.

    Args:
        df: DataFrame with "Total Body Fat Ratio (TBFR) (%)",
            "Visceral Fat Rating (VFR)", "Visceral Fat Area (VFA)" and
            "Gallstone Status" columns.

    Returns:
        DataFrame with columns tbfr_category, patient_count, avg_tbfr, avg_vfr
        and percentage (share of the band with status "Yes", 2 decimals), for
        the "Yes" rows only.
    """
    tbfr = F.col("Total Body Fat Ratio (TBFR) (%)")
    # Labels: low / normal / high / very high body fat. A null TBFR stays null
    # instead of being counted as "very high".
    df = df.withColumn(
        "tbfr_category",
        F.when(tbfr < 15, "体脂率偏低")
        .when(tbfr < 25, "体脂率正常")
        .when(tbfr < 35, "体脂率偏高")
        .when(tbfr >= 35, "体脂率过高"),
    )

    body_fat_stats = df.groupBy("tbfr_category", "Gallstone Status").agg(
        F.count("*").alias("patient_count"),
        F.avg("Total Body Fat Ratio (TBFR) (%)").alias("avg_tbfr"),
        F.avg("Visceral Fat Rating (VFR)").alias("avg_vfr"),
    )
    result = _with_group_share(
        body_fat_stats, ["tbfr_category"], "patient_count", "total_patients", "percentage"
    )

    # NOTE(review): averages use VFR but the correlation uses VFA — confirm this
    # mix of visceral-fat measures is intended.
    correlation = df.stat.corr("Total Body Fat Ratio (TBFR) (%)", "Visceral Fat Area (VFA)")
    print(f"体脂率与内脏脂肪面积的Pearson相关系数为: {correlation:.4f}")

    return (
        result
        .filter(F.col("Gallstone Status") == "Yes")
        .select("tbfr_category", "patient_count", "avg_tbfr", "avg_vfr", "percentage")
    )


def analyze_blood_lipids_risk(df):
    """Gallstone prevalence by blood-lipid risk level.

    Each of four abnormal-lipid flags (TC > 6.2, LDL > 4.1, HDL < 1.0,
    Triglyceride > 2.3) contributes 1 point; the summed score maps to a level:
    >= 3 high, 2 medium, 1 low, 0 none. A null measurement fails its comparison
    and therefore contributes 0.

    Args:
        df: DataFrame with "Total Cholesterol (TC)", "Low Density Lipoprotein
            (LDL)", "High Density Lipoprotein (HDL)", "Triglyceride" and
            "Gallstone Status" columns.

    Returns:
        DataFrame with columns risk_level, count, prevalence (share of the level
        with status "Yes", 2 decimals), avg_tc, avg_ldl, avg_hdl, avg_tg for the
        "Yes" rows, ordered from the highest to the lowest risk level.
    """
    df = (
        df
        .withColumn("high_tc", F.when(F.col("Total Cholesterol (TC)") > 6.2, 1).otherwise(0))
        .withColumn("high_ldl", F.when(F.col("Low Density Lipoprotein (LDL)") > 4.1, 1).otherwise(0))
        .withColumn("low_hdl", F.when(F.col("High Density Lipoprotein (HDL)") < 1.0, 1).otherwise(0))
        .withColumn("high_tg", F.when(F.col("Triglyceride") > 2.3, 1).otherwise(0))
    )
    df = df.withColumn(
        "lipid_risk_score",
        F.col("high_tc") + F.col("high_ldl") + F.col("low_hdl") + F.col("high_tg"),
    )
    df = df.withColumn(
        "risk_level",
        F.when(F.col("lipid_risk_score") >= 3, "高风险")
        .when(F.col("lipid_risk_score") >= 2, "中风险")
        .when(F.col("lipid_risk_score") >= 1, "低风险")
        .otherwise("无风险"),
    )

    lipid_risk_analysis = df.groupBy("risk_level", "Gallstone Status").agg(
        F.count("*").alias("count"),
        F.avg("Total Cholesterol (TC)").alias("avg_tc"),
        F.avg("Low Density Lipoprotein (LDL)").alias("avg_ldl"),
        F.avg("High Density Lipoprotein (HDL)").alias("avg_hdl"),
        F.avg("Triglyceride").alias("avg_tg"),
    )
    final_result = _with_group_share(
        lipid_risk_analysis, ["risk_level"], "count", "total_count", "prevalence"
    )

    # Order by severity rank: sorting the label strings themselves compares code
    # points and yields 高, 无, 低, 中 rather than high -> none.
    severity = (
        F.when(F.col("risk_level") == "高风险", 3)
        .when(F.col("risk_level") == "中风险", 2)
        .when(F.col("risk_level") == "低风险", 1)
        .otherwise(0)
    )
    return (
        final_result
        .filter(F.col("Gallstone Status") == "Yes")
        .select("risk_level", "count", "prevalence", "avg_tc", "avg_ldl", "avg_hdl", "avg_tg")
        .orderBy(severity.desc())
    )