时间序列预测是数据分析最有价值的方向之一,广泛应用于销量预测、股价分析、异常检测等场景。本文从时序数据的基础分析到 ARIMA、Prophet、LSTM 等主流预测方法,带你系统掌握时间序列全链路。
一、时间序列基础
1.1 核心概念
时间序列由四个成分叠加而成:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose, STL

# Synthetic monthly-sales series: linear growth + a 12-month sine cycle + noise.
np.random.seed(42)
n = 120  # 10 years of monthly observations
dates = pd.date_range("2014-01", periods=n, freq="ME")
components = (
    np.linspace(1000, 5000, n)                     # trend
    + 500 * np.sin(2 * np.pi * np.arange(n) / 12)  # yearly seasonality
    + np.random.normal(0, 150, n)                  # noise
)
ts = pd.Series(components, index=dates, name="sales")

# Classical additive decomposition
decomposition = seasonal_decompose(ts, model="additive", period=12)
decomposition.plot()
plt.suptitle("时间序列加法分解", fontsize=14)
plt.tight_layout()
plt.show()

# STL decomposition (more robust to outliers; the recommended choice)
stl_result = STL(ts, period=12, robust=True).fit()
stl_result.plot()
plt.show()
1.2 平稳性检验(建模前必做)
from statsmodels.tsa.stattools import adfuller, kpss

def test_stationarity(series, name=""):
    """Double-check stationarity with the ADF and KPSS tests.

    The two tests have opposite null hypotheses, so agreement between
    them gives a much stronger conclusion than either test alone:
      - ADF:  H0 = unit root (non-stationary); small p rejects non-stationarity.
      - KPSS: H0 = stationary; small p rejects stationarity.

    Args:
        series: pandas Series to test (NaNs are dropped first).
        name: label printed in the report.
    """
    clean = series.dropna()
    # ADF test: H0 = unit root (non-stationary)
    adf_stat, adf_p = adfuller(clean, autolag="AIC")[:2]
    # KPSS test: H0 = stationary
    kpss_stat, kpss_p = kpss(clean, regression="c")[:2]
    print(f"{'='*50}")
    print(f"序列: {name}")
    print(f"ADF 检验:统计量={adf_stat:.4f},p值={adf_p:.4f}")
    print(f"KPSS 检验:统计量={kpss_stat:.4f},p值={kpss_p:.4f}")
    # Note: the source had "0.05and" fused into one token (syntax error).
    if adf_p < 0.05 and kpss_p > 0.05:
        print("✅ 结论:序列平稳")
    elif adf_p >= 0.05 and kpss_p <= 0.05:
        print("❌ 结论:序列非平稳,需要差分")
    else:
        print("⚠️ 结论:两个检验结论矛盾,需进一步分析")

test_stationarity(ts, "原始序列")
# First differencing usually removes a linear trend; re-test afterwards.
ts_diff = ts.diff().dropna()
test_stationarity(ts_diff, "一阶差分")
1.3 自相关与偏自相关分析
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Compare correlograms before and after differencing, one row per series.
fig, axes = plt.subplots(2, 2, figsize=(14, 8))
for row, (series, label) in enumerate([(ts, "原始序列"), (ts_diff, "一阶差分")]):
    plot_acf(series, lags=40, ax=axes[row, 0], title=f"{label} ACF")
    plot_pacf(series, lags=40, ax=axes[row, 1], title=f"{label} PACF")
plt.tight_layout()
plt.show()

# Reading guide:
#   PACF cuts off after lag p  -> AR(p)
#   ACF  cuts off after lag q  -> MA(q)
#   both tail off gradually    -> ARMA(p, q)
二、经典统计模型
2.1 ARIMA 模型(自动参数选择)
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings
warnings.filterwarnings("ignore")

# Chronological split -- a time series must never be shuffled before splitting.
train, test = ts[:-12], ts[-12:]  # hold out the final 12 months

# Hand-picked ARIMA(p, d, q)
result = ARIMA(train, order=(2, 1, 2)).fit()
print(result.summary())

# 12-step-ahead forecast with a 95% prediction interval
forecast = result.get_forecast(steps=12)
pred_mean = forecast.predicted_mean
conf_int = forecast.conf_int(alpha=0.05)

plt.figure(figsize=(12, 5))
plt.plot(train, label="训练集")
plt.plot(test, label="测试集", color="green")
plt.plot(pred_mean, label="预测值", color="red", linestyle="--")
plt.fill_between(conf_int.index,
                 conf_int.iloc[:, 0], conf_int.iloc[:, 1],
                 alpha=0.2, color="red", label="95% 置信区间")
plt.title("ARIMA 预测结果")
plt.legend()
plt.show()
2.2 SARIMA:含季节项的 ARIMA
# 有明显季节性时,必须用 SARIMA
# SARIMA(p,d,q)(P,D,Q)[s]
# s=12 表示年度季节性(月数据)
# With clear seasonality, SARIMA(p,d,q)(P,D,Q)[s] is the right tool;
# s = 12 encodes yearly seasonality for monthly data.
model_sarima = SARIMAX(
    train,
    order=(1, 1, 1),               # non-seasonal (p, d, q)
    seasonal_order=(1, 1, 1, 12),  # seasonal (P, D, Q, s)
    enforce_stationarity=False,
    enforce_invertibility=False,
)
result_sarima = model_sarima.fit(disp=False)
print(result_sarima.summary())
# 自动选参(暴力网格搜索)
from itertools import product

def auto_sarima(ts, max_p=3, max_q=3):
    """Brute-force grid search over SARIMA orders, selected by AIC.

    Searches p in [0, max_p], d in {0, 1}, q in [0, max_q] and seasonal
    P, Q in {0, 1}, with seasonal D fixed at 1 and period s = 12.

    Args:
        ts: series (or array) to fit.
        max_p: largest non-seasonal AR order tried.
        max_q: largest non-seasonal MA order tried.

    Returns:
        The best (p, d, q, P, Q) tuple, or None if every candidate failed.
    """
    best_aic = np.inf
    best_params = None
    for p, d, q, P, Q in product(range(max_p + 1), [0, 1], range(max_q + 1),
                                 range(2), range(2)):
        # except Exception (not a bare except:) so Ctrl-C still interrupts.
        try:
            model = SARIMAX(ts, order=(p, d, q), seasonal_order=(P, 1, Q, 12))
            res = model.fit(disp=False)
        except Exception:
            continue  # this combination failed to converge/fit; skip it
        if res.aic < best_aic:
            best_aic = res.aic
            best_params = (p, d, q, P, Q)
    # Original crashed on best_params[:3] when every fit failed.
    if best_params is None:
        print("所有参数组合拟合失败")
        return None
    print(f"最优参数: SARIMA{best_params[:3]}{best_params[3:]}[12], AIC={best_aic:.2f}")
    return best_params
2.3 指数平滑(Holt-Winters)
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# Holt-Winters triple exponential smoothing: level + additive trend +
# additive seasonality. Damping keeps long-horizon forecasts from
# over-extrapolating the trend.
hw_model = ExponentialSmoothing(
    train,
    trend="add",
    seasonal="add",
    seasonal_periods=12,
    damped_trend=True,
)
hw_result = hw_model.fit(optimized=True)  # let statsmodels tune the smoothing params
hw_forecast = hw_result.forecast(12)
hw_mae = np.mean(np.abs(hw_forecast.values - test.values))
print(f"Holt-Winters MAE: {hw_mae:.2f}")
三、Prophet:Facebook 开源的工业级预测工具
Prophet 特别适合:有明显趋势+季节性、存在节假日效应、允许缺失值的业务场景。
3.1 Prophet 基础用法
# pip install prophet
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
from prophet.plot import plot_cross_validation_metric

# Prophet expects a DataFrame with columns ds (timestamp) and y (target).
df_prophet = pd.DataFrame({"ds": ts.index, "y": ts.values})

# Holiday effects (Singles' Day) with a 2-day pre-window and 1-day post-window.
# NOTE(review): dates stop at 2019 -- extend the list to cover later years.
holidays = pd.DataFrame({
    "holiday": "double_eleven",
    "ds": pd.to_datetime(["2014-11-11","2015-11-11","2016-11-11",
                          "2017-11-11","2018-11-11","2019-11-11"]),
    "lower_window": -2,
    "upper_window": 1,
})

# BUG FIX: the original built a fully configured Prophet, then threw that
# configuration away by re-creating `m = Prophet(holidays=holidays)` with all
# defaults. Configure everything -- holidays included -- in one constructor.
m = Prophet(
    yearly_seasonality=True,       # yearly cycle (monthly data)
    weekly_seasonality=False,      # monthly data has no weekly pattern
    daily_seasonality=False,
    seasonality_mode="additive",   # use "multiplicative" when amplitude grows with level
    changepoint_prior_scale=0.05,  # trend changepoint flexibility (bigger = more flexible)
    seasonality_prior_scale=10.0,  # seasonality strength
    interval_width=0.95,           # width of the uncertainty interval
    holidays=holidays,
)
m.fit(df_prophet[df_prophet["ds"] < "2023-01-01"])  # hold out 2023 for evaluation

# Forecast 12 months beyond the training data
future = m.make_future_dataframe(periods=12, freq="ME")
forecast = m.predict(future)

fig1 = m.plot(forecast)
fig2 = m.plot_components(forecast)
plt.show()
3.2 Prophet 交叉验证评估
# 时序交叉验证
# Rolling-origin cross-validation on the fitted Prophet model.
df_cv = cross_validation(
    m,
    initial="730 days",    # size of the first training window
    period="180 days",     # spacing between successive cutoffs
    horizon="365 days",    # how far each cutoff forecasts ahead
    parallel="processes",
)

# Aggregate error metrics per horizon
df_perf = performance_metrics(df_cv)
print(df_perf[["horizon", "mae", "rmse", "mape"]].tail(10))

# MAPE as a function of forecast horizon
plot_cross_validation_metric(df_cv, metric="mape")
plt.show()
四、深度学习方法
4.1 LSTM 时序预测
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# 数据预处理:创建滑动窗口样本
def create_sequences(data, lookback=24, horizon=12):
    """Turn a 1-D series into supervised sliding-window samples.

    Args:
        data: 1-D array-like of observations.
        lookback: number of past steps used as model input.
        horizon: number of future steps to predict.

    Returns:
        (X, y) numpy arrays with shapes (n, lookback) and (n, horizon),
        where n = len(data) - lookback - horizon + 1.
    """
    X, y = [], []
    for i in range(len(data) - lookback - horizon + 1):
        X.append(data[i : i + lookback])
        y.append(data[i + lookback : i + lookback + horizon])
    return np.array(X), np.array(y)
# Scale into [0, 1] before feeding the LSTM (helps optimization).
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
ts_scaled = scaler.fit_transform(ts.values.reshape(-1, 1)).flatten()
X, y = create_sequences(ts_scaled, lookback=24, horizon=12)

# Chronological 80/20 split, then tensors shaped (batch, seq, features).
split = int(len(X) * 0.8)
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]
X_train_t = torch.FloatTensor(X_train).unsqueeze(-1)
y_train_t = torch.FloatTensor(y_train)
X_test_t = torch.FloatTensor(X_test).unsqueeze(-1)
y_test_t = torch.FloatTensor(y_test)
# LSTM 模型
class LSTMForecaster(nn.Module):
    """Stacked LSTM mapping a lookback window to a multi-step forecast.

    Input shape:  (batch, seq_len, input_size)
    Output shape: (batch, output_size)
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2,
                 dropout=0.2, output_size=12):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,    # applied between the stacked LSTM layers
            batch_first=True,
        )
        # Small MLP head on top of the final hidden state
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Linear(32, output_size),
        )

    def forward(self, x):
        out, (h_n, c_n) = self.lstm(x)
        # Only the last time step's hidden state feeds the forecast head.
        return self.fc(out[:, -1, :])
model = LSTMForecaster()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5)

# Full-batch training loop: clip gradients, track both losses, and let the
# scheduler cut the learning rate when validation loss plateaus.
train_losses, val_losses = [], []
for epoch in range(200):
    model.train()
    optimizer.zero_grad()
    pred = model(X_train_t)
    loss = criterion(pred, y_train_t)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # guard against exploding gradients
    optimizer.step()

    model.eval()
    with torch.no_grad():
        val_pred = model(X_test_t)
        val_loss = criterion(val_pred, y_test_t)
    scheduler.step(val_loss)

    train_losses.append(loss.item())
    val_losses.append(val_loss.item())
    if (epoch + 1) % 50 == 0:
        print(f"Epoch {epoch+1:3d} | Train Loss: {loss:.6f} | Val Loss: {val_loss:.6f}")

# Map scaled predictions back to the original sales units.
# (The scaler was fitted on one feature; inverse_transform broadcasts that
# single scale across the 12 horizon columns here.)
model.eval()
with torch.no_grad():
    test_pred_scaled = model(X_test_t).numpy()
test_pred = scaler.inverse_transform(test_pred_scaled)
y_true = scaler.inverse_transform(y_test)
4.2 N-BEATS / Transformer 方案(现代方法)
# Nixtla 的 neuralforecast 库包含最新时序深度学习模型
# pip install neuralforecast
# Nixtla's neuralforecast library ships recent deep-learning forecasters.
# pip install neuralforecast
from neuralforecast import NeuralForecast
from neuralforecast.models import NBEATS, NHITS, PatchTST
from neuralforecast.losses.pytorch import MAE

# Long-format frame: one row per (series id, timestamp).
df_nf = pd.DataFrame({
    "unique_id": "series_1",
    "ds": ts.index,
    "y": ts.values
})

models = [
    # N-BEATS: interpretable basis-expansion deep model
    NBEATS(
        h=12,               # forecast horizon
        input_size=36,      # lookback window
        max_steps=500,
        loss=MAE(),
        scaler_type="standard"
    ),
    # N-HiTS: multi-rate interpolation, suited to long series
    NHITS(h=12, input_size=36, max_steps=500, loss=MAE()),
]
nf = NeuralForecast(models=models, freq="ME")
nf.fit(df_nf)
forecast = nf.predict()
print(forecast.tail())
五、模型评估与选择
5.1 评估指标
from sklearn.metrics import mean_absolute_error, mean_squared_error
def evaluate_forecast(y_true, y_pred, model_name=""):
    """Print and return standard point-forecast error metrics.

    Computed with plain numpy (numerically identical to sklearn's
    mean_absolute_error / sqrt(mean_squared_error) for 1-D inputs).

    Args:
        y_true: array-like of actual values.
        y_pred: array-like of predictions, same shape as y_true.
        model_name: label used in the printed report.

    Returns:
        dict with keys "mae", "rmse", "mape", "smape".
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    err = y_true - y_pred
    mae = np.mean(np.abs(err))
    rmse = np.sqrt(np.mean(err ** 2))
    # 1e-8 guards avoid division by zero on (near-)zero actuals.
    mape = np.mean(np.abs(err / (y_true + 1e-8))) * 100
    smape = np.mean(2 * np.abs(err) /
                    (np.abs(y_true) + np.abs(y_pred) + 1e-8)) * 100
    print(f"\n{'='*40}")
    print(f"模型:{model_name}")
    print(f"MAE : {mae:.4f} (绝对误差均值)")
    print(f"RMSE : {rmse:.4f} (对大误差更敏感)")
    print(f"MAPE : {mape:.2f}% (相对误差,受极小值影响)")
    print(f"SMAPE: {smape:.2f}% (对称相对误差,更稳健)")
    return {"mae": mae, "rmse": rmse, "mape": mape, "smape": smape}
5.2 方法对比
| 模型 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| ARIMA | 平稳或差分后平稳的序列 | 理论成熟、可解释 | 需调参,不擅长复杂季节性 |
| SARIMA | 有明显季节性的序列 | 显式建模季节项 | 参数多、拟合较慢 |
| Holt-Winters | 趋势+季节性的中短期预测 | 简单快速 | 无法利用外部特征 |
| Prophet | 业务数据、含节假日效应 | 开箱即用、对缺失值鲁棒 | 模型形式固定、灵活性有限 |
| LightGBM | 多特征的表格化时序 | 精度高、特征工程灵活 | 需手工构造滞后特征 |
| LSTM | 数据量大的非线性序列 | 建模能力强 | 需要大量数据与调参 |
| N-BEATS | 纯时序的深度学习方案 | 精度强、结构可解释 | 训练成本高 |
六、实战建议
# 时序预测的通用 Baseline 策略(先做 Baseline,再尝试复杂模型)
class TimeSeriesBaseline:
    """Three simple baselines; any complex model must beat them."""

    def naive_forecast(self, train, h):
        """Repeat the last observed value for all h future steps."""
        return np.full(h, train.iloc[-1])

    def seasonal_naive(self, train, h, period=12):
        """Repeat the most recent full seasonal cycle (e.g. last year)."""
        last_season = train.values[-period:]
        # Tile enough copies to cover h, then truncate.
        return np.tile(last_season, h // period + 1)[:h]

    def moving_average(self, train, h, window=12):
        """Forecast every step as the mean of the trailing window."""
        return np.full(h, train.iloc[-window:].mean())
baseline = TimeSeriesBaseline()
naive_pred = baseline.naive_forecast(train, h=12)
season_pred = baseline.seasonal_naive(train, h=12)
ma_pred = baseline.moving_average(train, h=12)

# Score each baseline against the held-out year.
for pred, label in [(naive_pred, "朴素预测"),
                    (season_pred, "季节性朴素"),
                    (ma_pred, "移动平均")]:
    evaluate_forecast(test.values, pred, label)
时序预测没有万能模型,实际项目中建议:先 EDA → 做分解 → 建 Baseline → 试统计模型 → 试 ML/DL 模型 → 集成融合。最终指标必须超过 Baseline,否则简单模型就够用了。
欢迎点赞收藏,如有问题欢迎在评论区交流!