from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when, lit

spark = SparkSession.builder.appName("GlobalMigrationAnalysis").getOrCreate()

df = spark.read.csv(
    "hdfs://namenode:9000/input/global_student_migration.csv",
    header=True,
    inferSchema=True,
)


# Feature 1: global study-abroad flow analysis -- count the top 10 most popular migration paths
def analyze_global_flow():
    flow_df = (
        df.groupBy("origin_country", "destination_country")
        .agg(count("*").alias("student_count"))
        .orderBy(col("student_count").desc())
    )
    flow_df.show(10)
    flow_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(
        "hdfs://namenode:9000/output/global_flow_analysis"
    )


# Feature 2: major/field-of-study analysis -- find the 5 majors with the highest average starting salary
def analyze_major_salary():
    # Filter out students who were not placed (no recorded salary)
    placed_df = df.filter(col("starting_salary_usd") > 0)
    major_salary_df = (
        placed_df.groupBy("course_name")
        .agg(
            count("*").alias("graduate_count"),
            avg("starting_salary_usd").alias("avg_salary"),
        )
        .orderBy(col("avg_salary").desc())
    )
    major_salary_df.show(5)
    major_salary_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(
        "hdfs://namenode:9000/output/major_salary_analysis"
    )


# Feature 3: employment and retention analysis -- compare graduates who stay in the
# destination country with those who move elsewhere or return home
def analyze_employment_retention():
    # Keep only placed students whose placement country is not "N/A"
    employed_df = df.filter(
        (col("placement_status") == "Placed") & (col("placement_country") != "N/A")
    )
    # Flag whether the graduate stayed in the study-abroad destination country
    retention_df = employed_df.withColumn(
        "retention_status",
        when(
            col("destination_country") == col("placement_country"),
            lit("Stayed in destination country"),
        ).otherwise(lit("Moved to another country or returned home")),
    )
    # Group by destination country and retention status, then count students
    final_retention_df = (
        retention_df.groupBy("destination_country", "retention_status")
        .agg(count("*").alias("student_number"))
        .orderBy("destination_country", "student_number")
    )
    final_retention_df.show()
    final_retention_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(
        "hdfs://namenode:9000/output/employment_retention_analysis"
    )


analyze_global_flow()
analyze_major_salary()
analyze_employment_retention()

spark.stop()