import geopandas as gpdimport pandas as pdimport numpy as npfrom libpysal.weights import DistanceBandimport scipy.sparse as spimport warningswarnings.filterwarnings('ignore')# ==================== 数据路径 ====================# 注意:GeoPackage格式默认读取第一个图层,如果有多个图层则需要指定layer,修改第214行代码cities_path = r'C:\Users\Adminis\Desktop\test\研究城市_project.shp' # 投影过的研究区边界数据,包含字段 'city',每个城市一个面people_path = r'C:\Users\Adminis\Desktop\test\china_2024_project.gpkg' # 投影过的人口网格面数据,包含字段 'pixel_val'(人口数)output_path = r'C:\Users\Adminis\Desktop\test\polycentricity_results.csv' # 输出结果路径shp_path = r'C:\Users\Adminis\Desktop\test\population_centers.shp' # 输出子中心面路径# ==================== 参数设置 ====================MIN_GRIDS = 3 # 子中心最少格子数(约3 km²)MIN_POP = 100000 # 子中心最小人口数(10万)POP_FIELD = 'pixel_val' # 人口字段名CITY_FIELD = 'city' # 城市名字段SIGNIFICANCE = 0.05 # LISA显著性水平N_PERMUTATIONS = 999 # 置换检验次数RANDOM_SEED = 42 # 随机种子,确保置换检验结果可复现# ==================== 核心函数 ====================def compute_local_morans_I(values, weights_matrix, seed=RANDOM_SEED): """ 计算局部Moran's I (LISA) """ n = len(values) # 对人口值做 z 标准化 z = (values - values.mean()) / values.std() # 行标准化稀疏权重矩阵 # 对稀疏矩阵按行求和,结果为 (n,1) 的矩阵,需转为一维数组 row_sums = np.asarray(weights_matrix.sum(axis=1)).ravel() row_sums[row_sums == 0] = 1 # 避免孤立格子除零 # 用对角稀疏矩阵实现行标准化:W = D^{-1} * weights_matrix D_inv = sp.diags(1.0 / row_sums) W = D_inv @ weights_matrix # 仍为稀疏矩阵 # 空间滞后(稀疏矩阵与稠密向量相乘,结果为 ndarray) Wz = np.asarray(W @ z).ravel() # 局部 Moran's I:I_i = z_i * (Wz)_i local_I = z * Wz # 置换检验:随机打乱 z,统计模拟值超过观测值的频次 # 固定随机种子,保证每次运行结果一致 if seed is not None: np.random.seed(seed) print(f" 进行置换检验 ({N_PERMUTATIONS} 次)...") count_extreme = np.zeros(n) for _ in range(N_PERMUTATIONS): z_perm = np.random.permutation(z) Wz_perm = np.asarray(W @ z_perm).ravel() local_I_perm = z * Wz_perm count_extreme += (np.abs(local_I_perm) >= np.abs(local_I)) p_values = count_extreme / N_PERMUTATIONS # 判断 Moran 散点图象限 # HH(1): 高值被高值包围;LH(2): 低值被高值包围 # LL(3): 低值被低值包围;HL(4): 高值被低值包围 quadrant = np.zeros(n, dtype=int) quadrant[(z > 0) & (Wz > 0)] = 1 # HH quadrant[(z < 0) & (Wz > 0)] = 2 # LH quadrant[(z < 0) & (Wz < 0)] = 3 # LL quadrant[(z > 0) & (Wz < 0)] = 4 # HL return local_I, p_values, quadrantdef build_distance_inverse_weights(coords): """ 构建距离倒数稀疏权重矩阵 距离阈值自动设为"确保每个要素至少具有一个邻域的最小欧氏距离" """ from scipy.spatial import cKDTree # 计算每个要素到其最近邻的距离 # k=2:第1近邻是自身(距离0),第2近邻才是真正的最近邻 tree = cKDTree(coords) min_dists, _ = tree.query(coords, k=2) nearest_dists = min_dists[:, 1] # 阈值 = 所有最近邻距离的最大值,确保每个要素至少有一个邻域 threshold = float(nearest_dists.max()) # 用 DistanceBand 构建带宽内的空间权重对象 # binary=False + alpha=-1 → 权重 = distance^(-1)(距离倒数) # silence_warnings=True 抑制孤立要素警告 w = DistanceBand( coords, threshold=threshold, binary=False, alpha=-1.0, silence_warnings=True ) # 直接从 libpysal 稀疏格式转为 scipy CSR 稀疏矩阵,不展开为密集矩阵 W_sparse = w.to_adjlist() # 得到 (focal, neighbor, weight) 的 DataFrame n = len(coords) W_sparse = sp.csr_matrix( (W_sparse['weight'].values, (W_sparse['focal'].values, W_sparse['neighbor'].values)), shape=(n, n) ) # 对角线强制置 0(消除自环) W_sparse.setdiag(0) W_sparse.eliminate_zeros() return W_sparse, thresholddef identify_subcenters(city_grids, pop_field=POP_FIELD): """ 使用 ESDA (LISA) 识别城市人口(子)中心 """ if len(city_grids) < 4: return [] city_grids = city_grids.copy() city_grids['cx'] = city_grids.geometry.centroid.x city_grids['cy'] = city_grids.geometry.centroid.y coords = city_grids[['cx', 'cy']].values pop_values = city_grids[pop_field].values.astype(float) # 从数据动态估算格子边长(论文原始数据为 1km × 1km) grid_size = np.sqrt(city_grids.geometry.area.median()) # 构建距离倒数权重矩阵并计算 LISA # threshold:确保每个格子至少有一个邻域的最小距离阈值 W, threshold = build_distance_inverse_weights(coords) print(f" 距离阈值:{threshold:.1f}(坐标单位)") local_I, p_values, quadrant = compute_local_morans_I(pop_values, W) # 筛选显著 HH 型格子 is_sig_HH = (quadrant == 1) & (p_values < SIGNIFICANCE) if is_sig_HH.sum() == 0: return [] hh_indices = np.where(is_sig_HH)[0] hh_grids = city_grids.iloc[hh_indices].copy() # 用微小缓冲区膨胀识别车式相邻(共边接触) buffer_dist = grid_size * 0.01 hh_geoms_buffered = list(hh_grids.geometry.buffer(buffer_dist)) # 并查集合并连通的 HH 格子 n_hh = len(hh_grids) parent = list(range(n_hh)) def find(x): while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(x, y): px, py = find(x), find(y) if px != py: parent[px] = py for i in range(n_hh): for j in range(i + 1, n_hh): if hh_geoms_buffered[i].intersects(hh_geoms_buffered[j]): union(i, j) hh_grids = hh_grids.copy() hh_grids['cluster_id'] = [find(i) for i in range(n_hh)] # 按簇聚合,应用过滤条件,构建子中心列表 subcenters = [] for _, cluster in hh_grids.groupby('cluster_id'): n_grids = len(cluster) total_pop = cluster[pop_field].sum() if n_grids >= MIN_GRIDS and total_pop >= MIN_POP: # 将簇内所有格子的原始面合并为一个面几何 cluster_geom = cluster.geometry.unary_union # 几何质心:合并面的形心,用于计算子中心到主中心的距离 geom_centroid = cluster_geom.centroid subcenters.append({ 'pop': total_pop, 'cx': geom_centroid.x, 'cy': geom_centroid.y, 'n_grids': n_grids, 'geometry': cluster_geom }) return subcentersdef compute_polycentricity(subcenters): """ 计算形态多中心度 P(论文公式 1-3) """ if len(subcenters) <= 1: # 仅一个中心:完全单中心,P=0,MONO=1 return 0.0, 1.0 # 按人口降序排列,最大人口中心为主中心 subcenters_sorted = sorted(subcenters, key=lambda x: x['pop'], reverse=True) main_center = subcenters_sorted[0] main_xy = np.array([main_center['cx'], main_center['cy']]) # 计算各中心到主中心的欧氏距离 distances = [ np.linalg.norm(np.array([sc['cx'], sc['cy']]) - main_xy) for sc in subcenters_sorted ] # d_max:主中心到最远子中心的距离(排除主中心自身,距离为0) dmax = max(distances[1:]) # I_max = x_max * d_max(主中心重要性) Imax = main_center['pop'] * dmax # 各中心重要性:主中心取 I_max,子中心取 x_i * d_i importance = np.array([ Imax if i == 0 else sc['pop'] * distances[i] for i, sc in enumerate(subcenters_sorted) ]) # σ_obs:实际所有中心重要性的总体标准差 sigma_obs = np.std(importance, ddof=0) # σ_max:假设两中心城市 [I_max, 0] 的总体标准差 # 均值 = I_max/2,标准差 = sqrt(((I_max/2)^2 + (I_max/2)^2) / 2) = I_max / 2 sigma_max = Imax / 2 if sigma_max == 0: return 0.0, 1.0 P = 1 - sigma_obs / sigma_max MONO = 1 - P # 即 σ_obs / σ_max,恒 >= 0 return P, MONOdef compute_dispersion(subcenters, city_total_pop): """ 计算分散度 D(论文公式 4) """ if city_total_pop == 0: return np.nan pop_in_centers = sum(sc['pop'] for sc in subcenters) D = 1 - pop_in_centers / city_total_pop D = max(0.0, min(1.0, D)) # 数值保护性裁剪 return D# ==================== 主流程 ====================def main(): print("=" * 60) print("城市多中心度与分散度计算") print("参考:Li & Liu (2018), Landscape and Urban Planning") print("=" * 60) # ---------- 1. 读取数据 ---------- print("\n[1/4] 读取数据...") cities = gpd.read_file(cities_path) people = gpd.read_file(people_path) # (people_path, layer="图层名称") print(f" 城市面数量:{len(cities)}") print(f" 人口格子数量:{len(people)}") print(f" 城市字段:{CITY_FIELD},人口字段:{POP_FIELD}") # 确保坐标系一致 if cities.crs != people.crs: print(f" 警告:坐标系不一致,将人口数据重投影到城市坐标系") people = people.to_crs(cities.crs) # ---------- 2. 空间连接:为每个格子分配城市 ---------- print("\n[2/4] 空间连接(格子质心 → 城市面)...") people_centroids = people.copy() people_centroids['geometry'] = people_centroids.geometry.centroid joined = gpd.sjoin( people_centroids, cities[[CITY_FIELD, 'geometry']], how='left', predicate='within' ) # 未落入任何城市面的格子直接丢弃 n_unmatched = joined[CITY_FIELD].isna().sum() if n_unmatched > 0: print(f" {n_unmatched} 个格子质心未落入任何城市面,已丢弃") people['city_name'] = joined[CITY_FIELD].values people_valid = people[people['city_name'].notna()].copy() print(f" 成功匹配格子数量:{len(people_valid)}") # ---------- 3. 逐城市计算 ---------- print("\n[3/4] 逐城市计算多中心度与分散度...") city_names = cities[CITY_FIELD].unique() results = [] subcenter_records = [] # 用于导出 shp 的子中心记录 for i, city_name in enumerate(city_names): print(f"\n [{i+1}/{len(city_names)}] {city_name}") city_grids = people_valid[people_valid['city_name'] == city_name].copy() if len(city_grids) == 0: print(f" 跳过:无格子数据") results.append({ 'city': city_name, 'n_subcenters': 0, 'polycentricity_P': np.nan, 'monocentricity_MONO': np.nan, 'ln_MONO': np.nan, 'dispersion_D': np.nan, 'ln_DISP': np.nan, 'concentration': np.nan, 'total_pop': 0, 'pop_in_centers': 0, 'note': '无格子数据' }) continue total_pop = city_grids[POP_FIELD].sum() print(f" 格子数:{len(city_grids)},总人口:{total_pop:,.0f}") # 识别人口(子)中心 subcenters = identify_subcenters(city_grids) n_centers = len(subcenters) print(f" 识别到 {n_centers} 个人口(子)中心") if n_centers == 0: print(f" 跳过:无显著人口中心") results.append({ 'city': city_name, 'n_subcenters': 0, 'polycentricity_P': np.nan, 'monocentricity_MONO': np.nan, 'ln_MONO': np.nan, 'dispersion_D': np.nan, 'ln_DISP': np.nan, 'concentration': np.nan, 'total_pop': int(total_pop), 'pop_in_centers': 0, 'note': '无显著人口中心' }) continue # 计算多中心度与分散度 P, MONO = compute_polycentricity(subcenters) D = compute_dispersion(subcenters, total_pop) pop_in_centers = sum(sc['pop'] for sc in subcenters) print(f" 多中心度 P = {P:.4f},单中心度 MONO = {MONO:.4f},ln(MONO) = {np.log(MONO):.4f}") ln_disp_str = f"{np.log(D):.4f}" if D > 0 else "nan" print(f" 分散度 D = {D:.4f},ln(DISP) = {ln_disp_str}") results.append({ 'city': city_name, 'n_subcenters': n_centers, 'polycentricity_P': round(P, 6), 'monocentricity_MONO': round(MONO, 6), 'ln_MONO': round(np.log(MONO), 6), # 论文回归变量 'dispersion_D': round(D, 6), 'ln_DISP': round(np.log(D), 6) if D > 0 else np.nan, # 论文回归变量 'concentration': round(1 - D, 6), 'total_pop': int(total_pop), 'pop_in_centers': int(pop_in_centers), 'note': 'OK' }) # 收集子中心记录,用于导出 shp(面要素,为合并后的 HH 格子) # 主中心为人口最多的中心(与 compute_polycentricity 保持一致) subcenters_sorted = sorted(subcenters, key=lambda x: x['pop'], reverse=True) for rank, sc in enumerate(subcenters_sorted): subcenter_records.append({ 'city': city_name, 'center_id': rank + 1, # 1=主中心,2、3…=子中心 'is_main': 1 if rank == 0 else 0, # 1=主中心,0=子中心 'pop': int(sc['pop']), 'n_grids': sc['n_grids'], 'geometry': sc['geometry'] # 合并后的 HH 格子面 }) # ---------- 4. 输出结果 ---------- print("\n[4/4] 保存结果...") df = pd.DataFrame(results) df.to_csv(output_path, index=False, encoding='utf-8-sig') print(f" 统计结果已保存至:{output_path}") # 导出子中心面 shp(每个要素为合并后的 HH 格子面) # 字段说明: # city — 城市名 # center_id — 中心编号(1=主中心,按人口降序排列) # is_main — 是否主中心(1=主中心,0=子中心) # pop — 中心人口(簇内总人口) # n_grids — 中心包含的格子数 if subcenter_records: gdf_centers = gpd.GeoDataFrame(subcenter_records, crs=people.crs) gdf_centers.to_file(shp_path, encoding='utf-8') print(f" 子中心面层已保存至:{shp_path}(共 {len(gdf_centers)} 个中心面)") else: print(" 无子中心数据,跳过 shp 导出") # 统计摘要 valid = df[df['note'] == 'OK'] print(f"\n{'='*60}") print(f"计算完成!") print(f" 总城市数:{len(df)}") print(f" 成功计算城市数:{len(valid)}") print(f" 无显著中心城市数:{len(df) - len(valid)}") if len(valid) > 0: print(f"\n 多中心度 P:均值={valid['polycentricity_P'].mean():.4f}," f"最大={valid['polycentricity_P'].max():.4f}," f"最小={valid['polycentricity_P'].min():.4f}") print(f" 分散度 D:均值={valid['dispersion_D'].mean():.4f}," f"最大={valid['dispersion_D'].max():.4f}," f"最小={valid['dispersion_D'].min():.4f}") return dfif __name__ == '__main__': df = main() print("\n前5行结果预览:") print(df.head().to_string())