#!/usr/bin/env python3
"""Download CAMS global data over FTP and merge the wanted variables into one GRIB file.

Usage: script.py [YYYYMMDDHH ...]
With no arguments, today's 00 and 12 batches are fetched.
"""
import glob
import logging
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime, timedelta

FTP_HOST = "***"
FTP_USER = "***"
FTP_PASS = "***"
REMOTE_DIR = "/DATA/CAMS_GLOBAL"
LOCAL_DIR = "/download"
TMP_DIR = os.path.join(LOCAL_DIR, "tmp")
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_DIR = os.path.join(SCRIPT_DIR, "logs")
MAX_RETRY = 3
RETRY_DELAY = 15              # seconds to wait between failed attempts
LFTP_TIMEOUT = 7200           # seconds allowed for one lftp session
MIN_VALID_SIZE = 50_000_000   # an existing merged file larger than this is not re-downloaded
COPY_CHUNK = 1 << 20          # 1 MiB buffer used while concatenating GRIB parts

WANTED = [
    "aod550", "ssaod550", "duaod550", "bcaod550", "suaod550", "omaod550",
    "pm2p5", "pm10",
    "2t", "2d", "10u", "10v", "msl", "sp", "tcwv", "vis",
    "go3", "co", "so2", "no2", "hcho",
]
# Filename fragments that identify a wanted variable (e.g. "_aod550."), built once.
_WANTED_TAGS = tuple(f"_{w}." for w in WANTED)


def keep_file(filename):
    """Return True if *filename* is a ``.grib`` file for one of the WANTED variables.

    A file matches when it contains ``_<var>.`` for some entry in WANTED.
    """
    return filename.endswith(".grib") and any(tag in filename for tag in _WANTED_TAGS)


def get_logger():
    """Return the "cams" logger, writing to stderr and to a monthly file in LOG_DIR."""
    os.makedirs(LOG_DIR, exist_ok=True)
    log = logging.getLogger("cams")
    log.setLevel(logging.INFO)
    # Close and drop handlers from an earlier call so messages are not duplicated
    # and file handles are not leaked.
    for old in list(log.handlers):
        old.close()
        log.removeHandler(old)
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S"))
    log.addHandler(console)
    log_path = os.path.join(LOG_DIR, f"dl_{datetime.now().strftime('%Y-%m')}.log")
    # Explicit UTF-8: the log messages contain non-ASCII text, and cron jobs often
    # run under a C/POSIX locale whose default encoding cannot represent it.
    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s", "%m-%d %H:%M:%S"))
    log.addHandler(file_handler)
    return log


logger = get_logger()


def is_valid_batch(batch):
    """Return True if *batch* is a ten-digit ``YYYYMMDDHH`` string naming a real date/hour."""
    if not isinstance(batch, str) or len(batch) != 10:
        return False
    if not all("0" <= c <= "9" for c in batch):
        return False
    try:
        datetime.strptime(batch, "%Y%m%d%H")
    except ValueError:
        return False
    return True


def batch_outfile(batch):
    """Return the merged output path for *batch*, e.g. ``cams_2024-01-01_00.grib``."""
    return os.path.join(
        LOCAL_DIR, f"cams_{batch[:4]}-{batch[4:6]}-{batch[6:8]}_{batch[8:10]}.grib"
    )


def _lftp_script(batch, tmp_dir):
    """Build the lftp command script that fetches every ``*.grib`` of *batch* into *tmp_dir*."""
    return "\n".join([
        "set ftp:ssl-allow no",
        # Stop at the first failing command, so a failed cd/lcd cannot let
        # mget run in the wrong directory.
        "set cmd:fail-exit yes",
        f"open {FTP_HOST}",
        f"user {FTP_USER} {FTP_PASS}",
        f"lcd {tmp_dir}",
        f"cd {REMOTE_DIR}/{batch}",
        "mget *.grib",
        "bye",
    ]) + "\n"


def merge_grib_files(parts, outfile):
    """Concatenate the files in *parts*, in order, into *outfile* atomically.

    GRIB messages are self-delimiting, so plain byte concatenation produces a
    valid multi-message file. Data is streamed in chunks into ``outfile + ".part"``,
    which is renamed over *outfile* only after every part was copied. An
    interrupted merge therefore never leaves a truncated file that a later run
    could mistake for a complete one.
    """
    tmp_out = outfile + ".part"
    try:
        with open(tmp_out, "wb") as out:
            for part in parts:
                with open(part, "rb") as inp:
                    shutil.copyfileobj(inp, out, COPY_CHUNK)
        os.replace(tmp_out, outfile)
    except BaseException:
        if os.path.exists(tmp_out):
            os.remove(tmp_out)
        raise


def _fetch_and_merge(batch, tmp_dir, outfile):
    """Run one download attempt for *batch*. Return True if *outfile* was (re)built."""
    # Start every attempt from an empty staging directory. lftp's mget will not
    # overwrite existing local files (xfer:clobber is off), so leftovers from a
    # previous partial attempt would make the retry fail.
    shutil.rmtree(tmp_dir, ignore_errors=True)
    os.makedirs(tmp_dir, exist_ok=True)
    try:
        r = subprocess.run(
            ["lftp"], input=_lftp_script(batch, tmp_dir),
            capture_output=True, text=True, timeout=LFTP_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        logger.warning(f"[TIMEOUT] {batch}")
        return False

    all_files = glob.glob(os.path.join(tmp_dir, "*.grib"))
    if r.returncode != 0 or not all_files:
        tail = (r.stderr or "").strip().splitlines()[-1:]
        logger.warning(
            f"[LFTP] {batch} rc={r.returncode} files={len(all_files)} {tail[0] if tail else ''}"
        )
        return False

    keep = sorted(f for f in all_files if keep_file(os.path.basename(f)))
    logger.info(f"[FILTER] keep={len(keep)} del={len(all_files) - len(keep)}")
    if not keep:
        # Writing an empty output and reporting success would hide the problem.
        logger.error(f"[EMPTY] {batch}: no wanted variables among {len(all_files)} files")
        return False

    # Unwanted files are not deleted one by one: the whole staging dir is removed later.
    merge_grib_files(keep, outfile)
    return True


def download_one(batch):
    """Download one CAMS batch and merge its wanted variables into a single GRIB file.

    Args:
        batch: Batch id ``YYYYMMDDHH``; also the remote sub-directory name.

    Returns:
        True if the merged file is in place afterwards: either freshly built, or
        already present and larger than MIN_VALID_SIZE. False if the batch id is
        malformed or all MAX_RETRY attempts failed.
    """
    if not is_valid_batch(batch):
        logger.error(f"[BAD] {batch!r}: expected YYYYMMDDHH")
        return False

    outfile = batch_outfile(batch)
    name = os.path.basename(outfile)
    if os.path.exists(outfile) and os.path.getsize(outfile) > MIN_VALID_SIZE:
        logger.info(f"[SKIP] {name}")
        return True

    tmp_dir = os.path.join(TMP_DIR, batch)
    t0 = time.time()
    logger.info(f"[START] {batch}")
    try:
        for attempt in range(1, MAX_RETRY + 1):
            try:
                if _fetch_and_merge(batch, tmp_dir, outfile):
                    elapsed = int(time.time() - t0)
                    size_mb = os.path.getsize(outfile) / 1_048_576
                    logger.info(f"[OK] {name} {size_mb:.0f}MB {elapsed}s")
                    return True
            except Exception as e:
                # Unexpected failure (lftp missing, disk full, ...): log it and retry.
                logger.error(f"[ERR] {batch}: {e}")
            # Every kind of failure waits the same delay before the next attempt.
            if attempt < MAX_RETRY:
                logger.warning(f"[RETRY] {batch} {attempt}/{MAX_RETRY}")
                time.sleep(RETRY_DELAY)
    finally:
        # Remove the staging directory on every exit path, including Ctrl-C.
        shutil.rmtree(tmp_dir, ignore_errors=True)
    logger.error(f"[FAIL] {batch}")
    return False


def main(argv=None):
    """Process the batches in *argv* (default: today's 00 and 12). Return the exit status."""
    args = sys.argv[1:] if argv is None else list(argv)
    batches = args or [datetime.now().strftime("%Y%m%d") + h for h in ("00", "12")]
    os.makedirs(LOCAL_DIR, exist_ok=True)
    logger.info(f"开始 {len(batches)} 批次")
    ok = fail = 0
    for b in sorted(batches):
        if download_one(b):
            ok += 1
        else:
            fail += 1
    logger.info(f"完成 OK={ok} FAIL={fail}")
    # Non-zero exit status lets cron/monitoring notice failed batches.
    return 1 if fail else 0


if __name__ == "__main__":
    sys.exit(main())