"""Tmall order-data analysis helpers built on PySpark.

Each public function takes a Spark DataFrame of Tmall orders (expected
columns: order_payment_time, buyer_actual_payment, order_id,
receiver_state) and returns a plain list of dicts suitable for JSON
serialization.
"""
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, to_date, sum as _sum, count as _count, row_number, desc
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pandas as pd

# Shared Spark session for all analyses in this module.
spark = SparkSession.builder.appName("TmallDataAnalysis").getOrCreate()


def get_daily_sales_trend(df):
    """Aggregate total sales and order counts per calendar day.

    Args:
        df: Spark DataFrame with `order_payment_time` (timestamp),
            `buyer_actual_payment` (numeric) and `order_id` columns.

    Returns:
        List of dicts, one per day, ordered by date, each with keys
        `order_date` (ISO date string), `total_sales`, `total_orders`.
    """
    df = df.withColumn('order_date', to_date(col('order_payment_time')))
    daily_sales = df.groupBy('order_date').agg(
        _sum(col('buyer_actual_payment')).alias('total_sales'),
        _count(col('order_id')).alias('total_orders'),
    )
    daily_sales = daily_sales.orderBy('order_date')
    pd_df = daily_sales.toPandas()
    # Serialize dates as strings so the result is directly JSON-safe.
    pd_df['order_date'] = pd_df['order_date'].astype(str)
    return pd_df.to_dict(orient='records')


def get_province_sales_contribution(df):
    """Compute each province's sales total, share of grand total, and rank.

    Args:
        df: Spark DataFrame with `receiver_state` and
            `buyer_actual_payment` columns.

    Returns:
        List of dicts sorted by rank (1 = highest-selling province), each
        with keys `receiver_state`, `province_total_sales`,
        `sales_percentage`, `rank`.
    """
    province_sales = df.groupBy('receiver_state').agg(
        _sum(col('buyer_actual_payment')).alias('province_total_sales')
    )
    # Grand total is collected to the driver once so the percentage can be
    # computed as a scalar division.
    total_sales = df.agg(
        _sum(col('buyer_actual_payment')).alias('grand_total')
    ).collect()[0]['grand_total']
    province_sales = province_sales.withColumn(
        'sales_percentage', (col('province_total_sales') / total_sales) * 100
    )
    # NOTE: an un-partitioned window collapses the data to a single task;
    # acceptable here because the row count equals the number of provinces.
    window_spec = Window.orderBy(desc(col('province_total_sales')))
    province_sales = province_sales.withColumn('rank', row_number().over(window_spec))
    pd_df = province_sales.toPandas()
    return pd_df.sort_values(by='rank').to_dict(orient='records')


def get_order_value_clustering(df):
    """Cluster orders into 3 groups by payment amount using k-means.

    Args:
        df: Spark DataFrame with a numeric `buyer_actual_payment` column;
            null payments are dropped before clustering.

    Returns:
        List of dicts, one per cluster, each with keys `cluster` (string
        label), `order_count`, `total_payment`, `avg_payment`.
    """
    payment_df = df.select(col('buyer_actual_payment').alias('payment')).na.drop()
    assembler = VectorAssembler(inputCols=["payment"], outputCol="features")
    feature_data = assembler.transform(payment_df)
    # Fixed seed keeps cluster assignments reproducible between runs.
    kmeans = KMeans(k=3, seed=1)
    model = kmeans.fit(feature_data)
    clustered_data = model.transform(feature_data)
    clustered_data = clustered_data.withColumn('cluster', col('prediction').cast('string'))
    cluster_stats = clustered_data.groupBy('cluster').agg(
        _count('payment').alias('order_count'),
        _sum('payment').alias('total_payment'),
    )
    # BUG FIX: the average must be derived AFTER the aggregation. Putting
    # (col('total_payment') / col('order_count')) inside agg() raises an
    # AnalysisException, because agg expressions are resolved against the
    # input DataFrame, where those alias columns do not yet exist.
    cluster_stats = cluster_stats.withColumn(
        'avg_payment', col('total_payment') / col('order_count')
    )
    pd_df = cluster_stats.toPandas()
    return pd_df.to_dict(orient='records')