点击蓝字,关注我们
本期分享:
栅格数据逐像元偏相关分析
完整可复制代码偏相关分析代码.zip
实例解析
问 题:在青藏高原或高纬度冻土区,春季土壤解冻(SOT)通常被认为是驱动植被返青(GUD)的关键因子。然而,解冻早并不总是意味着植被返青早,因为土壤水分有效性还取决于融化的深度,即活动层厚度(ALT)。在排除了活动层厚度(ALT)变化的影响后,春季解冻时间(SOT)本身对植被返青(GUD)的独立贡献是多少?或者说,ALT 是否是连接 SOT 和 GUD 的关键中介变量?
数 据:本次示例数据包括1987年至2015年某区域多年 SOT: Start of Thaw (解冻开始时间),ALT: Active Layer Thickness (活动层厚度)和 GUD: Green-up Date (返青日期)。数据格式均为tif且空间参考信息均一致:

原理讲解
利用上述提供的三个 tif 数据集(时间序列为 1987-2015,共29年),在每个像元(Pixel-based)上进行时间序列统计分析:

实战演示
注:运行代码文件需与上述源代码文件在同一个文件夹
导入上述源代码文件(核心偏相关分析文件)import pcorrpc = pcorr.ParCorr(data_dir=r'E:\公众号\偏相关分析\vars', min_years=5)
data_dir 是所有变量包括X Y 以及所有控制变量的文件夹,并以变量名命名:

其中每一个文件都有 变量名+年份

ParCorr_Res = pc.PCor(x='SOT', y='GUD', covar=['ALT'], method='pearson')
pc.plot_results(ParCorr_Res)

pc.save_results_to_tif(ParCorr_Res, r'E:\公众号\偏相关分析\结果\ParCorr_SOT_GUD')
代码解析
代码文件:pcorr.py
代码结构:
Structure
ParCorr/
├── __init__: 初始化类实例,设置数据路径、无效值及最小计算年限限制
├── _init_profile: 轻量化读取元数据,获取影像的行列数及坐标系投影等配置
├── _load_var_window: 窗口化数据加载,按块读取时间序列并处理 NoData
├── _pixel_partial_corr: 像素级核心计算,基于 pingouin 执行偏相关及置信区间分析
├── PCor : 顶层控制逻辑,执行分块循环,管理内存并收集全图结果
├── save_results_to_tif : 结果持久化,将 5 个统计指标分别导出为地理 TIFF 文件
└── plot_results : 组图可视化,一键绘制 n, r, p, CI95 空间分布组图
# -*- coding: utf-8 -*-import osimport reimport numpy as npimport rasterioimport pandas as pdimport pingouin as pgimport warningsimport gcfrom rasterio.windows import Windowclass ParCorr(object):"""Pixel-wise Partial Correlation (Optimized for Large Scale)"""def __init__(self, data_dir, nodata=-99999, min_years=7):self.data_dir = data_dirself.nodata = nodataself.min_years = min_yearsself.profile = None # 初始为 Nonedef _init_profile(self, var_name):"""轻量级获取元数据:只打开第一个文件读取属性,不加载像素数据"""var_path = os.path.join(self.data_dir, var_name)if not os.path.exists(var_path):raise FileNotFoundError(f"找不到文件夹: {var_path}")tifs = sorted([f for f in os.listdir(var_path) if f.endswith(".tif")])if not tifs:raise FileNotFoundError(f"在 {var_path} 中没有找到 .tif 文件")with rasterio.open(os.path.join(var_path, tifs[0])) as src:self.profile = src.profile.copy()self.rows = src.heightself.cols = src.widthprint(f"✅ 元数据初始化成功: 范围 {self.rows}x{self.cols}")def _load_var_window(self, var_path, window, pattern=r'\d{4}'):"""按窗口读取局部数据栈"""files = sorted([f for f in os.listdir(var_path) if f.endswith(".tif")],key=lambda x: int(re.findall(pattern, x)[0]))stack = []for f in files:with rasterio.open(os.path.join(var_path, f)) as src:arr = src.read(1, window=window).astype(np.float32)if src.nodata is not None:arr[arr == src.nodata] = np.nanelif self.nodata is not None:arr[arr == self.nodata] = np.nanstack.append(arr)return np.stack(stack, axis=0)def _pixel_partial_corr(self, x, y, covars, method):"""x, y: (T,)covars: dict{name: (T,)}"""data = {'V1': x, 'V2': y}for k, v in covars.items():data[k] = vdf = pd.DataFrame(data)df = df.dropna()n = len(df)if n < self.min_years:return np.nan, np.nan, (np.nan, np.nan), np.nantry:res = pg.partial_corr(data=df,x='V1',y='V2',covar=list(covars.keys()),method=method)row = res.iloc[0]r = float(row['r'])p = float(row['p_val'])ci_low, ci_high = row['CI95']if not (-1 <= r <= 1):return np.nan, np.nan, (np.nan, np.nan), np.nanreturn n, r, (ci_low, ci_high), pexcept Exception:return np.nan, np.nan, (np.nan, np.nan), np.nandef PCor(self, x, y, covar, method='pearson', pattern=r'\d{4}', chunk_size=50):if isinstance(covar, str):covar = [covar]var_list = [x, y] + covarif self.profile is None:self._init_profile(x)# 1. 预先获取元数据 (假设初始化时已通过某个文件获取了 self.profile)# 如果没有,需要先打开一个文件获取行列数rows = self.profile['height']cols = self.profile['width']# 2. 初始化结果地图 (使用 float32 节省 50% 内存)res_maps = {'n': np.full((rows, cols), np.nan, dtype=np.float32),'r': np.full((rows, cols), np.nan, dtype=np.float32),'p_val': np.full((rows, cols), np.nan, dtype=np.float32),'CI95_low': np.full((rows, cols), np.nan, dtype=np.float32),'CI95_high': np.full((rows, cols), np.nan, dtype=np.float32)}print(f"开始分块计算 (块大小: {chunk_size} 行): {rows}x{cols} 网格...")# 3. 外层分块循环for start_row in range(0, rows, chunk_size):end_row = min(start_row + chunk_size, rows)curr_height = end_row - start_rowwindow = Window(0, start_row, cols, curr_height)stacks = {v: self._load_var_window(os.path.join(self.data_dir, v), window, pattern)for v in var_list}for r_offset in range(curr_height):global_row = start_row + r_offsetfor c0 in range(cols):x_ts = stacks[x][:, r_offset, c0]y_ts = stacks[y][:, r_offset, c0]cov_ts = {cv: stacks[cv][:, r_offset, c0] for cv in covar}# 基础空值检查 (保留原逻辑)if np.all(np.isnan(x_ts)) or np.all(np.isnan(y_ts)):continueif any(np.all(np.isnan(v)) for v in cov_ts.values()):continuetry:with warnings.catch_warnings():warnings.simplefilter("error", RuntimeWarning)n, r, ci, p = self._pixel_partial_corr(x_ts, y_ts, cov_ts, method)if not np.isnan(r):res_maps['n'][global_row, c0] = nres_maps['r'][global_row, c0] = rres_maps['p_val'][global_row, c0] = pres_maps['CI95_low'][global_row, c0] = ci[0]res_maps['CI95_high'][global_row, c0] = ci[1]except RuntimeWarning as e:print("\n" + "="*50)print(f"❌ 数值错误 at Global Row: {global_row}, Col: {c0}")print(f"⚠️ 详情: {e}")print(f"🔍 X方差: {np.var(x_ts):.6f}, Y方差: {np.var(y_ts):.6f}")raise RuntimeError(f"计算终止") from eexcept Exception as e:raise edel stacksgc.collect()print(f"进度: {end_row}/{rows} 行已完成", end='\r')print("\n✅ 处理完成。")return res_mapsdef plot_results(self, results, figsize=(20, 12)):"""绘制组图:将 n, r, p_val, CI95_low, CI95_high 组合展示"""plot_config = {'n': {'title': 'Sample Size (n)', 'cmap': 'viridis', 'vmin': self.min_years, 'vmax': None},'r': {'title': 'Partial Corr (r)', 'cmap': 'RdBu_r', 'vmin': -1, 'vmax': 1},'p_val': {'title': 'P-Value', 'cmap': 'YlGnBu_r', 'vmin': 0, 'vmax': 0.1},'CI95_low': {'title': '95% CI Lower', 'cmap': 'RdBu_r', 'vmin': -1, 'vmax': 1},'CI95_high': {'title': '95% CI Upper', 'cmap': 'RdBu_r', 'vmin': -1, 'vmax': 1}}keys = ['n', 'r', 'p_val', 'CI95_low', 'CI95_high']fig, axes = plt.subplots(2, 3, figsize=figsize)axes = axes.flatten()for i, key in enumerate(keys):data = results[key]cfg = plot_config[key]im = axes[i].imshow(data, cmap=cfg['cmap'],vmin=cfg['vmin'], vmax=cfg['vmax'])axes[i].set_title(cfg['title'], fontsize=14, fontweight='bold')axes[i].axis('off') # 地理数据通常关闭坐标轴plt.colorbar(im, ax=axes[i], orientation='vertical', fraction=0.046, pad=0.04)if len(keys) < len(axes):axes[-1].axis('off')plt.tight_layout()plt.show()def save_results_to_tif(self, results, output_dir):if not os.path.exists(output_dir):os.makedirs(output_dir)profile = self.profile.copy()profile.update(driver='GTiff',dtype='float32',count=1,nodata=self.nodata,compress='lzw')for name, data in results.items():out = data.astype(np.float32)out[np.isnan(out)] = self.nodatawith rasterio.open(os.path.join(output_dir, f'{name}.tif'),'w',**profile) as dst:dst.write(out, 1)
求点赞

求分享

求喜欢


点击下方账号名片 关注我们

#Python#遥感实战 #偏相关分析#栅格数据#地理#地理信息系统#代码开源

点“阅读原文”下载文章源代码