"""Olympic medal analysis over a CSV dataset using PySpark aggregations.

Each public function returns plain JSON-serializable lists of dicts so the
results can be handed straight to a web/API layer.
"""

from pyspark.sql import SparkSession, functions as F, Window

# Shared Spark session and source DataFrame: created once at import time so
# every query below reuses the same session and read plan.
spark = SparkSession.builder.appName("OlympicAnalysis").getOrCreate()
df = spark.read.csv("hdfs://path/to/olympics.csv", header=True, inferSchema=True)


def get_medal_trends(season='Summer', start_year=1896):
    """Return total medals awarded per year for *season* from *start_year* on.

    Defaults reproduce the original behavior (Summer Games since 1896).

    Returns:
        list[dict]: ``{"year": ..., "medals": ...}`` entries sorted by year.
    """
    filtered_df = df.filter((df['Season'] == season) & (df['Year'] >= start_year))
    trend_df = (
        filtered_df.groupBy('Year')
        .agg(F.sum('Total').alias('Total_Medals'))
        .orderBy('Year')
    )
    return [
        {"year": row['Year'], "medals": row['Total_Medals']}
        for row in trend_df.collect()
    ]


def get_powerful_nations_transition(top_n=10):
    """Rank NOCs by gold-medal count within each Summer year, keeping the top *top_n*.

    Ranking uses ``row_number`` over a per-year window ordered by gold count
    descending, so ties are broken arbitrarily (no gaps in rank).

    Returns:
        list[dict]: ``{"year", "noc", "rank", "gold"}`` entries ordered by
        year then rank.
    """
    summer_df = df.filter(df['Season'] == 'Summer')
    gold_sum_df = summer_df.groupBy('Year', 'NOC').agg(
        F.sum('Gold').alias('Total_Gold')
    )
    window_spec = Window.partitionBy('Year').orderBy(F.desc('Total_Gold'))
    ranked_df = gold_sum_df.withColumn('Rank', F.row_number().over(window_spec))
    top_df = ranked_df.filter(ranked_df['Rank'] <= top_n)
    return [
        {
            "year": row['Year'],
            "noc": row['NOC'],
            "rank": row['Rank'],
            "gold": row['Total_Gold'],
        }
        for row in top_df.orderBy('Year', 'Rank').collect()
    ]


def get_gold_conversion_efficiency(min_total=10):
    """Compute each NOC's gold-to-total medal conversion rate.

    Only NOCs with at least *min_total* career medals are included (default 10,
    matching the original cutoff). Results are sorted by conversion rate
    descending; the rate is cast to ``decimal(10, 4)`` before collection.

    Returns:
        list[dict]: ``{"noc", "total_medals", "gold_medals", "rate"}`` entries.
    """
    totals_df = df.groupBy('NOC').agg(
        F.sum('Total').alias('Career_Total'),
        F.sum('Gold').alias('Career_Gold'),
    )
    # Guard against division by zero explicitly (so a caller passing
    # min_total=0 cannot produce NULL rates), then apply the medal-count
    # cutoff BEFORE sorting so the sort only sees surviving rows.
    efficiency_df = (
        totals_df.filter(totals_df['Career_Total'] > 0)
        .filter(totals_df['Career_Total'] >= min_total)
        .withColumn(
            'Conversion_Rate',
            (F.col('Career_Gold') / F.col('Career_Total')).cast('decimal(10, 4)'),
        )
        .orderBy(F.desc('Conversion_Rate'))
    )
    return [
        {
            "noc": row['NOC'],
            "total_medals": row['Career_Total'],
            "gold_medals": row['Career_Gold'],
            "rate": float(row['Conversion_Rate']),
        }
        for row in efficiency_df.collect()
    ]