"""Spark aggregations used by the university analysis system.

Each ``process_*`` function takes a Spark DataFrame, runs one or more
group-by aggregations on it and returns plain Python containers (lists and
dicts) built from the collected rows.
"""
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, count, desc
import pandas as pd


def get_spark_session() -> SparkSession:
    """Return the SparkSession for this application, creating it if needed.

    The session is named ``UniversityAnalysisSystem``; ``getOrCreate`` hands
    back an already-running session when one exists.
    """
    return (
        SparkSession.builder
        .appName("UniversityAnalysisSystem")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()
    )


def process_province_rank_analysis(spark: SparkSession, df: DataFrame) -> list:
    """Count rows per province, ordered from most to fewest.

    Args:
        spark: Unused by this function; kept so all ``process_*`` functions
            share the same calling convention.
        df: DataFrame with a ``province`` column.

    Returns:
        A list of ``{"name": <province>, "value": <row count>}`` dicts in
        descending order of row count.
    """
    ranked = (
        df.groupBy("province")
        .agg(count("*").alias("school_count"))
        .orderBy(desc("school_count"))
    )
    return [
        {"name": row["province"], "value": row["school_count"]}
        for row in ranked.collect()
    ]


def process_school_type_analysis(spark: SparkSession, df: DataFrame) -> dict:
    """Count rows per school type and express each count as a percentage.

    Args:
        spark: Unused by this function; kept so all ``process_*`` functions
            share the same calling convention.
        df: DataFrame with a ``school_type`` column.

    Returns:
        ``{"counts": {type: n}, "ratios": {type: pct}}`` where ``pct`` is the
        share of all rows, in percent, rounded to two decimals. Both maps are
        empty when ``df`` has no rows.
    """
    grouped = df.groupBy("school_type").agg(count("*").alias("count"))
    counts = {row["school_type"]: row["count"] for row in grouped.collect()}

    # Every row lands in exactly one group, so the group counts add up to the
    # total row count. Summing them avoids a second Spark job (df.count())
    # and guarantees the ratios are computed from the same snapshot as the
    # counts.
    total = sum(counts.values())

    # An empty DataFrame yields no groups, so the division below never runs
    # with total == 0.
    ratios = {
        school_type: round((n / total) * 100, 2)
        for school_type, n in counts.items()
    }
    return {"counts": counts, "ratios": ratios}


def process_specific_field_analysis(
    spark: SparkSession, df: DataFrame, keyword
) -> dict:
    """Summarise the rows whose ``school_name`` contains ``keyword``.

    Args:
        spark: Unused by this function; kept so all ``process_*`` functions
            share the same calling convention.
        df: DataFrame with ``school_name``, ``province`` and ``level``
            columns.
        keyword: Value passed to ``Column.contains`` to select rows by
            ``school_name``.

    Returns:
        A dict with three keys: ``keyword`` (echoed back),
        ``province_distribution`` (list of ``{"name", "value"}`` dicts, one
        per province among the matching rows) and ``level_distribution``
        (mapping of level to matching row count).
    """
    matching = df.filter(col("school_name").contains(keyword))

    by_province = matching.groupBy("province").agg(count("*").alias("count"))
    province_distribution = [
        {"name": row["province"], "value": row["count"]}
        for row in by_province.collect()
    ]

    by_level = matching.groupBy("level").agg(count("*").alias("count"))
    level_distribution = {
        row["level"]: row["count"] for row in by_level.collect()
    }

    return {
        "keyword": keyword,
        "province_distribution": province_distribution,
        "level_distribution": level_distribution,
    }