import os, glob, numpy as np, xarray as xrimport metpy.calc as mpcalcfrom metpy.units import unitsfrom tqdm import tqdmfrom scipy.integrate import trapezoid as trapz# 数据路径DIR_FLUX = '/data/moisture_flux_daily-ncep1/'DIR_PRES = '/ncep/surface/'DIR_OUT = '/data/moisture_flux_daily-ncep1/'os.makedirs(DIR_OUT, exist_ok=True)# 读取并合并数据files = sorted(glob.glob(f'{DIR_FLUX}moisture_flux_daily_*.nc'))ds = xr.open_mfdataset(files, combine='nested', concat_dim='time')ds = ds.sel(time=slice('1979-01-01', '2022-12-31'))# 读入对应年份的地表气压,统一坐标pres_files = sorted(glob.glob(f'{DIR_PRES}pres.sfc.*.nc'))ps_ds = xr.open_mfdataset(pres_files, combine='nested', concat_dim='time')ps_sfc = ps_ds['pres'] / 100.0 # Pa -> hPaps_sfc = ps_sfc.rename({'lat': 'lat', 'lon': 'lon', 'time': 'time'})ps_sfc = ps_sfc.sel(time=slice('1979-01-01', '2022-12-31'))# 垂直积分:地表气压→300 hPalevel = ds['level'] # 单位已是 hPaplev = level.values.astype(float) # 1-D hPa# 积分函数:每格点按 ps→300 hPa 积分def integrate_surface_to_300(q, ps_arr): """ q : (nlev, nlat, nlon) 某变量(qx 或 qy) ps_arr : (nlat, nlon) 地表气压 [hPa] return : (nlat, nlon) 垂直积分结果 """ nlev, nlat, nlon = q.shape plev_Pa = plev * 100.0 # hPa → Pa # 三维网格 plev_3d = np.broadcast_to(plev_Pa[:, None, None], (nlev, nlat, nlon)) ps_3d = np.broadcast_to(ps_arr[None, :, :], (nlev, nlat, nlon)) # 有效层掩膜,仅保留 300–ps 之间的层 mask = (plev_3d >= 30000.0) & (plev_3d <= ps_3d * 100.0) integral = np.full((nlat, nlon), np.nan, dtype='float32') for j in range(nlat): for k in range(nlon): m = mask[:, j, k] # 当前廓线的有效层 if m.sum() < 2: # 至少要有 2 层才能梯形积分 continue y = q[:, j, k][m] # 有效值 x = plev_3d[:, j, k][m] # 对应气压 integral[j, k] = trapz(y, x) return integral# 预分配结果数组Qx_int_np = np.zeros((ds.sizes['time'], ds.sizes['lat'], ds.sizes['lon']), dtype='float32')Qy_int_np = np.zeros_like(Qx_int_np)# 逐日积分for i, t in enumerate(tqdm(ds['time'], desc='垂直积分')): ps_t = ps_sfc.sel(time=t).values # (lat, lon) qx_t = ds['qx'].sel(time=t).values # (level, lat, lon) qy_t = ds['qy'].sel(time=t).values Qx_int_np[i] = integrate_surface_to_300(qx_t, ps_t) Qy_int_np[i] = integrate_surface_to_300(qy_t, ps_t)# 重新封装为 xarray.DataArrayQx_int = xr.DataArray( Qx_int_np, coords={'time': ds['time'], 'lat': ds['lat'], 'lon': ds['lon']}, dims=['time', 'lat', 'lon'], attrs={'units': 'kg/(m·s)', 'long_name': 'vertically integrated zonal moisture flux (surface to 300 hPa)'})Qy_int = xr.DataArray( Qy_int_np, coords={'time': ds['time'], 'lat': ds['lat'], 'lon': ds['lon']}, dims=['time', 'lat', 'lon'], attrs={'units': 'kg/(m·s)', 'long_name': 'vertically integrated meridional moisture flux (surface to 300 hPa)'})# 计算整层水汽通量散度dx, dy = mpcalc.lat_lon_grid_deltas(ds['lon'], ds['lat'])div_list = []for t in tqdm(ds['time'], desc='计算散度'): qx_t = Qx_int.sel(time=t) qy_t = Qy_int.sel(time=t) div_t = mpcalc.divergence( u=qx_t * units('kg/m/s'), v=qy_t * units('kg/m/s'), dx=dx, dy=dy ) div_list.append(div_t)div_da = xr.DataArray( np.array(div_list), coords={'time': ds['time'], 'lat': ds['lat'], 'lon': ds['lon']}, dims=['time', 'lat', 'lon'], attrs={'units': 'kg/(m²·s)', 'long_name': 'vertically integrated moisture flux divergence (surface to 300 hPa)'})# 保存计算结果为一个nc文件out_ds = xr.Dataset({ 'Qx_int': Qx_int, 'Qy_int': Qy_int, 'div_int': div_da})outfile = f'{DIR_OUT}integrated_flux_divergence_sfc-300_1979_2022.nc'out_ds.to_netcdf(outfile)print(f'已保存:{outfile}')