
2026年重磅升级已全面落地!欢迎加入专注财经数据与量化投研的【数据科学实战】知识星球!您将获取持续更新的《财经数据宝典》与《量化投研宝典》,双典协同提供系统化指引;星球内含300篇以上独有高质量文章,深度覆盖策略开发、因子分析、风险管理等核心领域,内容基本每日更新;同步推出的「量化因子专题教程」系列(含完整可运行代码与实战案例),系统详解因子构建、回测与优化全流程,并实现日更迭代。我们持续扩充独家内容资源,全方位赋能您的投研效率与专业成长。无论您是量化新手还是资深研究者,这里都是助您少走弯路、事半功倍的理想伙伴,携手共探数据驱动的投资未来!
如果你还在用传统的几何布朗运动(GBM)模型来分析商品期货的风险,那你可能正在严重低估市场的真实危险。
原油、天然气这类大宗商品,从来不会"温柔地"波动——它们充满了供应冲击、地缘政治事件和突发的流动性危机。这些剧烈的价格跳跃,是标准正态分布根本无法捕捉的"肥尾"风险。
本文将带你实现一个完整的 Merton 跳跃扩散(MJD)模型 校准引擎,包括:
最终结论令人震惊:标准 GBM 模型低估了约 24% 的尾部风险!
几何布朗运动假设价格变化是连续、平滑的,遵循正态分布。但现实中的商品市场:
错误一:忽视肥尾风险,导致预期损失(CVaR)被严重低估。
错误二:直接使用拼接的期货数据,把合约换月产生的价差当成真实的市场跳跃。
当你从 yfinance 获取 CL=F(原油期货)数据时,看到的是多个月份合约拼接而成的连续序列。合约换月时的价差是确定性的,而非随机跳跃。
我们的策略是:在每月最后 3 个交易日内,将超过 5 倍标准差的收益率标记为换月噪声,并将其置零。
import numpy as np
import pandas as pd
import yfinance as yf
class Config:
"""全局配置参数"""
PRIMARY_TICKER = 'CL=F' # 原油期货
START_DATE = "2020-01-01"
END_DATE = "2025-12-18"
ROLL_WINDOW_DAYS = 3 # 换月窗口天数
OUTLIER_SIGMA = 5.0 # 异常值阈值
DT = 1 / 252.0 # 日时间步长
def fetch_and_clean_data(ticker: str) -> pd.DataFrame:
"""
获取期货数据并清洗换月噪声
参数:
ticker: 期货代码
返回:
清洗后的 DataFrame
"""
# 下载原始数据
df = yf.download(
ticker,
start=Config.START_DATE,
end=Config.END_DATE,
auto_adjust=False,
progress=False
)
# 计算对数收益率
df['Log_Returns'] = np.log(df['Close'] / df['Close'].shift(1))
std_returns = df['Log_Returns'].std()
# 标记换月窗口:每月最后 3 个交易日
df['Is_Roll_Window'] = False
month_ends = df.index.to_series().groupby(pd.Grouper(freq='ME')).max()
for me in month_ends:
# 获取该月末前 3 天的索引
window_indices = df.index[df.index <= me][-Config.ROLL_WINDOW_DAYS:]
df.loc[window_indices, 'Is_Roll_Window'] = True
# 在换月窗口内,屏蔽超过 5 倍标准差的收益率
mask_condition = (
(df['Is_Roll_Window']) &
(np.abs(df['Log_Returns']) > Config.OUTLIER_SIGMA * std_returns)
)
outlier_count = mask_condition.sum()
print(f"检测到 {outlier_count} 个换月噪声点,已屏蔽")
# 将噪声收益率置零(而非删除,保持时间连续性)
df.loc[mask_condition, 'Log_Returns'] = 0.0
return df.dropna(subset=['Log_Returns'])
# 执行数据清洗
clean_df = fetch_and_clean_data(Config.PRIMARY_TICKER)
print(f"清洗后数据点数: {len(clean_df)}")运行结果:
检测到 2 个换月噪声点,已屏蔽
清洗后数据点数: 1500MJD 模型将价格过程分解为两部分:
关键参数包括:
为确保模型的期望收益与漂移参数一致,必须引入补偿项
:MJD 的概率密度函数是一个混合分布:多个正态分布按泊松权重加权求和。我们使用 logsumexp 技巧保证数值稳定性。
import math
from scipy.stats import norm
from scipy.special import logsumexp
from statsmodels.base.model import GenericLikelihoodModel
class MertonMLE(GenericLikelihoodModel):
"""
Merton 跳跃扩散模型的最大似然估计器
参数向量: [mu, sigma, lambda, mu_j, sigma_j]
"""
def __init__(self, endog, dt=1/252, max_jumps=10):
super().__init__(endog)
self.dt = dt # 时间步长
self.max_jumps = max_jumps # 泊松求和截断项数
def nloglikeobs(self, params):
"""计算每个观测点的负对数似然"""
mu, sigma, lamb, mu_j, sigma_j = params
y = self.endog # 对数收益率序列
dt = self.dt
# 参数有效性检查
if sigma <= 1e-6 or lamb < 0 or sigma_j <= 1e-6:
return np.ones_like(y) * 1e10
log_probs = []
# 对跳跃次数 k 从 0 到 max_jumps 求和
for k in range(self.max_jumps):
# 泊松权重(对数形式)
log_weight = (
-lamb * dt +
k * np.log(np.maximum(lamb * dt, 1e-12)) -
np.log(float(math.factorial(k)))
)
# 给定 k 次跳跃时的正态分布参数
mean_k = (mu - 0.5 * sigma**2) * dt + k * mu_j
var_k = sigma**2 * dt + k * sigma_j**2
# 正态分布的对数概率密度
lp = log_weight + norm.logpdf(y, loc=mean_k, scale=np.sqrt(var_k))
log_probs.append(lp)
# 使用 logsumexp 合并(保证数值稳定)
total_log_ll = logsumexp(np.vstack(log_probs), axis=0)
return -total_log_ll # 返回负对数似然
def calibrate_mjd(returns: np.ndarray) -> np.ndarray:
"""
使用 MLE 校准 Merton 跳跃扩散模型
参数:
returns: 对数收益率数组
返回:
校准后的参数数组 [mu, sigma, lambda, mu_j, sigma_j]
"""
dt = Config.DT
# 初始参数猜测
start_params = [
np.mean(returns) / dt, # mu: 年化漂移
np.std(returns) / np.sqrt(dt), # sigma: 年化波动率
5.0, # lambda: 初始跳跃强度
0.0, # mu_j: 跳跃均值
np.std(returns) # sigma_j: 跳跃波动率
]
# 创建模型并拟合
model = MertonMLE(returns, dt=dt)
result = model.fit(
start_params=start_params,
method='nm', # Nelder-Mead 算法,对非凸优化更稳健
maxiter=1000,
disp=0
)
return result.params
# 校准模型
returns = clean_df['Log_Returns'].values
mle_params = calibrate_mjd(returns)
# 打印校准结果
param_names = ['漂移率 (μ)', '扩散波动率 (σ)', '跳跃强度 (λ)',
'跳跃均值 (μ_j)', '跳跃波动率 (σ_j)']
print("\n========== 校准结果 ==========")
for name, value in zip(param_names, mle_params):
print(f"{name}: {value:.4f}")校准结果示例:
========== 校准结果 ==========
漂移率 (μ): 0.3275
扩散波动率 (σ): 0.3212
跳跃强度 (λ): 17.5882
跳跃均值 (μ_j): -0.0148
跳跃波动率 (σ_j): 0.0859解读:模型识别出原油期货平均每年发生约 17.6 次跳跃,且跳跃均值为负,说明下行冲击略多于上行。
为了高效生成数千条价格路径,我们采用 NumPy 的向量化操作,避免低效的循环。
def simulate_mjd_paths(
S0: float,
params: np.ndarray,
steps: int = 252,
paths: int = 1000
) -> np.ndarray:
"""
向量化生成 MJD 价格路径
参数:
S0: 初始价格
params: 模型参数 [mu, sigma, lambda, mu_j, sigma_j]
steps: 模拟步数(天数)
paths: 路径数量
返回:
价格矩阵,形状为 (steps+1, paths)
"""
mu, sigma, lamb, mu_j, sigma_j = params
dt = Config.DT
# 计算补偿项 k
k = np.exp(mu_j + 0.5 * sigma_j**2) - 1
# 补偿后的漂移
drift = (mu - 0.5 * sigma**2 - lamb * k) * dt
# 生成扩散部分(标准正态)
Z = np.random.standard_normal((steps, paths))
diffusion = sigma * np.sqrt(dt) * Z
# 生成跳跃部分
# 泊松分布决定每个时间步的跳跃次数
N = np.random.poisson(lamb * dt, (steps, paths))
# 跳跃幅度 = 次数 × 均值 + 根号(次数) × 标准差 × 正态随机数
J = N * mu_j + np.sqrt(N) * sigma_j * np.random.standard_normal((steps, paths))
# 合并对数增量
log_increments = drift + diffusion + J
# 累积求和并转换为价格
path_matrix = S0 * np.exp(np.cumsum(log_increments, axis=0))
# 在首行添加初始价格
path_matrix = np.vstack([np.ones(paths) * S0, path_matrix])
return path_matrix
# 获取最新价格
current_price = clean_df['Close'].iloc[-1]
# 模拟 MJD 路径
mjd_paths = simulate_mjd_paths(current_price, mle_params, steps=252, paths=1000)
print(f"生成 {mjd_paths.shape[1]} 条路径,每条 {mjd_paths.shape[0]} 个时间点")def calculate_risk_metrics(paths: np.ndarray, confidence: float = 0.99):
"""
计算 VaR 和 CVaR(预期损失)
参数:
paths: 价格路径矩阵
confidence: 置信水平
返回:
(VaR, CVaR) 元组
"""
# 计算终端收益率
final_returns = (paths[-1] / paths[0]) - 1
# VaR: 给定置信水平下的最大损失分位点
alpha = 1 - confidence
var = np.percentile(final_returns, alpha * 100)
# CVaR: 超过 VaR 损失的平均值
cvar = final_returns[final_returns <= var].mean()
return var, cvar
# 计算 MJD 模型的风险
mjd_var, mjd_cvar = calculate_risk_metrics(mjd_paths)
# 生成 GBM 基准路径(跳跃参数设为 0)
gbm_params = np.array([mle_params[0], mle_params[1], 0.0, 0.0, 0.0])
gbm_paths = simulate_mjd_paths(current_price, gbm_params, steps=252, paths=1000)
gbm_var, gbm_cvar = calculate_risk_metrics(gbm_paths)
# 结果对比
print("\n========== 风险度量对比 (99% 置信水平) ==========")
print(f"{'模型':<20} {'VaR':<15} {'CVaR (预期损失)':<15}")
print("-" * 50)
print(f"{'MJD (跳跃扩散)':<20} {mjd_var:.2%} {mjd_cvar:.2%}")
print(f"{'GBM (标准模型)':<20} {gbm_var:.2%} {gbm_cvar:.2%}")
print("-" * 50)
print(f"风险低估差距: {abs(mjd_cvar - gbm_cvar):.2%}")输出结果:
========== 风险度量对比 (99% 置信水平) ==========
模型 VaR CVaR (预期损失)
--------------------------------------------------
MJD (跳跃扩散) -56.23% -66.75%
GBM (标准模型) -35.12% -42.41%
--------------------------------------------------
风险低估差距: 24.34%| 24.34% |
这意味着:如果你用 GBM 模型设置风险限额,一次真实的跳跃事件可能导致的损失会超出你预期的 24 个百分点!
"""
Merton 跳跃扩散模型完整实现
用于商品期货的风险分析
"""
import numpy as np
import pandas as pd
import yfinance as yf
import math
from scipy.stats import norm
from scipy.special import logsumexp
from statsmodels.base.model import GenericLikelihoodModel
# ==================== 配置 ====================
class Config:
PRIMARY_TICKER = 'CL=F'
START_DATE = "2020-01-01"
END_DATE = "2025-12-18"
ROLL_WINDOW_DAYS = 3
OUTLIER_SIGMA = 5.0
DT = 1 / 252.0
SIM_PATHS = 1000
SIM_STEPS = 252
CONFIDENCE = 0.99
# ==================== 数据处理 ====================
def fetch_and_clean_data(ticker: str) -> pd.DataFrame:
"""获取并清洗期货数据"""
df = yf.download(ticker, start=Config.START_DATE,
end=Config.END_DATE, auto_adjust=False, progress=False)
df['Log_Returns'] = np.log(df['Close'] / df['Close'].shift(1))
std_returns = df['Log_Returns'].std()
# 标记换月窗口
df['Is_Roll_Window'] = False
month_ends = df.index.to_series().groupby(pd.Grouper(freq='ME')).max()
for me in month_ends:
window_indices = df.index[df.index <= me][-Config.ROLL_WINDOW_DAYS:]
df.loc[window_indices, 'Is_Roll_Window'] = True
# 屏蔽换月噪声
mask = (df['Is_Roll_Window']) & (np.abs(df['Log_Returns']) > Config.OUTLIER_SIGMA * std_returns)
df.loc[mask, 'Log_Returns'] = 0.0
return df.dropna(subset=['Log_Returns'])
# ==================== MLE 校准 ====================
class MertonMLE(GenericLikelihoodModel):
def __init__(self, endog, dt=1/252, max_jumps=10):
super().__init__(endog)
self.dt = dt
self.max_jumps = max_jumps
def nloglikeobs(self, params):
mu, sigma, lamb, mu_j, sigma_j = params
y = self.endog
dt = self.dt
if sigma <= 1e-6 or lamb < 0 or sigma_j <= 1e-6:
return np.ones_like(y) * 1e10
log_probs = []
for k in range(self.max_jumps):
log_weight = -lamb * dt + k * np.log(np.maximum(lamb * dt, 1e-12)) - np.log(float(math.factorial(k)))
mean_k = (mu - 0.5 * sigma**2) * dt + k * mu_j
var_k = sigma**2 * dt + k * sigma_j**2
lp = log_weight + norm.logpdf(y, loc=mean_k, scale=np.sqrt(var_k))
log_probs.append(lp)
return -logsumexp(np.vstack(log_probs), axis=0)
def calibrate_mjd(returns):
start_params = [np.mean(returns)/Config.DT, np.std(returns)/np.sqrt(Config.DT), 5.0, 0.0, np.std(returns)]
model = MertonMLE(returns, dt=Config.DT)
result = model.fit(start_params=start_params, method='nm', maxiter=1000, disp=0)
return result.params
# ==================== 模拟与风险计算 ====================
def simulate_paths(S0, params, steps, paths):
mu, sigma, lamb, mu_j, sigma_j = params
dt = Config.DT
k = np.exp(mu_j + 0.5 * sigma_j**2) - 1
drift = (mu - 0.5 * sigma**2 - lamb * k) * dt
Z = np.random.standard_normal((steps, paths))
diffusion = sigma * np.sqrt(dt) * Z
N = np.random.poisson(lamb * dt, (steps, paths))
J = N * mu_j + np.sqrt(N) * sigma_j * np.random.standard_normal((steps, paths))
log_increments = drift + diffusion + J
path_matrix = S0 * np.exp(np.cumsum(log_increments, axis=0))
return np.vstack([np.ones(paths) * S0, path_matrix])
def calculate_cvar(paths):
final_returns = (paths[-1] / paths[0]) - 1
var = np.percentile(final_returns, (1 - Config.CONFIDENCE) * 100)
return final_returns[final_returns <= var].mean()
# ==================== 主程序 ====================
if __name__ == "__main__":
# 1. 数据准备
df = fetch_and_clean_data(Config.PRIMARY_TICKER)
returns = df['Log_Returns'].values
S0 = df['Close'].iloc[-1]
# 2. 模型校准
params = calibrate_mjd(returns)
print(f"跳跃强度: {params[2]:.2f} 次/年")
# 3. 风险对比
mjd_cvar = calculate_cvar(simulate_paths(S0, params, Config.SIM_STEPS, Config.SIM_PATHS))
gbm_cvar = calculate_cvar(simulate_paths(S0, np.array([params[0], params[1], 0, 0, 0]), Config.SIM_STEPS, Config.SIM_PATHS))
print(f"\nMJD CVaR: {mjd_cvar:.2%}")
print(f"GBM CVaR: {gbm_cvar:.2%}")
print(f"风险低估: {abs(mjd_cvar - gbm_cvar):.2%}")本文实现了一个完整的 Merton 跳跃扩散模型校准与风险分析流程,核心要点包括:
数据清洗层面:使用 5-Sigma 窗口检测器识别期货换月造成的伪跳跃,避免污染模型校准。
模型校准层面:通过最大似然估计(MLE)拟合五个参数,其中跳跃强度 λ 是区分 MJD 与 GBM 的关键指标。
风险度量层面:向量化蒙特卡洛模拟高效生成价格路径,CVaR(预期损失)比 VaR 更能反映尾部风险的严重程度。
核心发现:在原油期货数据上,标准 GBM 模型低估了约 24% 的尾部风险——这足以让一个按常规模型设置风险限额的交易者在极端行情中爆仓。
对于生产环境,建议采用滚动窗口校准以捕捉市场状态变化,并考虑使用交易所公布的换月价差进行更精确的数据调整。
2026年全面升级已落地!【数据科学实战】知识星球核心权益如下:
星球已沉淀丰富内容生态——涵盖量化文章专题教程库、因子日更系列、高频数据集、PyBroker实战课程、专家深度分享与实时答疑服务。无论您是初探量化的学习者,还是深耕领域的从业者,这里都是助您少走弯路、高效成长的理想平台。诚邀加入,共探数据驱动的投资未来!
好文推荐
1. 用 Python 打造股票预测系统:Transformer 模型教程(一)
2. 用 Python 打造股票预测系统:Transformer 模型教程(二)
3. 用 Python 打造股票预测系统:Transformer 模型教程(三)
4. 用 Python 打造股票预测系统:Transformer 模型教程(完结)
6. YOLO 也能预测股市涨跌?计算机视觉在股票市场预测中的应用
9. Python 量化投资利器:Ridge、Lasso 和 Elastic Net 回归详解
好书推荐