ACMxxx000_00000_Lidar_xxxxmmdd.binACMxxx_00000_00000_Lidar_MEXT_xxxxmmdd_532.binACMxxx_00000_00000_Lidar_MBAKSCAT_xxxxmmdd_532.binpip install numpy scipy matplotlib netCDF4气象行业标准格式长这样(文件头 + 通道数据):
我们写一个函数 read_lidar_raw,一次性读出:
import struct, numpy as npfrom datetime import datetime, timedeltadef read_lidar_raw(filepath):with open(filepath, 'rb') as f:raw = f.read()# 解析头部(关键字段)...return {'time': scan_time, # 观测时间'range_km': r_km, # 高度(km)'signal': signal, # 原始回波}
原始信号有两个问题:噪声大 + 随距离衰减快。我们做两件事:
移动平均平滑(窗口5)——去噪
距离平方校正-补偿几何衰减
from scipy.ndimage import uniform_filter1ddef preprocess(lidar):signal_sm = uniform_filter1d(lidar['signal'], size=5)r_m = lidar['range_m'][:len(signal_sm)]pr2 = signal_sm * (r_m**2)pr2[pr2 <= 0] = np.nanreturn pr2, lidar['range_km']
Fernald 后向积分法是目前最成熟的方法。核心思路:找一个清洁大气层(比如 1.5~2.5 km)作为参考点,然后向下递推。
def fernald_extinction(pr2, r_km, Sa=50):# 自动找参考层# 后向积分计算消光系数 α_a(r)return alpha_a # 单位 km⁻¹
🧠 参数 Sa 是气溶胶激光雷达比,城市气溶胶取 50 sr,沙尘取 40 sr。
得到消光系数后,后向散射系数就简单了:
beta = alpha / Sa # 单位 km⁻¹·sr⁻¹#!/usr/bin/env python3# -*- coding: utf-8 -*-"""激光雷达原始数据处理与产品反演步骤:1. 读取原始二进制数据,绘制原始回波和距离平方校正信号2. 反演消光系数(Fernald / Klett)和后向散射系数3. 保存文本/NetCDF产品4. 绘制反演廓线图(与已有产品对比,如果存在)"""import osimport structimport numpy as npimport matplotlib.pyplot as pltfrom datetime import datetime, timedeltafrom scipy.ndimage import uniform_filter1dfrom scipy.interpolate import interp1dimport warningswarnings.filterwarnings('ignore')# ==================== 1. 原始数据解析 ====================def read_lidar_raw(filepath):"""解析原始二进制数据(with open(filepath, 'rb') as f:raw = f.read()iflen(raw) < 64:return None# 15-16: 数据类型(0=原始)data_type = struct.unpack('<H', raw[14:16])[0]if data_type != 0:return None# 19-22: 设备编号dev_id = struct.unpack('<I', raw[18:22])[0]# 23-26: 经纬度lon_code = struct.unpack('<H', raw[22:24])[0]lon = (lon_code / 8.0) * (180.0 / 4096.0)lat_code = struct.unpack('<H', raw[24:26])[0]lat = (lat_code / 8.0) * (180.0 / 4096.0)alt = struct.unpack('<H', raw[26:28])[0]# 时间start_sec = struct.unpack('<I', raw[32:36])[0]end_sec = struct.unpack('<I', raw[36:40])[0]julian = struct.unpack('<H', raw[40:42])[0]base = datetime(1970, 1, 1)scan_date = base + timedelta(days=julian)scan_time = scan_date.replace(hour=start_sec//3600,minute=(start_sec%3600)//60,second=start_sec%60)# 仰角elev_code = struct.unpack('<H', raw[42:44])[0]elevation = (elev_code / 8.0) * (180.0 / 4096.0)# 距离分辨率range_res_code = struct.unpack('<H', raw[60:62])[0]range_res = range_res_code / 100.0 # mblind_code = struct.unpack('<H', raw[62:64])[0]blind = blind_code / 10.0 # m# 通道数nch = struct.unpack('<H', raw[52:54])[0]# 读取第一个有效通道信号signal = Nonefor i in range(min(nch, 16)):offset = 64 + i * 16ptr = struct.unpack('<I', raw[offset:offset+4])[0]nbins = struct.unpack('<H', raw[offset+4:offset+6])[0]if ptr + nbins*4 <= len(raw) and nbins > 0:data = np.frombuffer(raw[ptr:ptr+nbins*4], dtype='<f4')iflen(data) > 0:signal = databreakif signal is None or len(signal) < 100:return Noner_m = blind + np.arange(len(signal)) * range_resr_km = r_m / 1000.0return {'time': scan_time,'lon': lon, 'lat': lat, 'alt': alt,'elevation': elevation,'range_km': r_km,'range_m': r_m,'signal': signal,'range_res': range_res,'blind': blind,'device_id': dev_id}# ==================== 2. 已有产品解析(用于对比) ====================def read_lidar_product(filepath):"""解析1级产品文件(表2)"""with open(filepath, 'rb') as f:raw = f.read()iflen(raw) < 50:return Nonedata_type = struct.unpack('<H', raw[14:16])[0]if data_type != 1:return None# 距离分辨率range_res_code = struct.unpack('<H', raw[28:30])[0]range_res = range_res_code / 100.0# 放大倍数mode_code = struct.unpack('<H', raw[30:32])[0]scale = mode_code & 0x3FFF# 时间start_sec = struct.unpack('<I', raw[32:36])[0]end_sec = struct.unpack('<I', raw[36:40])[0]julian = struct.unpack('<H', raw[40:42])[0]base = datetime(1970, 1, 1)scan_date = base + timedelta(days=julian)scan_time = scan_date.replace(hour=start_sec//3600,minute=(start_sec%3600)//60,second=start_sec%60)# 仰角elev_code = struct.unpack('<H', raw[42:44])[0]elevation = (elev_code / 8.0) * (180.0 / 4096.0)# 波长wave = struct.unpack('<H', raw[44:46])[0]# 产品标识 (1=米消光,2=米后向散射)prod_flag = struct.unpack('<H', raw[46:48])[0]# 距离库数nbins = struct.unpack('<H', raw[48:50])[0]data_start = 50if data_start + nbins*4 > len(raw):return Nonevalues = np.frombuffer(raw[data_start:data_start+nbins*4], dtype='<f4')if scale != 0:values = values / scaler_m = np.arange(nbins) * range_resr_km = r_m / 1000.0return {'time': scan_time, 'elevation': elevation, 'wave': wave,'product_flag': prod_flag, 'range_km': r_km, 'data': values,'scale': scale}# ==================== 3. 原始数据绘图 ====================def plot_raw_data(lidar, output_dir):"""绘制原始回波和距离平方校正信号"""r_km = lidar['range_km']signal = lidar['signal']r_m = lidar['range_m'][:len(signal)]pr2 = signal * (r_m**2) # 距离平方校正fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))ax1.semilogy(r_km, signal, 'b-', linewidth=1)ax1.set_xlabel('Height (km)')ax1.set_ylabel('Raw signal (a.u.)')ax1.set_title('Raw echo')ax1.grid(True, alpha=0.5)ax2.semilogy(r_km, pr2, 'r-', linewidth=1)ax2.set_xlabel('Height (km)')ax2.set_ylabel('Range-corrected signal (a.u.)')ax2.set_title('P(r)*r²')ax2.grid(True, alpha=0.5)plt.suptitle(f"Raw data diagnostic\n{lidar['time'].strftime('%Y-%m-%d %H:%M:%S')}")plt.tight_layout()out = os.path.join(output_dir, 'raw_diagnostic.png')plt.savefig(out, dpi=150)plt.close()print(f"Raw diagnostic plot saved: {out}")# ==================== 4. 反演算法 ====================def fernald_extinction(pr2, r_km, ref_start=1.5, ref_end=2.5, Sa=50, Sm=8/3):"""Fernald后向积分"""dr = r_km[1] - r_km[0]alpha_m = 0.02 * np.exp(-r_km / 8.0) # 分子消光简化valid = (pr2 > 0) & (~np.isnan(pr2))ref_mask = (r_km >= ref_start) & (r_km <= ref_end) & validif np.sum(ref_mask) < 5:ref_mask = (r_km >= 1.0) & (r_km <= 3.0) & validref_idx = np.where(ref_mask)[0]iflen(ref_idx) == 0:raise ValueError("No valid reference layer")ref_i = ref_idx[len(ref_idx)//2]ref_alpha = 0.008 # km^-1, 清洁大气参考值X = pr2.copy()alpha_a = np.zeros_like(r_km)alpha_a[ref_i] = ref_alphafor i in range(ref_i-1, -1, -1):if X[i] <= 0 or X[i+1] <= 0:alpha_a[i] = 0continueterm1 = X[i] * np.exp(2.0 * (Sa/Sm - 1.0) * (alpha_m[i+1] - alpha_m[i]) * dr)term2 = X[i+1] + (Sa - Sm) * (alpha_a[i+1] + alpha_m[i+1]) * X[i] * drterm3 = X[i] + (Sa - Sm) * (alpha_a[i+1] + alpha_m[i+1]) * X[i+1] * dralpha_a[i] = term1 * (alpha_a[i+1] + alpha_m[i+1]) * term2 / term3 - alpha_m[i]if alpha_a[i] < 0:alpha_a[i] = 0for i inrange(ref_i+1, len(r_km)):alpha_a[i] = ref_alpha * np.exp(-(r_km[i] - r_km[ref_i]) / 5.0)return alpha_adef klett_extinction(pr2, r_km, Sa=50):"""Klett斜率法"""log_pr2 = np.log(pr2 + 1e-12)dr = r_km[1] - r_km[0]dlog = np.gradient(log_pr2, dr)alpha = -dlog / 2.0alpha[alpha < 0] = 0return alphadef retrieve_from_raw(lidar, method='fernald', Sa=50):"""从原始数据反演消光和后向散射系数"""r_km = lidar['range_km']signal = lidar['signal']signal_sm = uniform_filter1d(signal, size=5, mode='nearest')r_m = lidar['range_m'][:len(signal_sm)]pr2 = signal_sm * (r_m**2)pr2[pr2 <= 0] = np.nanif method == 'fernald':alpha = fernald_extinction(pr2, r_km, Sa=Sa)else:alpha = klett_extinction(pr2, r_km, Sa=Sa)beta = alpha / Sa# 剔除盲区(<150m)和过高噪声alpha[r_km < 0.15] = np.nanbeta[r_km < 0.15] = np.nanreturn alpha, beta, r_km# ==================== 5. 绘图与输出 ====================def plot_retrieved_profile(lidar, alpha, beta, r_km, output_dir, product_ext=None, product_back=None):"""绘制反演廓线,并与已有产品对比"""fig, ax1 = plt.subplots(figsize=(8, 6))# 反演结果ax1.plot(alpha, r_km, 'b-', linewidth=1.5, label='Retrieved extinction')ax1.set_xlabel('Extinction coefficient (km⁻¹)', color='b')ax1.set_ylabel('Height (km)')ax1.tick_params(axis='x', labelcolor='b')ax1.grid(True, linestyle='--', alpha=0.6)ax2 = ax1.twiny()ax2.plot(beta, r_km, 'r-', linewidth=1.5, label='Retrieved backscatter')ax2.set_xlabel('Backscatter coefficient (km⁻¹ sr⁻¹)', color='r')ax2.tick_params(axis='x', labelcolor='r')# 如果有已有产品,叠加对比if product_ext is not None:h_ext = product_ext['range_km']# 插值到相同高度网格f = interp1d(h_ext, product_ext['data'], kind='linear', fill_value=np.nan, bounds_error=False)ext_interp = f(r_km)ax1.plot(ext_interp, r_km, 'b--', linewidth=1, alpha=0.7, label='Product extinction')if product_back is not None:h_back = product_back['range_km']f = interp1d(h_back, product_back['data'], kind='linear', fill_value=np.nan, bounds_error=False)back_interp = f(r_km)ax2.plot(back_interp, r_km, 'r--', linewidth=1, alpha=0.7, label='Product backscatter')# 图例lines1, labels1 = ax1.get_legend_handles_labels()lines2, labels2 = ax2.get_legend_handles_labels()ax1.legend(lines1+lines2, labels1+labels2, loc='upper right')plt.title(f"Retrieved profiles vs Products\n{lidar['time'].strftime('%Y-%m-%d %H:%M:%S')}")plt.tight_layout()out = os.path.join(output_dir, 'retrieved_profile.png')plt.savefig(out, dpi=150)plt.close()print(f"Retrieved profile plot saved: {out}")def save_products(lidar, alpha, beta, r_km, output_dir):"""保存反演产品为文本和NetCDF"""base = os.path.basename(lidar['file'])[:-4] ifhasattr(lidar, 'file') else 'retrieved'# 文本txt_path = os.path.join(output_dir, f"{base}_retrieved.txt")withopen(txt_path, 'w') as f:f.write("# Height(km)\tExtinction(km^-1)\tBackscatter(km^-1 sr^-1)\n")for h, a, b in zip(r_km, alpha, beta):if not np.isnan(a):f.write(f"{h:.3f}\t{a:.6f}\t{b:.6f}\n")print(f"Text product saved: {txt_path}")# NetCDFtry:from netCDF4 import Datasetnc_path = os.path.join(output_dir, f"{base}_retrieved.nc")nc = Dataset(nc_path, 'w', format='NETCDF3_CLASSIC')nc.createDimension('height', len(r_km))h_var = nc.createVariable('height', 'f8', ('height',))h_var.units = 'km'h_var[:] = r_kmext_var = nc.createVariable('extinction', 'f8', ('height',))ext_var.units = 'km^-1'ext_var[:] = alphaback_var = nc.createVariable('backscatter', 'f8', ('height',))back_var.units = 'km^-1 sr^-1'back_var[:] = betanc.close()print(f"NetCDF product saved: {nc_path}")except ImportError:print("netCDF4 not installed, skip NetCDF output")# ==================== 6. 主程序 ====================if __name__ == "__main__":# 数据目录(请修改为实际路径)data_dir = "/"raw_file = os.path.join(data_dir, "ACMxxx000_00000_Lidar_xxxxmmdd.bin")ext_file = os.path.join(data_dir, "ACMxxx_00000_00000_Lidar_MEXT_xxxxmmdd_532.bin")back_file = os.path.join(data_dir, "ACMxxx_00000_00000_Lidar_MBAKSCAT_xxxxmmdd_532.bin")# 1. 解析原始数据lidar = read_lidar_raw(raw_file)if lidar is None:print("Failed to parse raw file")exit()# 添加文件名属性lidar['file'] = raw_fileplot_raw_data(lidar, data_dir)print("Retrieving extinction and backscatter...")alpha, beta, r_km = retrieve_from_raw(lidar, method='fernald', Sa=50)save_products(lidar, alpha, beta, r_km, data_dir)prod_ext = read_lidar_product(ext_file) if os.path.exists(ext_file) else Noneprod_back = read_lidar_product(back_file) if os.path.exists(back_file) else Noneplot_retrieved_profile(lidar, alpha, beta, r_km, data_dir, prod_ext, prod_back)


觉得有用?点个“在看”,欢迎讨论,转发给需要的气象同行吧。
PyCINRAD 教程:解锁新一代天气雷达的 Python 处理秘笈