写在前面
大家好,我是GISer宁宁,今天是我第一次写公众号文章,希望可以分享一些大家用得到的内容并能帮大家切实地提高处理数据的效率。
简介
想必大家在对时序数据进行Sen-MK趋势分析的时候,一定是苦于计算特别慢这一缺点。本脚本针对大数据量利用矩阵对逐像元计算代码进行了优化,利用分块计算技术解决了高分辨率影像易导致的内存溢出问题。通过结合斜率正负与 检验阈值,脚本可自动生成 5 分类或 7 分类的栅格结果。
代码工作流:
趋势定量化: 通过 Sen's Slope 计算像素级的年度变化速率。
置信度评估: 配合 MK 检验,量化趋势的显著性。
结果可视化级处理: 预设 5 分类和 7 分类逻辑,直接生成“显著改善、稳定、显著退化”的重分类数据。
技术亮点:
内存优化: 采用 Chunking(分块处理)技术,支持在低内存环境下处理 GB 级超大尺寸遥感影像。
自动化集成: 支持一键完成“数据读取-趋势计算-显著性检验-结果重分类”的全流程工作。
公式介绍
1. Sen's Slope —— 趋势强度计算
Sen's Slope 是一种非参数方法,用于估算时间序列的变化速率。对于长度为 的时间序列数据,计算所有样本对的斜率 :
2. Mann-Kendall (MK) —— 趋势显著性检验
MK 检验用于判断上述趋势是否具有统计学意义(即这种变化是真实的还是随机波动的)。
第一步:计算检验统计量
第二步:计算方差
第三步:计算标准化检验统计量
代码实现
导入所需库及忽略警告
import osimport reimport globimport numpy as npimport rasterioimport warningsfrom contextlib import ExitStack# 忽略切片全为 NaN 时的警告warnings.filterwarnings('ignore', r'All-NaN slice')
构建sen-mk趋势分析代码
defcalculate_sen_mk_chunked(tif_files, output_sen_path, output_mk_path, output_class_path=None, classify_mode=None, chunk_size=100):""" 核心计算函数(分块版):读取时序数据,分块计算 Sen 斜率和 MK 检验,并可选进行重分类。 Args: tif_files (list): 按时间顺序排序的栅格文件路径列表。 output_sen_path (str): Sen 斜率结果输出路径。 output_mk_path (str): MK Z值结果输出路径。 output_class_path (str, optional): 重分类结果输出路径。 classify_mode (str, optional): 重分类模式, '5-class' 或 '7-class'。默认为 None。 chunk_size (int): 每次处理的行数,默认 100 行。 """ifnot tif_files: print("❌ 未找到输入文件。")return# 1. 读取基准元数据with rasterio.open(tif_files[0]) as src0: meta = src0.meta.copy() height = src0.height width = src0.width# 强制更新元数据为 float32 (方便存储 NaN) meta.update(dtype='float32', count=1, compress='lzw', nodata=np.nan) n_years = len(tif_files) print(f"📊 图像尺寸: {width}x{height}, 时间跨度: {n_years} 年") print("📥 正在将所有年份数据读取到内存...")# 2. 读取所有数据到内存try: full_stack = np.zeros((n_years, height, width), dtype='float32')for idx, f_path in enumerate(tif_files):with rasterio.open(f_path) as src: data = src.read(1)if src.nodata isnotNone: data = np.where(data == src.nodata, np.nan, data) full_stack[idx] = dataexcept MemoryError: print("❌ 原始数据过大,无法一次性读入内存。")return# 3. 使用 ExitStack 管理多个输出文件with ExitStack() as stack: dst_sen = stack.enter_context(rasterio.open(output_sen_path, 'w', **meta)) dst_mk = stack.enter_context(rasterio.open(output_mk_path, 'w', **meta)) dst_class = Noneif classify_mode and output_class_path: print(f"🔄 开启重分类模式: {classify_mode}") dst_class = stack.enter_context(rasterio.open(output_class_path, 'w', **meta)) print(f"⚙️ 开始分块计算 (每块 {chunk_size} 行)...")# 4. 按行分块循环处理for row_start in range(0, height, chunk_size): row_end = min(row_start + chunk_size, height) current_rows = row_end - row_start progress = (row_end / height) * 100 print(f" [处理进度]: {progress:.1f}% (行 {row_start}~{row_end})", end='\r')# 提取当前块 data_chunk = full_stack[:, row_start:row_end, :]# --- 核心算法开始 ---# A. 准备 Sen 斜率所需的差值矩阵 n_pairs = (n_years * (n_years - 1)) // 2 slope_chunk = np.zeros((n_pairs, current_rows, width), dtype='float32') idx = 0for i in range(n_years):for j in range(i + 1, n_years): diff = data_chunk[j] - data_chunk[i] time_diff = j - i slope_chunk[idx] = diff / time_diff idx += 1# B. 计算 Sen 斜率 sen_result_chunk = np.nanmedian(slope_chunk, axis=0)# C. 计算 MK 检验 sgn_chunk = np.sign(slope_chunk)del slope_chunk s_stat = np.nansum(sgn_chunk, axis=0)del sgn_chunk var_s = (n_years * (n_years - 1) * (2 * n_years + 5)) / 18.0 z_score_chunk = np.zeros((current_rows, width), dtype='float32') sqrt_var = np.sqrt(var_s) mask_pos = s_stat > 0 mask_neg = s_stat < 0 z_score_chunk[mask_pos] = (s_stat[mask_pos] - 1) / sqrt_var z_score_chunk[mask_neg] = (s_stat[mask_neg] + 1) / sqrt_var# 掩膜无效值 invalid_mask = np.isnan(sen_result_chunk) z_score_chunk[invalid_mask] = np.nan# --- 重分类逻辑 ---if dst_class: class_chunk = np.zeros_like(sen_result_chunk, dtype='float32')# 默认设为 NoData (np.nan),之后填充有效值 class_chunk[:] = np.nan# 有效区域掩膜 valid_mask = ~invalid_mask# 提取有效区域的 slope 和 z 用于计算 s = sen_result_chunk[valid_mask] z = np.abs(z_score_chunk[valid_mask]) c = np.zeros_like(s) # 默认为 0 (稳定)# 阈值设定 Z_95 = 1.96 Z_99 = 2.58# 对应 99% 置信度if classify_mode == '5-class':# 5分类: -2(显著退化), -1(轻微退化), 0(稳定), 1(轻微改善), 2(显著改善)# 退化 (Slope < 0) c[(s < 0) & (z < Z_95)] = -1 c[(s < 0) & (z >= Z_95)] = -2# 改善 (Slope > 0) c[(s > 0) & (z < Z_95)] = 1 c[(s > 0) & (z >= Z_95)] = 2elif classify_mode == '7-class':# 7分类: -3(极显著退化), -2(显著退化), -1(轻微退化), 0(稳定), 1(轻微改善), 2(显著改善), 3(极显著改善)# 退化 c[(s < 0) & (z < Z_95)] = -1 c[(s < 0) & (z >= Z_95) & (z < Z_99)] = -2 c[(s < 0) & (z >= Z_99)] = -3# 改善 c[(s > 0) & (z < Z_95)] = 1 c[(s > 0) & (z >= Z_95) & (z < Z_99)] = 2 c[(s > 0) & (z >= Z_99)] = 3# 将计算好的分类值填回 chunk class_chunk[valid_mask] = c# --- 写入文件 --- window = rasterio.windows.Window(col_off=0, row_off=row_start, width=width, height=current_rows) dst_sen.write(sen_result_chunk, 1, window=window) dst_mk.write(z_score_chunk, 1, window=window)if dst_class: dst_class.write(class_chunk, 1, window=window) print("\n✅ 计算完成。")
批量计算
defbatch_process_sen_mk(input_dir, output_sen_dir, output_mk_dir, input_template, subject_name, classify_mode=None, chunk_size=200):""" 批量处理入口函数 """ifnot os.path.exists(input_dir): print(f"⚠️ 输入路径不存在: {input_dir}")return# 确保输出目录存在for p in [output_sen_dir, output_mk_dir]:ifnot os.path.exists(p): os.makedirs(p) search_path = os.path.join(input_dir, input_template) files = glob.glob(search_path)try: files.sort(key=lambda x: int(''.join(filter(str.isdigit, os.path.basename(x)))))except ValueError: files.sort()if len(files) < 3: print(f"⚠️ {subject_name} 的文件数量不足 3 个。")return print(f"🚀 [任务启动] {subject_name} (共 {len(files)} 期) | 模式: {classify_mode or'仅计算趋势'}") out_sen_name = f"Sen_Slope_{subject_name}.tif" out_mk_name = f"MK_Z_Score_{subject_name}.tif" out_sen_path = os.path.join(output_sen_dir, out_sen_name) out_mk_path = os.path.join(output_mk_dir, out_mk_name)# 处理分类路径 out_class_path = Noneif classify_mode:# 如果开启分类,创建一个子文件夹或直接保存 out_class_name = f"Trend_Class_{classify_mode}_{subject_name}.tif"# 我们可以把分类结果放在 Sen 文件夹下,或者单独的文件夹。这里放在 Sen 目录下。 out_class_path = os.path.join(output_sen_dir, out_class_name)try: calculate_sen_mk_chunked( files, out_sen_path, out_mk_path, output_class_path=out_class_path, classify_mode=classify_mode, chunk_size=chunk_size )except Exception as e: print(f"❌ 处理失败: {e}")import traceback traceback.print_exc()
调用函数
if __name__ == "__main__":# ================= 参数配置区 =================# 输入文件路径 input_folder = r""# 输出文件路径 output_folder = r""# 匹配 MRSEI_{year}.tif 格式# 使用 * 通配符来匹配年份 file_pattern = "MRSEI_*.tif" task_name = "RSEI-SC"# 输出文件的后缀名# 重分类配置# 选项: None (不分类), "5-class", "7-class"# 5-class: 阈值 1.96 (Z = 1.96 是 P 在 0.05水平下,显著/不显著)# 5分类: -2(显著退化), -1(轻微退化), 0(稳定), 1(轻微改善), 2(显著改善)# 7-class: 阈值 1.96 和 2.58 (Z = 2.56 是 P 在 0.01 水平下,显著/极显著)# 7分类: -3(极显著退化), -2(显著退化), -1(轻微退化), 0(稳定), 1(轻微改善), 2(显著改善), 3(极显著改善) do_classification = "7-class"# 分块计算行数 (根据内存大小调整,内存越小该值设置越小,如 100 或 50) process_chunk_size = 200 batch_process_sen_mk( input_dir=input_folder, output_sen_dir=output_folder, output_mk_dir=output_folder, input_template=file_pattern, subject_name=task_name, classify_mode=do_classification, chunk_size=process_chunk_size )
写在后面
需要本次教程练习数据的小伙伴,可以在公众号后台回复关键词「SEN-MK」,即可获取下载链接。希望能帮助到大家!