简介
基于FFT计算时间序列功率谱,将其封装成函数直接调用,包括简单的绘图代码。通过与NCL现成的功率谱计算函数对比,发现结果还是很靠谱的。这里直接将代码分享给大家!
1. 完整代码
同目录存放该文件即可直接使用:
power_spectrum_functions.py:函数封装(计算 + 绘图)
"""Power spectrum analysis utilities for 1D time series.Main APIs:- compute_power_spectrum(series, ...)- plot_power_spectrum(result, ...)"""from __future__ import annotationsfrom pathlib import Pathfrom typing import Anyimport matplotlib.pyplot as pltimport numpy as npfrom scipy import signalfrom scipy.stats import chi2def _to_1d_array(series: Any) -> np.ndarray: arr = np.asarray(series, dtype=np.float64).squeeze()if arr.ndim != 1: raise ValueError(f"`series` must be 1D, got shape={arr.shape}")if arr.size < 16: raise ValueError("`series` is too short. Need at least 16 points.")return arrdef _fill_nan_linear(y: np.ndarray) -> np.ndarray:if not np.isnan(y).any():return y idx = np.arange(y.size) valid = ~np.isnan(y)if valid.sum() < 3: raise ValueError("Too many NaN values. Need at least 3 valid points.") out = y.copy() out[~valid] = np.interp(idx[~valid], idx[valid], y[valid])return outdef _lag1_autocorr(y: np.ndarray) -> float: y0 = y[:-1] - y[:-1].mean() y1 = y[1:] - y[1:].mean() denom = np.sqrt(np.sum(y0**2) * np.sum(y1**2))if denom == 0:return 0.0 r1 = float(np.sum(y0 * y1) / denom)returnfloat(np.clip(r1, -0.99, 0.99))def _significant_ranges(periods: np.ndarray, sig_mask: np.ndarray) -> list[tuple[float, float]]:if periods.size == 0:return [] idx = np.argsort(periods) p_sorted = periods[idx] s_sorted = sig_mask[idx] ranges: list[tuple[float, float]] = [] start = None end = Nonefor p, is_sig in zip(p_sorted, s_sorted):if is_sig:if start is None: start = float(p) end = float(p)elif start is not None: ranges.append((start, end)) start = None end = Noneif start is not None: ranges.append((start, end))return rangesdef compute_power_spectrum( series: Any, dt: float = 1.0, alpha: float = 0.95, nfft: int | None = None, detrend: bool = True, fill_nan: bool = True, min_period: float | None = None, max_period: float | None = None,) -> dict[str, Any]:""" Compute raw periodogram and AR(1)-based significance for a 1D time series. Parameters ---------- series : array-like 1D time series. dt : float Sampling interval in days. Daily data usually uses dt=1. alpha : float Significance level, e.g. 0.95. nfft : int or None FFT points. None means full length n. detrend : bool If True, remove linear trend. fill_nan : bool If True, linearly interpolate NaNs. min_period, max_period : float or None Optional period filter (days). Returns ------- dict Spectrum arrays and summary metrics. """if dt <= 0: raise ValueError("`dt` must be positive.")if not (0.0 < alpha < 1.0): raise ValueError("`alpha` must be between 0 and 1.") y = _to_1d_array(series)if fill_nan: y = _fill_nan_linear(y)elif np.isnan(y).any(): raise ValueError("Input contains NaN. Set `fill_nan=True` to interpolate.") n = y.sizeif detrend: y = signal.detrend(y, type="linear") y = y - y.mean() nfft_use = n if nfft is None else max(int(nfft), n) fs = 1.0 / dt freqs, psd = signal.periodogram( y, fs=fs, window="boxcar", nfft=nfft_use, detrend=False, scaling="density", return_onesided=True, ) min_freq = 1.0 / (n * dt) keep = freqs >= min_freq freqs = freqs[keep] psd = psd[keep] periods = 1.0 / freqsif min_period is not None: keep = periods >= float(min_period) freqs, periods, psd = freqs[keep], periods[keep], psd[keep]if max_period is not None: keep = periods <= float(max_period) freqs, periods, psd = freqs[keep], periods[keep], psd[keep]if freqs.size == 0: raise ValueError("No frequency bins remain after period filtering.") r1 = _lag1_autocorr(y) cyc_per_sample = freqs * dt red_noise = (1.0 - r1**2) / ( 1.0 + r1**2 - 2.0 * r1 * np.cos(2.0 * np.pi * cyc_per_sample) ) red_noise = np.maximum(red_noise, 1e-12) area_psd = np.trapezoid(psd, freqs) area_red = np.trapezoid(red_noise, freqs)if area_red > 0: red_noise = red_noise * (area_psd / area_red) dof = 2 signif = red_noise * (chi2.ppf(alpha, dof) / dof) is_significant = psd > signif ranges = _significant_ranges(periods, is_significant) overall = None if not ranges else (min(a for a, _ in ranges), max(b for _, b in ranges)) peak_idx = int(np.argmax(psd))if np.any(is_significant): sig_indices = np.where(is_significant)[0] sig_peak_idx = int(sig_indices[np.argmax(psd[sig_indices])]) significant_peak_period = float(periods[sig_peak_idx]) significant_peak_power = float(psd[sig_peak_idx])else: significant_peak_period = None significant_peak_power = Nonereturn {"freqs_cpd": freqs,"periods_days": periods,"psd": psd,"red_noise": red_noise,"signif": signif,"is_significant": is_significant,"significant_ranges_days": ranges,"overall_significant_range_days": overall,"lag1_autocorr": r1,"dominant_period_days": float(periods[peak_idx]),"dominant_power": float(psd[peak_idx]),"significant_peak_period_days": significant_peak_period,"significant_peak_power": significant_peak_power,"dof": dof,"n": n,"nfft": nfft_use,"dt": float(dt),"alpha": float(alpha), }def plot_power_spectrum( result: dict[str, Any], title: str = "Power Spectrum", ylim: tuple[float, float] | None = (0, 20000), xlim: tuple[float, float] | None = None, xscale: str = "log", annotate_peak: bool = True, ax: plt.Axes | None = None, save_path: str | Path | None = None, dpi: int = 150,) -> tuple[plt.Figure, plt.Axes]: periods = np.asarray(result["periods_days"], dtype=float) psd = np.asarray(result["psd"], dtype=float) red = np.asarray(result["red_noise"], dtype=float) signif = np.asarray(result["signif"], dtype=float) idx = np.argsort(periods) px = periods[idx] py = psd[idx] pr = red[idx] ps = signif[idx]if ax is None: fig, ax = plt.subplots(figsize=(10, 6), dpi=dpi)else: fig = ax.figure ax.plot(px, py, lw=1.8, color = 'k',label="Spectrum") ax.plot(px, pr, "--", lw=1.3, label=f"Red noise") ax.plot(px, ps, ":", lw=1.8, label=f"{int(result['alpha'] * 100)}% significance") ax.set_xscale(xscale) ax.set_xlabel("Period (days)") ax.set_ylabel("Power spectral density") ax.set_title(title)if xlim is not None: ax.set_xlim(*xlim)if ylim is not None: ax.set_ylim(*ylim) ax.grid(True, alpha=0.3, linestyle="--") ax.legend() fig.tight_layout()if save_path is not None: fig.savefig(save_path, dpi=dpi)return fig, ax
2. 最简调用
import numpy as npfrom power_spectrum_functions import compute_power_spectrum, plot_power_spectrum# x 可以是任意一维序列x = np.random.randn(180)result = compute_power_spectrum(x, dt=1.0, alpha=0.95)fig, ax = plot_power_spectrum( result, title="Power Spectrum", ylim=(0, 20000),)print("主峰周期:", result["dominant_period_days"])print("95%过检周期范围:", result["significant_ranges_days"])
3. 这个函数做了什么
compute_power_spectrum() 的核心流程:
4. 使用建议
- 日数据建议
dt=1.0,6小时数据可设 dt=0.25
5. 测试结果