from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, countDistinct, descfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.stat import Correlationfrom pyspark.ml.clustering import KMeans# 初始化SparkSession,这是所有Spark操作的入口spark = SparkSession.builder \ .appName("UniversityRankingAnalysis") \ .config("spark.driver.memory", "4g") \ .getOrCreate()# 功能一:各国家/地区上榜大学数量分析 (对应维度1.1)def analyze_university_count_by_country(df): """ 统计每个国家/地区的上榜大学总数,并按数量降序排列。 这个功能是宏观分析的基础,能快速展示教育强国的规模优势。 """ # 筛选出需要的列:Location和University # 使用countDistinct确保同一所大学不会被重复计算 country_counts_df = df.select("Location", "University") \ .groupBy("Location") \ .agg(countDistinct("University").alias("university_count")) \ .orderBy(desc("university_count")) # 将结果缓存起来,如果后续有其他操作需要用到这个结果,可以提高效率 country_counts_df.cache() # 返回处理后的DataFrame,供前端调用或保存为文件 return country_counts_df# 功能二:核心评分指标间相关性分析 (对应维度2.1)def analyze_core_metrics_correlation(df): """ 计算教学、研究、产业收入等核心评分指标之间的皮尔逊相关系数矩阵。 这有助于揭示哪些因素是驱动大学排名的关键,例如研究质量和总分是否高度相关。 """ # 定义需要计算相关性的核心指标列 metric_cols = [ "Overall Teaching Score", "Research Score", "Research Quality", "Industry Income Score", "International Outlook Score", "Overall Score" ] # 使用VectorAssembler将多个特征列合并成一个单一的向量列,这是Spark ML库的标准输入格式 assembler = VectorAssembler(inputCols=metric_cols, outputCol="features") df_vector = assembler.transform(df).select("features") # 使用Correlation类计算皮尔逊相关系数 # 结果是一个包含单个矩阵行的DataFrame correlation_matrix = Correlation.corr(df_vector, "features", "pearson").collect()[0][0] # 将Spark矩阵转换为NumPy数组,方便后续处理或展示 correlation_array = correlation_matrix.toArray() # 返回相关系数数组 return correlation_array# 功能三:基于多维评分的大学聚类分析 (对应维度4.1)def cluster_universities(df, k=4): """ 使用K-Means算法,根据大学的多项评分指标将其自动聚类为k个类别。 这能超越传统排名,发现具有相似特质(如研究型、教学型)的大学群体。 """ # 定义用于聚类的特征维度,这些是衡量大学能力的关键指标 feature_cols = [ "Overall Teaching Score", "Research Score", "Research Quality", "Industry Income Score", "International Outlook Score" ] # 同样使用VectorAssembler将特征列组合成向量 assembler = VectorAssembler(inputCols=feature_cols, outputCol="features") assembled_df = assembler.transform(df) # 创建K-Means模型实例,设置聚类数量k和特征列名 kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=k, seed=42) # 训练模型,这个过程是迭代的,Spark会自动在集群上并行计算 model = kmeans.fit(assembled_df) # 使用训练好的模型对数据进行转换,为每所大学分配一个聚类标签 predictions = model.transform(assembled_df) # 选择原始的大学信息、排名以及新分配的聚类标签进行展示 result_df = predictions.select("University", "Location", "Overall Score", "cluster") # 返回带有聚类结果的DataFrame return result_df