import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import static org.apache.spark.sql.functions.*;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;// 为了模拟大数据处理环境,这里假定SparkSession已在应用启动时初始化并可用。// 实际Spring Boot项目中,SparkSession通常会作为@Bean进行配置和管理。// 这里直接声明并getOrCreate(),是为了满足题目中对SparkSession.builder的引用要求。// 这是一个模拟的SparkSession实例,用于演示大数据处理逻辑。SparkSession spark = SparkSession.builder() .appName("CancerDataAnalysisAndVisualization") .master("local[*]") // 在开发环境使用本地模式,生产环境需配置Spark集群URL .config("spark.driver.memory", "4g") // 增加驱动程序内存,处理较大数据集 .config("spark.executor.memory", "4g") // 增加执行器内存 .config("spark.sql.shuffle.partitions", "200") // 调整shuffle分区数以优化性能 .getOrCreate();// --- 核心功能一:癌症概览分析 ---// 用于获取特定年份癌症数据的总体概览,包括总病例数、死亡率和最常见的癌症类型。public Map<String, Object> getCancerOverviewAnalysis(String analysisYear) { // 模拟从MySQL数据库加载患者记录,这是大数据处理的常见起点。 Dataset<Row> cancerPatientRecords = spark.read().format("jdbc") .option("url", "jdbc:mysql://localhost:3306/cancer_db?useSSL=false&serverTimezone=UTC") .option("dbtable", "patient_records") .option("user", "root") .option("password", "your_mysql_password") .load(); // 过滤出指定年份的记录,进行大数据筛选。 Dataset<Row> filteredByYear = cancerPatientRecords.filter(col("diagnosis_year").equalTo(analysisYear)); // 计算总病例数。 long totalCases = filteredByYear.count(); // 计算死亡病例数,并计算死亡率,体现大数据聚合统计。 long deceasedCases = filteredByYear.filter(col("outcome").equalTo("deceased")).count(); double mortalityRate = (totalCases > 0) ? (double) deceasedCases / totalCases : 0.0; // 按癌症类型分组统计病例数,找出最常见的几种,体现大数据分组聚合和排序。 Dataset<Row> casesByCancerType = filteredByYear.groupBy("cancer_type").count().orderBy(col("count").desc()); // 收集前五种最常见癌症类型的数据。 List<Map<String, Object>> top5CancerTypes = casesByCancerType.limit(5).collectAsList().stream() .map(row -> { Map<String, Object> typeEntry = new HashMap<>(); typeEntry.put("cancerType", row.getString(0)); typeEntry.put("count", row.getLong(1)); return typeEntry; }).collect(Collectors.toList()); // 封装分析结果。 Map<String, Object> overviewResult = new HashMap<>(); overviewResult.put("totalCases", totalCases); overviewResult.put("mortalityRate", String.format("%.2f%%", mortalityRate * 100)); overviewResult.put("topCancerTypes", top5CancerTypes); return overviewResult;}// --- 核心功能二:人口统计分析 ---// 分析特定癌症类型在不同人口统计学特征(如年龄段、性别)上的分布。public Map<String, Object> getPopulationDemographicAnalysis(String targetCancerType) { // 同样从数据库加载患者记录。 Dataset<Row> patientDemographics = spark.read().format("jdbc") .option("url", "jdbc:mysql://localhost:3306/cancer_db?useSSL=false&serverTimezone=UTC") .option("dbtable", "patient_records") .option("user", "root") .option("password", "your_mysql_password") .load(); // 过滤出目标癌症类型的患者数据。 Dataset<Row> filteredPatients = patientDemographics.filter(col("cancer_type").equalTo(targetCancerType)); // 对年龄进行分段处理,这是大数据处理中对连续数据进行离散化的常用方法。 Dataset<Row> ageGroupedStats = filteredPatients .withColumn("age_group", when(col("age").between(0, 18), "0-18岁") .when(col("age").between(19, 40), "19-40岁") .when(col("age").between(41, 60), "41-60岁") .when(col("age").between(61, 80), "61-80岁") .otherwise("80岁以上")) .groupBy("age_group").count().orderBy(col("age_group")); // 按年龄段分组统计并排序 // 收集年龄分布数据。 List<Map<String, Object>> ageDistribution = ageGroupedStats.collectAsList().stream() .map(row -> { Map<String, Object> entry = new HashMap<>(); entry.put("ageGroup", row.getString(0)); entry.put("count", row.getLong(1)); return entry; }).collect(Collectors.toList()); // 按性别进行分组统计,体现大数据基本聚合。 Dataset<Row> genderGroupedStats = filteredPatients.groupBy("gender").count(); // 收集性别分布数据。 List<Map<String, Object>> genderDistribution = genderGroupedStats.collectAsList().stream() .map(row -> { Map<String, Object> entry = new HashMap<>(); entry.put("gender", row.getString(0)); entry.put("count", row.getLong(1)); return entry; }).collect(Collectors.toList()); // 封装人口统计分析结果。 Map<String, Object> demographicResult = new HashMap<>(); demographicResult.put("ageDistribution", ageDistribution); demographicResult.put("genderDistribution", genderDistribution); return demographicResult;}// --- 核心功能三:患者生存分析 (简化版,侧重关键风险因素统计) ---// 该功能旨在初步识别与患者生存状况相关的关键风险因素或治疗方案的效果。public Map<String, Object> getSimplifiedPatientSurvivalAnalysis(String cancerTypeForSurvival) { // 加载包含生存相关信息的患者记录,这些信息通常会包括治疗方案、结局(存活/死亡)和生存时间等。 Dataset<Row> survivalRecords = spark.read().format("jdbc") .option("url", "jdbc:mysql://localhost:3306/cancer_db?useSSL=false&serverTimezone=UTC") .option("dbtable", "patient_survival_data") // 假设有专门的生存数据表 .option("user", "root") .option("password", "your_mysql_password") .load(); // 过滤出特定癌症类型的数据。 Dataset<Row> filteredForSurvival = survivalRecords.filter(col("cancer_type").equalTo(cancerTypeForSurvival)); // 统计不同治疗方案下的患者生存情况(存活/死亡),这是大数据分组聚合的典型应用。 Dataset<Row> outcomeByTreatmentPlan = filteredForSurvival.groupBy("treatment_plan", "outcome") .count() .orderBy(col("treatment_plan"), col("outcome")); // 收集治疗方案与生存结局的统计数据。 List<Map<String, Object>> treatmentOutcomeStats = outcomeByTreatmentPlan.collectAsList().stream() .map(row -> { Map<String, Object> entry = new HashMap<>(); entry.put("treatmentPlan", row.getString(0)); entry.put("outcome", row.getString(1)); entry.put("count", row.getLong(2)); return entry; }).collect(Collectors.toList()); // 统计不同合并症对生存结局的影响,体现大数据多维度交叉分析。 Dataset<Row> outcomeByComorbidity = filteredForSurvival.groupBy("comorbidity", "outcome") .count() .orderBy(col("comorbidity"), col("outcome")); // 收集合并症与生存结局的统计数据。 List<Map<String, Object>> comorbidityOutcomeStats = outcomeByComorbidity.collectAsList().stream() .map(row -> { Map<String, Object> entry = new HashMap<>(); entry.put("comorbidity", row.getString(0)); entry.put("outcome", row.getString(1)); entry.put("count", row.getLong(2)); return entry; }).collect(Collectors.toList()); // 封装生存分析结果。 Map<String, Object> survivalResult = new HashMap<>(); survivalResult.put("treatmentOutcomes", treatmentOutcomeStats); survivalResult.put("comorbidityOutcomes", comorbidityOutcomeStats); return survivalResult;}