from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from sklearn.cluster import DBSCAN
import numpy as np

# Initialize the SparkSession -- the entry point for all Spark operations.
spark = SparkSession.builder.appName("HaidilaoDataAnalysis").getOrCreate()

# hdf_stores_df is assumed to be a pre-loaded Spark DataFrame containing at
# least these columns: store_name, province, city, longitude, latitude.


def analyze_store_distribution(hdf_stores_df):
    """Analyze the nationwide geographic distribution of stores.

    Computes per-province store counts and the top-20 cities by store count.

    Args:
        hdf_stores_df: Spark DataFrame with "province" and "city" columns.

    Returns:
        dict with two keys:
            "province_distribution": list of JSON strings, one per province,
                each with fields province / store_count.
            "top_20_cities": list of JSON strings for the 20 highest-ranked
                cities, each with fields city / store_count / rank.
    """
    # Per-province store counts.
    province_counts_df = (
        hdf_stores_df.groupBy("province")
        .count()
        .withColumnRenamed("count", "store_count")
    )

    # Per-city store counts.
    city_counts_df = (
        hdf_stores_df.groupBy("city")
        .count()
        .withColumnRenamed("count", "store_count")
    )

    # Rank cities with a window function to extract the TOP N.
    # NOTE: an unpartitioned window pulls all rows into a single partition;
    # that is acceptable here because city_counts_df has one row per city.
    window_spec = Window.orderBy(F.desc("store_count"))
    top_cities_df = (
        city_counts_df
        .withColumn("rank", F.row_number().over(window_spec))
        .filter(F.col("rank") <= 20)
    )

    # Both results are returned together as collected JSON rows; a real
    # application might return the two DataFrames separately instead.
    return {
        "province_distribution": province_counts_df.toJSON().collect(),
        "top_20_cities": top_cities_df.toJSON().collect(),
    }


def perform_dbscan_clustering(hdf_stores_df, target_city="上海", eps=0.01, min_samples=2):
    """Analyze store agglomeration within one city using DBSCAN clustering.

    Args:
        hdf_stores_df: Spark DataFrame with "store_name", "city",
            "longitude" and "latitude" columns.
        target_city: city whose stores are clustered (default "上海").
        eps: DBSCAN neighborhood radius in coordinate degrees; tune to the
            data (default 0.01, the original hard-coded value).
        min_samples: DBSCAN core-point threshold (default 2).

    Returns:
        Spark DataFrame with columns store_name, longitude, latitude and
        cluster (the DBSCAN label; -1 means noise).

    Raises:
        ValueError: if the target city has no stores with valid coordinates.
    """
    # Keep only the target city's stores with non-null coordinates.
    city_stores_df = (
        hdf_stores_df.filter(F.col("city") == target_city)
        .select("store_name", "longitude", "latitude")
        .na.drop()
    )

    # Collect ONCE so names and coordinates come from the same rows in the
    # same order. The original code ran two separate collect() actions,
    # whose row orders are not guaranteed to match, which could silently
    # pair store names with the wrong coordinates.
    rows = city_stores_df.collect()
    if not rows:
        # Fail with a clear message instead of an opaque sklearn error on
        # an empty array.
        raise ValueError(f"No stores with coordinates found for city: {target_city}")

    store_names = [row["store_name"] for row in rows]
    coordinates = [(row["longitude"], row["latitude"]) for row in rows]

    # Run DBSCAN on the driver; the per-city point count is small enough
    # to fit in driver memory.
    coords_np = np.array(coordinates)
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(coords_np)
    cluster_labels = clustering.labels_

    # Attach the cluster label to each store record.
    results = [
        {"store_name": name, "longitude": lon, "latitude": lat, "cluster": int(label)}
        for name, (lon, lat), label in zip(store_names, coordinates, cluster_labels)
    ]

    # Convert back to a Spark DataFrame for downstream processing.
    return spark.createDataFrame(results)


def analyze_mall_association(hdf_stores_df):
    """Analyze how store names associate with commercial real-estate brands.

    Matches each store_name against known mall keywords and counts stores
    per associated property brand.

    Args:
        hdf_stores_df: Spark DataFrame with a "store_name" column.

    Returns:
        Spark DataFrame with columns associated_brand / store_count, sorted
        by store_count descending. Stores matching no keyword are grouped
        under "其他/独立" ("other/independent").
    """
    # Keyword in the store name -> property-brand owner.
    mall_keywords = {
        "万达广场": "万达",
        "万象城": "华润",
        "吾悦广场": "新城",
        "银泰城": "银泰",
        "大悦城": "中粮",
    }

    # Chained when/otherwise is used instead of a UDF: it stays in the JVM
    # and is easier to read and maintain.
    df_with_mall = hdf_stores_df.withColumn("associated_brand", F.lit("其他/独立"))
    for keyword, brand in mall_keywords.items():
        df_with_mall = df_with_mall.withColumn(
            "associated_brand",
            F.when(F.col("store_name").contains(keyword), F.lit(brand))
            .otherwise(F.col("associated_brand")),
        )

    # Count stores per associated brand.
    brand_association_df = (
        df_with_mall.groupBy("associated_brand")
        .count()
        .withColumnRenamed("count", "store_count")
        .orderBy(F.desc("store_count"))
    )
    return brand_association_df