from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StringType  # fix: used below but never imported

# Shared Spark session and the raw skin-cancer dataset used by every analysis.
spark = SparkSession.builder.appName("SkinCancerAnalysis").getOrCreate()
df = spark.read.csv("hdfs://path/to/skin_cancer_data.csv", header=True, inferSchema=True)


def analyze_age_structure(df):
    """Count cases per (diagnostic, age bracket).

    Buckets ``age`` into five labelled ranges, drops rows with a null
    ``diagnostic``, and returns the grouped counts (also printed via ``show``).

    Args:
        df: input DataFrame with ``age`` and ``diagnostic`` columns.

    Returns:
        DataFrame with columns ``diagnostic``, ``age_group``, ``count``.
    """
    # Bin upper edges and their display labels; ages above the last edge
    # (and any out-of-range value) fall into the final "over 80" bucket.
    age_bins = [20, 40, 60, 80]
    bin_labels = ["0-20岁", "21-40岁", "41-60岁", "61-80岁"]

    # Build the CASE/WHEN chain from the bins instead of hard-coding branches.
    age_expr = F.when((df.age >= 0) & (df.age <= age_bins[0]), bin_labels[0])
    for lo, hi, label in zip(age_bins, age_bins[1:], bin_labels[1:]):
        age_expr = age_expr.when((df.age > lo) & (df.age <= hi), label)
    age_expr = age_expr.otherwise("80岁以上")

    df_with_age_group = df.withColumn("age_group", age_expr)
    age_analysis_df = (
        df_with_age_group
        .filter(df_with_age_group.diagnostic.isNotNull())
        .groupBy("diagnostic", "age_group")
        .count()
        .orderBy("diagnostic", "age_group")
    )
    age_analysis_df.show()
    return age_analysis_df


def analyze_symptom_frequency(df):
    """Compute, per diagnosis, the percentage of cases showing each symptom.

    Args:
        df: input DataFrame with 0/1 symptom indicator columns and ``diagnostic``.

    Returns:
        DataFrame with per-symptom counts, ``total_cases`` and formatted
        ``<symptom>_rate`` percentage columns (rates also printed via ``show``).
    """
    symptom_cols = ["itch", "grew", "hurt", "changed", "bleed", "elevation"]

    # BUG FIX: the original wrote `F.when(col == 1, ...)` where `col` is a
    # Python string, so the condition was the constant False and every
    # symptom count came out as 0. Compare the *column* value instead.
    agg_exprs = [
        F.sum(F.when(F.col(c) == 1, 1).otherwise(0)).alias(c)
        for c in symptom_cols
    ]
    symptom_analysis_df = (
        df.filter(df.diagnostic.isNotNull())
        .groupBy("diagnostic")
        .agg(*agg_exprs, F.count("*").alias("total_cases"))
    )

    # Derive a human-readable percentage column per symptom.
    for c in symptom_cols:
        symptom_analysis_df = symptom_analysis_df.withColumn(
            c + "_rate",
            F.format_number(F.col(c) / F.col("total_cases") * 100, 2),
        )

    final_cols = ["diagnostic"] + [c + "_rate" for c in symptom_cols]
    symptom_analysis_df.select(final_cols).show()
    return symptom_analysis_df


def mine_association_rules_symptoms(df):
    """Mine simple pairwise association rules between symptoms of malignant cases.

    Restricts to malignant diagnoses (MEL/BCC/SCC), builds a "basket" of
    present symptoms per patient, counts frequent single symptoms and
    frequent symptom pairs (minimum support 10), and reports the confidence
    of ``symptom1 -> symptom2`` rules.

    Args:
        df: input DataFrame with ``diagnostic``, ``patient_id`` and 0/1
            symptom indicator columns.

    Returns:
        DataFrame of rules with pair support, antecedent support and
        confidence (top 20 by confidence also printed via ``show``).
    """
    malignant_df = df.filter(df.diagnostic.isin(["MEL", "BCC", "SCC"]))
    symptom_cols = ["itch", "grew", "hurt", "changed", "bleed"]

    def _create_symptoms_array(itch, grew, hurt, changed, bleed):
        # Map the 0/1 indicator values to the list of present symptom names.
        flags = [("itch", itch), ("grew", grew), ("hurt", hurt),
                 ("changed", changed), ("bleed", bleed)]
        return [name for name, flag in flags if flag == 1]

    create_udf = F.udf(_create_symptoms_array, ArrayType(StringType()))
    baskets_df = (
        malignant_df
        .select("patient_id", *symptom_cols)
        .withColumn("symptoms", create_udf(*symptom_cols))
        .filter(F.size("symptoms") > 0)  # drop patients with no symptoms
        .select("symptoms")
    )

    # Frequent single symptoms (support = number of baskets containing them).
    min_support_count = 10
    single_item_sets = (
        baskets_df
        .withColumn("symptom", F.explode("symptoms"))
        .groupBy("symptom")
        .count()
        .withColumnRenamed("count", "support")
    )
    frequent_single_items = single_item_sets.filter(
        single_item_sets.support >= min_support_count
    )

    # Frequent symptom pairs. BUG FIX: the original inserted
    # `.filter(F.col("symptoms").contains(F.col("symptom1")))`, but
    # `Column.contains` is a *string* predicate and is invalid on an array
    # column — and the exploded value always comes from that array anyway,
    # so the filter was redundant. The `symptom1 < symptom2` guard keeps
    # exactly one row per unordered pair.
    pairs_df = (
        baskets_df
        .withColumn("symptom1", F.explode("symptoms"))
        .withColumn("symptom2", F.explode("symptoms"))
        .filter(F.col("symptom1") < F.col("symptom2"))
        .groupBy("symptom1", "symptom2")
        .count()
        .withColumnRenamed("count", "pair_support")
    )
    frequent_pairs = pairs_df.filter(pairs_df.pair_support >= min_support_count)

    # Rule confidence = support(pair) / support(antecedent).
    rules_df = (
        frequent_pairs
        .join(
            frequent_single_items.alias("s1"),
            frequent_pairs.symptom1 == F.col("s1.symptom"),
        )
        .select(frequent_pairs["*"], F.col("s1.support").alias("support1"))
        # BUG FIX: ordering on the format_number() result sorted *strings*
        # lexicographically ("9.50" > "10.00"). Keep a numeric column for
        # ordering and format only for display.
        .withColumn("confidence_pct", F.col("pair_support") / F.col("support1") * 100)
        .withColumn("confidence", F.format_number(F.col("confidence_pct"), 2))
    )
    rules_df.orderBy(F.desc("confidence_pct")).show(20)
    return rules_df