from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType, FloatType

# Spark batch job: deduplicate Lagou job-posting CSV data and write three
# analyses back to HDFS — (1) average salary per city, (2) salary by
# standardized experience bucket, (3) welfare-keyword frequencies.
spark = SparkSession.builder.appName("LagouDataAnalysis").getOrCreate()

# Raw postings: first row is a header; let Spark infer column types.
df = spark.read.csv("hdfs://path/to/招聘信息1.csv", header=True, inferSchema=True)
df_cleaned = df.dropDuplicates()


def parse_salary_udf(salary_str):
    """Convert a salary range string like '10k-20k' to its midpoint in yuan.

    A single value such as '15k' is treated as both low and high.
    Returns 0.0 for null, non-'k'-formatted, or unparseable input so that
    downstream `avg_salary > 0` filters can drop those rows.
    """
    if salary_str and 'k' in salary_str.lower():
        parts = salary_str.lower().replace('k', '').split('-')
        try:
            low = float(parts[0])
            high = float(parts[1]) if len(parts) > 1 else low
            return (low + high) / 2 * 1000
        except (ValueError, IndexError):
            # e.g. '15k以上' -> float('15以上') raises ValueError; keep the
            # sentinel 0.0 rather than failing the whole job on bad rows.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            return 0.0
    return 0.0


parse_salary = F.udf(parse_salary_udf, FloatType())
df_with_salary = df_cleaned.withColumn("avg_salary", parse_salary(F.col("money")))

# Analysis 1: average parsed salary per city, highest first.
city_salary_analysis = (
    df_with_salary
    .filter(F.col("avg_salary") > 0)  # drop rows whose salary failed to parse
    .groupBy("area")
    .agg(F.avg("avg_salary").alias("city_avg_salary"))
    .orderBy(F.desc("city_avg_salary"))
)
city_salary_analysis.coalesce(1).write.csv(
    "hdfs://path/to/result/2_city_avg_salary.csv", header=True, mode="overwrite"
)


def standardize_exp_udf(exp_str):
    """Map free-form experience requirement text to a fixed set of buckets.

    Checked in order: fresh-graduate keywords first, then year ranges, then
    'no requirement'; anything unmatched (including null) falls through to
    the catch-all '其他' bucket.
    """
    if exp_str:
        exp_str = exp_str.strip()
        if "应届" in exp_str or "在校" in exp_str:
            return "应届生"
        if "1-3" in exp_str:
            return "1-3年"
        if "3-5" in exp_str:
            return "3-5年"
        if "5-10" in exp_str:
            return "5-10年"
        if "10年以上" in exp_str:
            return "10年以上"
        if "不限" in exp_str:
            return "经验不限"
    # NOTE(review): fallback placement reconstructed from flattened source —
    # assumed function-level so null input yields '其他' instead of null.
    return "其他"


# No return type given: Spark defaults the UDF's return type to StringType.
standardize_exp = F.udf(standardize_exp_udf)

# Analysis 2: average salary per standardized experience bucket.
df_exp_salary = (
    df_with_salary
    .withColumn("standard_exp", standardize_exp(F.col("job_exp")))
    .filter(F.col("avg_salary") > 0)
)
exp_salary_analysis = (
    df_exp_salary
    .groupBy("standard_exp")
    .agg(F.avg("avg_salary").alias("exp_avg_salary"))
    .orderBy(F.col("standard_exp"))
)
exp_salary_analysis.coalesce(1).write.csv(
    "hdfs://path/to/result/5_exp_salary_relation.csv", header=True, mode="overwrite"
)

# Analysis 3: welfare keyword counts (word-cloud input).
df_welfare = df_cleaned.filter(
    F.col("job_wel").isNotNull() & (F.trim(F.col("job_wel")) != "")
)
# Split on half-width comma, full-width comma, or Chinese enumeration comma.
welfare_exploded = df_welfare.withColumn(
    "welfare_item", F.explode(F.split(F.col("job_wel"), "[,,、]"))
)
welfare_count = (
    welfare_exploded.groupBy("welfare_item").count().orderBy(F.desc("count"))
)
welfare_count.coalesce(1).write.csv(
    "hdfs://path/to/result/8_hot_welfare_cloud.csv", header=True, mode="overwrite"
)

# Release cluster resources (was missing: the session was never stopped).
spark.stop()