三星大罢工引爆半导体!Python量化风控全攻略
2026年5月20日 | 芯片供应链黑天鹅下的Python量化实战
前言:一只蝴蝶扇动翅膀
2026年5月20日,A股市场出现明显分化。上证指数收报4150.98点,小幅下跌0.45%;但与此同时,创业板指表现强劲,大涨约2.08%,科创50指数更是出现显著上涨。盘面上,半导体板块个股表现活跃,多只芯片股涨幅超过5%,寒武纪更是强势涨停。市场的躁动,源于一个重磅消息——三星电子工会宣布从5月21日起举行全面罢工,这将是三星历史上最大规模的工潮。
三星电子是全球最重要的存储芯片制造商,其DRAM和NAND Flash产能直接影响全球半导体供应链。在AI数据中心建设持续升温的当下,芯片供应本已趋紧,如今三星工潮无异于雪上加霜。这件事对于量化交易者意味着什么?意味着我们需要一套完整的"黑天鹅事件"应对系统,从消息面监控、情绪量化、波动率预测、事件驱动策略到技术面识别,缺一不可。
本文将从Python编程的角度,带你完整构建一套针对此类供应链黑天鹅事件的量化风控系统。无论你是程序员想入门量化,还是量化从业者想加强风控,这套实战体系都值得你深入研究。
01 用Python实时监控三星罢工消息面
量化交易的第一步,是获取数据。对于供应链黑天鹅事件,我们需要从多个渠道实时抓取相关信息,并进行结构化处理。常用的数据源包括财经新闻API、社交媒体舆情、公司公告等。
交易终端多屏实时行情
;white-space:pre-wrap;word-break:break-all;text-indent:0;line-height:1.7;border-radius:0 4px 4px 0;overflow-x:auto"># -*- coding: utf-8 -*-
"""
三星罢工事件监控系统
实时抓取财经新闻并计算市场情绪评分
"""
import urllib.request
import json
import ssl
import time
from datetime import datetime
from collections import defaultdict
# 创建SSL上下文,跳过证书验证(仅用于测试)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
class NewsMonitor:
"""新闻监控系统"""
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://fi
深夜代码编辑器与K线图
nance.eastmoney.com'
}
# 关键词权重配置
self.keywords = {
'三星': 1.0,
'罢工': 2.5,
'芯片': 1.5,
'半导体': 1.5,
'供应链': 1.2,
'存储': 1.0,
'涨价': 1.8,
'断供': 2.0,
'工潮': 1.5,
'工会': 1.0
}
self.history = [] # 存储历史情绪数据
def fetch_eastmoney_news(self, keyword='三星罢工', page=1):
"""获取东方财富网新闻"""
# 东方财富新闻搜索API
url = (
f'https://search-api-web.eastmoney.com/search/jsonp'
f'?param={json.dumps({
"uid": "",
"keyword": keyword,
"type": [5],
"client": "web",
"clientType": "web",
"clientVersion": "curr",
"param": {
"searchScope": "default",
"sort": {"scope": "desc"},
"analysis": True,
"recency": ""
}
}, ensure_ascii=False)}'
)
req = urllib.request.Request(url, headers=self.headers)
try:
with urllib.request.urlopen(req, timeout=10, context=ctx) as response:
raw = response.read().decode('utf-8')
# 解析JSONP格式
if raw.startswith('['):
return json.loads(raw)
# 去除JSONP包装
start = raw.find('[')
end = raw.rfind(']') + 1
if start >= 0 and end > start:
return json.loads(raw[start:end])
except Exception as e:
print(f'获取新闻失败: {e}')
return []
def fetch_sina_news(self, symbol='sh688981'):
"""获取新浪财经新闻"""
url = f'https://feed.mix.sina.com.cn/api/roll/get?pageid=153&lid=2516&k={urllib.parse.quote("三星罢工")}&num=10'
# ... 简化实现
pass
def calculate_sentiment(self, news_list):
"""计算市场情绪评分(0-100)"""
if not news_list:
return 50 # 默认中性
total_score = 0
count = 0
for news in news_list:
title = news.get('title', news.get('txt', ''))
content = news.get('content', news.get('abs', ''))
pub_time = news.get('ctime', news.get('time', ''))
# 计算单条新闻的情绪分
score = self._score_article(title, content)
# 时间衰减:新消息权重更高
time_decay = self._time_decay(pub_time)
total_score += score * time_decay
count += 1
# 归一化到0-100
final_score = (total_score / count) * 100 if count > 0 else 50
return min(100, max(0, final_score))
def _score_article(self, title, content):
"""对单篇文章打分"""
text = (title + ' ' + content).lower()
score = 50 # 基础分
# 负面关键词(供应链中断、价格上涨)
negative_words = ['罢工', '断供', '涨价', '危机', '冲击', '停产', '中断']
for word in negative_words:
if word in text:
score += self.keywords.get(word, 1.0) * 5
# 正面关键词(替代机会、国产化)
positive_words = ['替代', '国产', '机会', '受益', '突破', '自主']
for word in positive_words:
if word in text:
score -= self.keywords.get(word, 1.0) * 3
return min(100, max(0, score))
def _time_decay(self, pub_time):
"""时间衰减函数,越新的消息权重越高"""
if not pub_time:
return 1.0
try:
# 解析时间戳
if isinstance(pub_time, (int, float)):
age_hours = (time.time() - pub_time) / 3600
else:
# 解析日期字符串
dt = datetime.strptime(str(pub_time), '%Y-%m-%d %H:%M:%S')
age_hours = (datetime.now() - dt).total_seconds() / 3600
# 指数衰减:24小时内权重从1.0降到0.5
return max(0.5, 1.0 - (age_hours / 48))
except:
return 1.0
def run_monitoring(self, interval=300):
"""持续监控循环"""
print(f'开始监控三星罢工事态 | 间隔{interval}秒')
print('-' * 50)
while True:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 获取新闻
news_data = self.fetch_eastmoney_news('三星 罢工 半导体')
news_data2 = self.fetch_eastmoney_news('芯片 供应链 涨价')
all_news = news_data + news_data2
# 计算情绪
sentiment = self.calculate_sentiment(all_news)
# 记录历史
record = {
'time': timestamp,
'sentiment': sentiment,
'news_count': len(all_news)
}
self.history.append(record)
# 打印状态
sentiment_label = self._get_sentiment_label(sentiment)
print(f'[{timestamp}] 情绪指数: {sentiment:.1f} ({sentiment_label}) | 新闻数: {len(all_news)}')
# 如果情绪剧烈变化,发出预警
if len(self.history) >= 2:
prev = self.history[-2]['sentiment']
curr = sentiment
change = abs(curr - prev)
if change > 15:
print(f'⚠️ 情绪突变: {prev:.1f} -> {curr:.1f} (变化{change:.1f})')
time.sleep(interval)
def _get_sentiment_label(self, score):
if score >= 75:
return '极度恐慌'
elif score >= 60:
return '恐慌'
elif score >= 40:
return '中性'
elif score >= 25:
return '乐观'
else:
return '极度乐观'
if __name__ == '__main__':
monitor = NewsMonitor()
# 运行一次测试
news = monitor.fetch_eastmoney_news('三星 罢工')
sentiment = monitor.calculate_sentiment(news)
print(f'当前市场情绪: {sentiment:.1f}/100')
【蓝色理解】上述代码实现了一个完整的新闻监控系统。核心要点:1)多数据源并行抓取,东方财富API返回的是JSONP格式,需要去除包装;2)情绪评分采用加权算法,每个关键词有不同权重,罢工、断供等词权重高达2.0-2.5;3)时间衰减函数确保最新消息获得更高权重;4)实时监控循环中加入了情绪突变检测,当情绪变化超过15个点时自动预警。这个系统的价值在于,在黑天鹅事件发生的第一时间,量化交易者就能获得客观的情绪数据,而非依赖主观判断。
当然,光有新闻抓取还不够。我们还需要将情绪数据与市场价格结合起来,形成一个完整的"消息面-情绪-价格"三维监控体系。
# -*- coding: utf-8 -*-
"""
情绪-价格联动分析
当情绪突变时,自动评估对股价的影响
"""
import urllib.request
import json
import ssl
import time
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
class SentimentPriceAnalyzer:
"""情绪-价格联动分析器"""
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0',
'Referer': 'https://finance.eastmoney.com'
}
def fetch_realtime_quote(self, code):
"""获取个股实时行情(腾讯API)"""
url = f'https://sqt.gtimg.cn/q=sh{code}'
req = urllib.request.Request(url, headers=self.headers)
try:
with urllib.request.urlopen(req, timeout=10, context=ctx) as r:
data = r.read().decode('gbk')
parts = data.split('~')
if len(parts) > 32:
return {
'name': parts[1],
'price': float(parts[3]),
'yesterday': float(parts[4]),
'open': float(parts[5]),
'volume': int(parts[6]),
'change_pct': float(parts[31]),
'time': parts[30]
}
except Exception as e:
print(f'获取行情失败 {code}: {e}')
return None
def analyze_correlation(self, sentiment_history, stock_codes):
"""分析情绪与股价的相关性"""
print('=' * 60)
print('情绪-价格联动分析报告')
print('=' * 60)
results = []
for code in stock_codes:
quote = self.fetch_realtime_quote(code)
if not quote:
continue
# 获取最新情绪数据
latest_sentiment = sentiment_history[-1]['sentiment'] if sentiment_history else 50
prev_sentiment = sentiment_history[-2]['sentiment'] if len(sentiment_history) > 1 else 50
# 计算情绪变化
sentiment_change = latest_sentiment - prev_sentiment
# 评估影响
price_change = quote['change_pct']
# 判断情绪与价格的背离
if sentiment_change > 10 and price_change < -2:
signal = '恐慌过度,可能反弹'
action = '观察买入机会'
elif sentiment_change < -10 and price_change > 5:
signal = '情绪乐观,警惕回调'
action = '考虑减仓'
elif abs(price_change) > 5:
signal = '价格异动,关注突破'
action = '等待确认信号'
else:
signal = '情绪与价格匹配'
action = '持仓观望'
result = {
'code': code,
'name': quote['name'],
'price': quote['price'],
'change_pct': price_change,
'sentiment': latest_sentiment,
'sentiment_change': sentiment_change,
'signal': signal,
'action': action
}
results.append(result)
# 打印报告
for r in results:
print(f"\n股票代码: {r['code']} | {r['name']}")
print(f"当前价格: {r['price']} | 涨跌幅: {r['change_pct']:.2f}%")
print(f"情绪指数: {r['sentiment']:.1f} | 情绪变化: {r['sentiment_change']:+.1f}")
print(f"信号: {r['signal']}")
print(f"建议: {r['action']}")
return results
# 使用示例
if __name__ == '__main__':
analyzer = SentimentPriceAnalyzer()
# 半导体板块重点关注个股(用代码代替名称)
stocks = ['688981', '688584', '688041', '688012']
# 模拟情绪数据(实际使用中从NewsMonitor获取)
mock_sentiment = [
{'sentiment': 55, 'time': '2026-05-20 09:30'},
{'sentiment': 58, 'time': '2026-05-20 10:00'},
{'sentiment': 65, 'time': '2026-05-20 10:30'},
{'sentiment': 72, 'time': '2026-05-20 11:00'},
{'sentiment': 78, 'time': '2026-05-20 11:30'},
]
results = analyzer.analyze_correlation(mock_sentiment, stocks)
这个分析器的核心逻辑在于检测"情绪-价格背离"。当市场情绪已经极度恐慌(超过75分),但股价跌幅并不明显时,往往意味着股价还没有完全反映坏消息,此时可能是逆向投资的机会;反之,当情绪乐观但股价已经开始下跌,说明市场可能已经提前反应了利好,需要警惕。
02 构建半导体板块波动率预测模型
黑天鹅事件最直接的影响就是波动率急剧放大。对于三星罢工这样的供应链冲击,半导体板块的波动率可能在短期内从年化20%跳升至50%甚至更高。如果风控系统不能及时调整仓位管理策略,很可能遭受巨大损失。
波动率预测是量化金融的核心课题之一。传统方法包括GARCH模型、EWMA方法等,但在应对黑天鹅事件时,这些模型往往滞后于市场。此时,我们可以借助事件驱动的波动率预测方法,将基本面因素(罢工持续时间、产能影响比例)纳入模型。
# -*- c
数据中心服务器机房
oding: utf-8 -*-
"""
事件驱动波动率预测模型
结合GARCH与基本面事件因素预测板块波动率
"""
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from arch import arch_model
import warnings
warnings.filterwarnings('ignore')
class EventVolatilityModel:
"""事件驱动波动率预测模型"""
def __init__(self):
self.garch_model = None
self.event_coef = {}
self.baseline_vol = 0.20 # 基准年化波动率 20%
def calculate_historical_vol(self, returns, window=20):
"""计算历史波动率(滚动窗口)"""
if len(returns) < window:
return self.baseline_vol
# 使用EWMA,lambda=0.94(RiskMetrics标准)
ewma_var = np.zeros(len(returns))
ewma_var[0] = returns[0] ** 2
lam = 0.94
for i in range(1, len(returns)):
ewma_var[i] = lam * ewma_var[i-1] + (1 - lam) * returns[i] ** 2
# 取最近window的均值
recent_var = np.mean(ewma_var[-window:])
return np.sqrt(recent_var * 252) # 年化
def fit_garch(self, returns):
"""拟合GARCH(1,1)模型"""
try:
# 转换数据格式
train_data = pd.Series(returns)
# 拟合GARCH(1,1)模型
model = arch_model(train_data, vol='Garch', p=1, q=1, mean='Constant', dist='normal')
result = model.fit(disp='off', show_warning=False)
# 提取参数
omega = result.params.get('omega', 0.00001)
alpha = result.params.get('alpha[1]', 0.1)
beta = result.params.get('beta[1]', 0.8)
# 计算条件方差
forecast_horizon = 1
forecast = result.forecast(horizon=forecast_horizon)
conditional_var = forecast.mean.iloc[-1].iloc[0]
# 年化波动率
garch_vol = np.sqrt(conditional_var * 252)
return garch_vol, {'omega': omega, 'alpha': alpha, 'beta': beta}
except Exception as e:
print(f'GARCH拟合失败: {e}')
return self.baseline_vol, {}
def estimate_event_impact(self, event_type, duration_days, production_impact_pct):
"""
估算事件对波动率的影响
event_type: 事件类型('strike', 'earthquake', 'sanction'等)
duration_days: 预计持续天数
production_impact_pct: 产能影响比例(0-1)
"""
# 历史同类事件的波动率放大系数
event_multipliers = {
'strike': 2.5, # 罢工:波动率放大2.5倍
'earthquake': 3.0, # 自然灾害:3倍
'sanction': 2.2, # 制裁:2.2倍
'factory_fire': 1.8 # 工厂事故:1.8倍
}
base_multiplier = event_multipliers.get(event_type, 2.0)
# 持续时间调整:持续越长,波动率越高但边际效应递减
duration_factor = 1 + 0.1 * np.log1p(duration_days)
# 产能影响调整:产能影响越大,波动率越高
impact_factor = 1 + production_impact_pct * 1.5
# 综合影响系数
total_multiplier = base_multiplier * duration_factor * impact_factor
return total_multiplier
def predict_volatility(self, returns, event_params):
"""
综合预测波动率
event_params: 事件参数字典
- event_type: 事件类型
- duration_days: 预计持续天数
- production_impact_pct: 产能影响比例
"""
# 1. 计算历史波动率
hist_vol = self.calculate_historical_vol(returns)
# 2. 拟合GARCH模型
garch_vol, garch_params = self.fit_garch(returns)
# 3. 估算事件影响
event_multiplier = self.estimate_event_impact(
event_params['event_type'],
event_params['duration_days'],
event_params['production_impact_pct']
)
# 4. 综合预测
# 加权平均:近期GARCH权重更高,但事件因素不可忽视
base_vol = 0.4 * garch_vol + 0.3 * hist_vol + 0.3 * self.baseline_vol
predicted_vol = base_vol * event_multiplier
# 5. 设置上限(防止过度放大)
max_vol = self.baseline_vol * 5 # 最高放大5倍
predicted_vol = min(predicted_vol, max_vol)
return {
'predicted_vol': predicted_vol,
'hist_vol': hist_vol,
'garch_vol': garch_vol,
'event_multiplier': event_multiplier,
'confidence': 'high' if event_params['production_impact_pct'] > 0.3 else 'medium'
}
def calculate_var(self, portfolio_value, volatility, confidence=0.95, horizon=1):
"""
计算Value at Risk
portfolio_value: 组合价值
volatility: 年化波动率
confidence: 置信水平
horizon: 持有期(天)
"""
# Z分数(正态分布)
z_scores = {0.99: 2.326, 0.95: 1.645, 0.90: 1.282}
z = z_scores.get(confidence, 1.645)
# 日波动率
daily_vol = volatility / np.sqrt(252)
# 持有期调整
horizon_factor = np.sqrt(horizon)
# VaR = 组合价值 * Z分数 * 日波动率 * sqrt(持有期)
var = portfolio_value * z * daily_vol * horizon_factor
return var
def risk_report(self, portfolio_value, returns, event_params):
"""生成完整风控报告"""
pred = self.predict_volatility(returns, event_params)
print('=' * 60)
print('波动率预测与风险评估报告')
print('=' * 60)
print(f"预测年化波动率: {pred['predicted_vol']*100:.2f}%")
print(f"历史波动率: {pred['hist_vol']*100:.2f}%")
print(f"GARCH波动率: {pred['garch_vol']*100:.2f}%")
print(f"事件影响系数: {pred['event_multiplier']:.2f}x")
print(f"置信等级: {pred['confidence']}")
print('\n--- VaR风险评估 ---')
for conf in [0.99, 0.95, 0.90]:
for horizon in [1, 5, 10]:
var = self.calculate_var(portfolio_value, pred['predicted_vol'], conf, horizon)
print(f"置信度{conf*100:.0f}% | 持有{horizon}天 | VaR: {var:,.0f}元")
# 建议
vol_ratio = pred['predicted_vol'] / self.baseline_vol
if vol_ratio > 3:
suggestion = "极高波动期,建议减仓50%以上"
elif vol_ratio > 2:
suggestion = "高波动期,建议减仓30%"
elif vol_ratio > 1.5:
suggestion = "波动放大,谨慎加仓"
else:
suggestion = "波动可控,正常持仓"
print(f"\n风控建议: {suggestion}")
return pred
# 使用示例
if __name__ == '__main__':
model = EventVolatilityModel()
# 模拟半导体板块日收益率(100个交易日)
np.random.seed(42)
returns = np.random.normal(0.0005, 0.02, 100) # 日均收益0.05%,日波动2%
# 三星罢工事件参数
event_params = {
'event_type': 'strike',
'duration_days': 15, # 预计罢工持续15天
'production_impact_pct': 0.35 # 影响约35%产能
}
# 组合价值100万元
portfolio_value = 1_000_000
report = model.risk_report(portfolio_value, returns, event_params)
【红色警示】波动率预测不是万能的。模型永远落后于实际市场,尤其是黑天鹅事件发生时。关键是在波动率急剧上升时,系统必须自动触发减仓指令,而不是等待模型确认。这是很多量化交易者最常犯的错误——在极端行情中坚持"模型还没发出信号",结果损失惨重。记住:当市场已经极度恐慌时,你的模型参数可能已经严重滞后了。
上述模型中,核心创新点在于"事件因子"的设计。传统GARCH模型只依赖历史价格数据,在正常市场环境下效果不错,但遇到三星罢工这样的供应链冲击时,价格数据还没有来得及反映基本面变化,波动率就已经飙升了。通过将产能影响比例、预计持续天数等基本面因素纳入模型,可以在价格反应之前提前预警。
03 事件驱动型量化策略的Python实现
有了波动率预测作为基础,我们现在可以构建完整的事件驱动型量化策略。这类策略的核心逻辑是:当检测到重大供应链事件时,自动调整持仓,同时配合严格的止损逻辑和仓位管理规则。
科技感交易系统界面
4px 4px 0;overflow-x:auto"># -*- coding: utf-8 -*-
"""
事件驱动型量化策略系统
包含:仓位管理、动态止损、信号执行
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
class PositionStatus(Enum):
"""持仓状态"""
HOLDING = 'holding' # 持有中
STOP_LOSS = 'stop_loss' # 触发止损
STOP_PROFIT = 'stop_profit' # 触发止盈
SIGNAL_WAIT = 'signal_wait' # 等待信号
class EventRiskLevel(Enum):
"""事件风险等级"""
LOW = 1
MEDIUM = 2
HIGH = 3
EXTREME = 4
@dataclass
class TradeSignal:
"""交易信号"""
timestamp: str
signal_type: str # 'buy', 'sell', 'hold'
stock_code: str
price: float
confidence: float # 0-1
reason: str
@dataclass
class Position:
"""持仓记录"""
stock_code: str
entry_price: float
quantity: int
stop_loss_price: float
take_profit_price: float
max_price: float # 持仓期最高价(用于跟踪止盈)
current_pnl: float = 0.0
status: PositionStatus = PositionStatus.HOLDING
class EventDrivenStrategy:
"""事件驱动型量化策略"""
def __init__(self, initial_capital=1_000_000):
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.positions: Dict[str, Position] = {}
self.trade_log: List[Dict] = []
# 策略参数
self.base_position_size = 0.10 # 基础仓位10%
self.max_position_size = 0.25 # 最大仓位25%
self.stop_loss_pct = 0.07 # 止损线7%
self.trailing_stop_pct = 0.05 # 移动止损5%
self.take_profit_pct = 0.15 # 止盈线15%
# 波动率调整参数
self.vol_threshold_low = 0.20 # 低波动阈值
self.vol_threshold_high = 0.35 # 高波动阈值
def calculate_position_size(self, stock_price, volatility, event_risk: EventRiskLevel) -> int:
"""
根据波动率和事件风险计算仓位
核心思想:高波动+高风险 = 低仓位
"""
# 基础仓位
base_size = self.base_position_size * self.current_capital / stock_price
# 波动率调整
if volatility > self.vol_threshold_high:
vol_factor = 0.5 # 高波动减半仓
elif volatility > self.vol_threshold_low:
vol_factor = 0.75
else:
vol_factor = 1.0
# 事件风险调整
risk_factors = {
EventRiskLevel.LOW: 1.0,
EventRiskLevel.MEDIUM: 0.8,
EventRiskLevel.HIGH: 0.6,
EventRiskLevel.EXTREME: 0.4
}
risk_factor = risk_factors.get(event_risk, 1.0)
# 最大仓位限制
max_shares = int(self.max_position_size * self.current_capital / stock_price)
# 计算最终仓位
final_shares = int(base_size * vol_factor * risk_factor)
return min(final_shares, max_shares)
def calculate_stop_loss(self, entry_price, volatility, event_risk: EventRiskLevel) -> float:
"""
动态计算止损价
高波动+高风险 -> 更宽的止损带(避免被噪音扫出)
"""
# 基础止损幅度
base_stop_pct = self.stop_loss_pct
# 波动率调整
if volatility > 0.30:
# 高波动场景:扩大止损带,防止正常波动触发止损
vol_adjust = 1.5
elif volatility > 0.20:
vol_adjust = 1.2
else:
vol_adjust = 1.0
# 事件风险调整
risk_adjust = {
EventRiskLevel.LOW: 1.0,
EventRiskLevel.MEDIUM: 1.2,
EventRiskLevel.HIGH: 1.5,
EventRiskLevel.EXTREME: 2.0
}.get(event_risk, 1.0)
final_stop_pct = base_stop_pct * vol_adjust * risk_adjust
return entry_price * (1 - final_stop_pct)
def calculate_take_profit(self, entry_price, volatility) -> float:
"""
计算止盈价(使用波动率动态调整)
"""
# 高波动品种,止盈目标相应提高
if volatility > 0.30:
profit_pct = 0.25 # 波动大,目标利润空间也大
elif volatility > 0.20:
profit_pct = 0.18
else:
profit_pct = self.take_profit_pct
return entry_price * (1 + profit_pct)
def update_trailing_stop(self, position: Position, current_price: float):
"""
更新移动止损
核心逻辑:最高点回落一定比例则出局
"""
# 更新最高价
if current_price > position.max_price:
position.max_price = current_price
# 计算从最高点的回落幅度
drawdown = (position.max_price - current_price) / position.max_price
# 触发移动止损
if drawdown > self.trailing_stop_pct:
return True # 触发移动止损
return False
def check_stop_loss(self, position: Position, current_price: float) -> bool:
"""检查是否触发止损"""
return current_price <= position.stop_loss_price
def check_take_profit(self, position: Position, current_price: float) -> bool:
"""检查是否触发止盈"""
return current_price >= position.take_profit_price
def execute_buy(self, signal: TradeSignal, volatility: float, event_risk: EventRiskLevel):
"""执行买入"""
if signal.stock_code in self.positions:
print(f"已有持仓,跳过买入: {signal.stock_code}")
return
# 计算仓位
quantity = self.calculate_position_size(signal.price, volatility, event_risk)
if quantity < 100: # 最小买入量
print(f"仓位不足,跳过: {signal.stock_code}")
return
# 计算止损止盈
stop_loss = self.calculate_stop_loss(signal.price, volatility, event_risk)
take_profit = self.calculate_take_profit(signal.price, volatility)
# 创建持仓
position = Position(
stock_code=signal.stock_code,
entry_price=signal.price,
quantity=quantity,
stop_loss_price=stop_loss,
take_profit_price=take_profit,
max_price=signal.price
)
self.positions[signal.stock_code] = position
# 扣除资金
cost = signal.price * quantity
self.current_capital -= cost
# 记录日志
self.trade_log.append({
'timestamp': signal.timestamp,
'action': 'BUY',
'stock_code': signal.stock_code,
'price': signal.price,
'quantity': quantity,
'cost': cost,
'remaining_capital': self.current_capital,
'reason': signal.reason
})
print(f"买入 {signal.stock_code} | 价格: {signal.price} | 数量: {quantity} | "
f"止损: {stop_loss:.2f} | 止盈: {take_profit:.2f} | 剩余资金: {self.current_capital:,.0f}")
def execute_sell(self, stock_code: str, reason: str, current_price: float):
"""执行卖出"""
if stock_code not in self.positions:
return
position = self.positions[stock_code]
# 计算收益
revenue = current_price * position.quantity
pnl = revenue - (position.entry_price * position.quantity)
pnl_pct = (current_price - position.entry_price) / position.entry_price * 100
# 回补资金
self.current_capital += revenue
# 记录日志
self.trade_log.append({
'timestamp': pd.Timestamp.now().isoformat(),
'action': 'SELL',
'stock_code': stock_code,
'price': current_price,
'quantity': position.quantity,
'revenue': revenue,
'pnl': pnl,
'pnl_pct': f'{pnl_pct:.2f}%',
'reason': reason
})
print(f"卖出 {stock_code} | 价格: {current_price} | 收益: {pnl:,.0f} ({pnl_pct:.2f}%) | "
f"原因: {reason}")
# 删除持仓
del self.positions[stock_code]
def on_tick(self, stock_code: str, current_price: float, volatility: float):
"""每笔行情触发检查"""
if stock_code not in self.positions:
return
position = self.positions[stock_code]
# 1. 检查止损
if self.check_stop_loss(position, current_price):
self.execute_sell(stock_code, '触发止损', current_price)
return
# 2. 检查移动止损
if self.update_trailing_stop(position, current_price):
self.execute_sell(stock_code, '触发移动止损', current_price)
return
# 3. 检查止盈
if self.check_take_profit(position, current_price):
self.execute_sell(stock_code, '触发止盈', current_price)
return
# 4. 更新盈亏
position.current_pnl = (current_price - position.entry_price) * position.quantity
def get_portfolio_status(self) -> Dict:
"""获取组合状态"""
total_value = self.current_capital
positions_value = 0
for code, pos in self.positions.items():
pos_value = pos.entry_price * pos.quantity # 按成本价计算
positions_value += pos_value
total_value += positions_value
return {
'initial_capital': self.initial_capital,
'current_capital': self.current_capital,
'positions_value': positions_value,
'total_value': total_value,
'total_return': (total_value - self.initial_capital) / self.initial_capital * 100,
'position_count': len(self.positions),
'open_positions': [code for code in self.positions.keys()]
}
class RiskManager:
"""风险管理器 - 监控整体风险"""
def __init__(self, max_portfolio_vol=0.40, max_drawdown=0.15):
self.max_portfolio_vol = max_portfolio_vol # 最大组合波动率
self.max_drawdown = max_drawdown # 最大回撤
self.peak_value = 0
def check_portfolio_risk(self, strategy: EventDrivenStrategy, current_vol: float) -> Dict:
"""检查组合风险"""
status = strategy.get_portfolio_status()
# 更新峰值
if status['total_value'] > self.peak_value:
self.peak_value = status['total_value']
# 计算当前回撤
current_drawdown = (self.peak_value - status['total_value']) / self.peak_value if self.peak_value > 0 else 0
# 风险评估
warnings = []
actions = []
if current_vol > self.max_portfolio_vol:
warnings.append(f'组合波动率{current_vol*100:.1f}%超过阈值{self.max_portfolio_vol*100:.1f}%')
actions.append('建议减仓降低风险')
if current_drawdown > self.max_drawdown:
warnings.append(f'当前回撤{current_drawdown*100:.1f}%接近极限{self.max_drawdown*100:.1f}%')
actions.append('必须减仓至50%以下')
# 仓位集中度检查
if status['position_count'] > 0:
for code in status['open_positions']:
pos = strategy.positions[code]
pos_weight = (pos.entry_price * pos.quantity) / status['total_value']
if pos_weight > 0.30:
warnings.append(f'{code}仓位占比{pos_weight*100:.1f}%过高')
actions.append('建议分批减仓')
return {
'warnings': warnings,
'actions': actions,
'current_vol': current_vol,
'current_drawdown': current_drawdown,
'peak_value': self.peak_value
}
# 使用示例
if __name__ == '__main__':
strategy = EventDrivenStrategy(initial_capital=1_000_000)
risk_manager = RiskManager()
# 模拟买入信号(半导体板块)
buy_signal = TradeSignal(
timestamp='2026-05-20 10:00:00',
signal_type='buy',
stock_code='688981',
price=130.0,
confidence=0.85,
reason='三星罢工利好半导体替代'
)
# 执行买入(高波动场景)
strategy.execute_buy(buy_signal, volatility=0.35, event_risk=EventRiskLevel.HIGH)
# 模拟行情更新
print('\n--- 模拟行情波动 ---')
prices = [132, 135, 128, 125, 138, 142]
for price in prices:
strategy.on_tick('688981', price, volatility=0.35)
status = strategy.get_portfolio_status()
print(f"价格: {price} | 持仓盈亏: {status['positions_value'] - 130*1000:,.0f}")
# 风险检查
print('\n--- 风险评估 ---')
risk_report = risk_manager.check_portfolio_risk(strategy, current_vol=0.35)
for w in risk_report['warnings']:
print(f'⚠️ {w}')
for a in risk_report['actions']:
print(f'→ {a}')
【蓝色理解】上述策略系统的核心设计思想是"波动率自适应仓位管理"。传统固定仓位策略的缺陷在于:市场平稳时仓位过小错过了机会,高波动时仓位过大承受了不必要的风险。通过将波动率作为仓位调整的核心变量,系统能够自动在高波动期间降低风险暴露。关键参数如止损幅度,也会随波动率动态调整——高波动期间使用更宽松的止损(防止被正常噪音扫出),低波动期间使用更严格的止损。
04 历史相似事件的数据回测方法
量化策略上线前,必须经过严格的回测验证。但对于三星罢工这样的供应链黑天鹅事件,历史同类事件的样本极其有限。我们需要用"事件驱动回测"方法,通过少量历史案例模拟不同情景下的策略表现。
程序员专注工作场景
13px;color:#2e7d32;background:#f8f8f8;border-left:4px solid #1a6e1a;border-right:1px solid #e0e0e0;border-top:1px solid #e0e0e0;border-bottom:1px solid #e0e0e0;padding:14px 18px;margin:20px 0;white-space:pre-wrap;word-break:break-all;text-indent:0;line-height:1.7;border-radius:0 4px 4px 0;overflow-x:auto"># -*- coding: utf-8 -*-
"""
事件驱动回测系统
模拟历史供应链黑天鹅事件的策略表现
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Dict, Callable
from collections import defaultdict
import json
@dataclass
class HistoricalEvent:
"""历史事件记录"""
event_name: str
event_date: str
event_type: str # 'strike', 'sanction', 'earthquake', etc.
duration_days: int # 持续天数
production_impact: float # 产能影响比例 0-1
affected_sector: str # 影响的板块
price_data: List[Dict] # 价格数据 [{'date': '2024-01-01', 'price': 100}, ...]
def get_returns(self) -> np.ndarray:
"""计算日收益率序列"""
prices = [d['price'] for d in self.price_data]
returns = np.diff(prices) / prices[:-1]
return returns
def get_max_drawdown(self) -> float:
"""计算最大回撤"""
prices = [d['price'] for d in self.price_data]
peak = prices[0]
max_dd = 0
for p in prices:
if p > peak:
peak = p
dd = (peak - p) / peak
if dd > max_dd:
max_dd = dd
return max_dd
class EventBacktester:
"""事件驱动回测引擎"""
def __init__(self, initial_capital=1_000_000):
self.initial_capital = initial_capital
self.results: List[Dict] = []
def backtest_strategy_on_event(self, strategy_func: Callable, event: HistoricalEvent) -> Dict:
"""
对单个历史事件回测策略
strategy_func: 策略函数,输入(returns, initial_capital),输出(最终价值, 交易记录)
"""
returns = event.get_returns()
# 运行策略
final_value, trades = strategy_func(returns, self.initial_capital)
# 计算指标
total_return = (final_value - self.initial_capital) / self.initial_capital
max_dd = event.get_max_drawdown()
# 年化收益(假设事件期间为一年)
n_days = len(returns)
n_years = n_days / 252
annual_return = (final_value / self.initial_capital) ** (1 / n_years) - 1 if n_years > 0 else 0
# 夏普比率(简化版)
daily_rf = 0.0001 # 无风险利率日化
excess_returns = returns - daily_rf
sharpe = np.mean(excess_returns) / np.std(excess_returns) * np.sqrt(252) if np.std(excess_returns) > 0 else 0
# 卡玛比率
calmar = annual_return / max_dd if max_dd > 0 else 0
result = {
'event_name': event.event_name,
'event_date': event.event_date,
'event_type': event.event_type,
'duration_days': event.duration_days,
'production_impact': event.production_impact,
'final_value': final_value,
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_dd,
'sharpe_ratio': sharpe,
'calmar_ratio': calmar,
'num_trades': len(trades),
'trades': trades
}
return result
def run_multi_event_backtest(self, strategy_func: Callable, events: List[HistoricalEvent]) -> pd.DataFrame:
"""对多个历史事件回测"""
all_results = []
for event in events:
result = self.backtest_strategy_on_event(strategy_func, event)
all_results.append(result)
print(f"回测 {event.event_name}: 收益 {result['total_return']*100:.2f}% | 最大回撤 {result['max_drawdown']*100:.2f}%")
self.results = all_results
# 生成汇总报告
df = pd.DataFrame(all_results)
print('\n' + '=' * 60)
print('回测汇总统计')
print('=' * 60)
print(f"事件数量: {len(events)}")
print(f"平均收益: {df['total_return'].mean()*100:.2f}%")
print(f"收益标准差: {df['total_return'].std()*100:.2f}%")
print(f"最大收益: {df['total_return'].max()*100:.2f}%")
print(f"最小收益: {df['total_return'].min()*100:.2f}%")
print(f"平均最大回撤: {df['max_drawdown'].mean()*100:.2f}%")
print(f"平均夏普比率: {df['sharpe_ratio'].mean():.2f}")
print(f"平均卡玛比率: {df['calmar_ratio'].mean():.2f}")
print(f"盈利事件比例: {(df['total_return'] > 0).mean()*100:.1f}%")
return df
def stress_test(self, strategy_func: Callable, event: HistoricalEvent,
param_ranges: Dict) -> pd.DataFrame:
"""
参数敏感性分析(压力测试)
测试不同参数组合在同类事件下的表现
"""
import itertools
param_names = list(param_ranges.keys())
param_values = list(param_ranges.values())
results = []
# 生成所有参数组合
for combo in itertools.product(*param_values):
params = dict(zip(param_names, combo))
# 使用参数运行策略(简化实现)
returns = event.get_returns()
try:
# 这里应该传入params到strategy_func
final_value, _ = strategy_func(returns, self.initial_capital, **params)
total_return = (final_value - self.initial_capital) / self.initial_capital
row = {'params': str(params), 'total_return': total_return}
row.update(params)
results.append(row)
except Exception as e:
print(f'参数组合{params}运行失败: {e}')
return pd.DataFrame(results)
def sample_strategy(returns: np.ndarray, initial_capital: float,
stop_loss=0.07, position_size=0.1) -> tuple:
"""
示例策略:固定止损+固定仓位
用于演示回测框架的使用方法
"""
capital = initial_capital
position = 0 # 当前持仓股数
trades = []
entry_price = None
for i, ret in enumerate(returns):
current_price = 100 * (1 + ret) # 假设起始价格100
if position == 0:
# 买入逻辑:收益率低于-2%时抄底
if ret < -0.02:
shares = int(capital * position_size / current_price)
if shares > 0:
position = shares
entry_price = current_price
capital -= shares * current_price
trades.append({'action': 'BUY', 'day': i, 'price': current_price, 'shares': shares})
else:
# 卖出逻辑:止损或收益超过20%
pnl_pct = (current_price - entry_price) / entry_price
if pnl_pct <= -stop_loss or pnl_pct >= 0.20:
capital += position * current_price
trades.append({'action': 'SELL', 'day': i, 'price': current_price,
'pnl_pct': pnl_pct, 'shares': position})
position = 0
entry_price = None
# 如果还有持仓,按最后价格平仓
if position > 0:
final_price = 100 * np.prod(1 + returns)
capital += position * final_price
return capital, trades
# 创建模拟历史事件
def create_simulation_events():
"""创建模拟的历史供应链黑天鹅事件"""
events = []
# 模拟事件1:某国某工厂火灾(类似三星罢工)
n = 60
prices_base = 100
event_returns = []
for i in range(n):
if i < 10:
ret = np.random.normal(-0.005, 0.015) # 正常波动
elif i < 30:
ret = np.random.normal(-0.02, 0.035) # 事件冲击,波动放大
else:
ret = np.random.normal(0.003, 0.02) # 恢复期
if i >= 10 and i < 15:
ret -= 0.03 # 急性冲击日
event_returns.append(ret)
prices = [prices_base]
for r in event_returns:
prices.append(prices[-1] * (1 + r))
event1 = HistoricalEvent(
event_name='某半导体工厂火灾',
event_date='2024-11-15',
event_type='factory_fire',
duration_days=20,
production_impact=0.25,
affected_sector='半导体',
price_data=[{'date': f'2024-11-{15+i:02d}', 'price': prices[i]} for i in range(len(prices))]
)
events.append(event1)
# 模拟事件2:某地区地震影响芯片封装
n = 45
prices_base = 100
event_returns2 = []
for i in range(n):
if i < 8:
ret = np.random.normal(-0.003, 0.012)
elif i < 25:
ret = np.random.normal(-0.015, 0.03)
else:
ret = np.random.normal(0.002, 0.018)
if i >= 8 and i < 12:
ret -= 0.025 # 主震日
prices2 = [prices_base]
for r in event_returns2:
prices2.append(prices2[-1] * (1 + r))
event2 = HistoricalEvent(
event_name='某地区地震影响封装产能',
event_date='2025-03-22',
event_type='earthquake',
duration_days=17,
production_impact=0.18,
affected_sector='半导体封装',
price_data=[{'date': f'2025-03-{22+i:02d}', 'price': prices2[i]} for i in range(len(prices2))]
)
events.append(event2)
# 模拟事件3:重大制裁导致的供应中断
n = 70
prices_base = 100
event_returns3 = []
for i in range(n):
if i < 5:
ret = np.random.normal(-0.001, 0.010)
elif i < 35:
ret = np.random.normal(-0.025, 0.04)
else:
ret = np.random.normal(0.005, 0.025)
if i >= 5 and i < 8:
ret -= 0.04
prices3 = [prices_base]
for r in event_returns3:
prices3.append(prices3[-1] * (1 + r))
event3 = HistoricalEvent(
event_name='某地区重要半导体材料出口限制',
event_date='2025-08-10',
event_type='sanction',
duration_days=30,
production_impact=0.40,
affected_sector='半导体材料',
price_data=[{'date': f'2025-08-{10+i:02d}', 'price': prices3[i]} for i in range(len(prices3))]
)
events.append(event3)
return events
if __name__ == '__main__':
# 创建回测引擎
backtester = EventBacktester(initial_capital=1_000_000)
# 创建模拟事件
events = create_simulation_events()
print('=== 事件驱动回测报告 ===\n')
# 运行回测
results_df = backtester.run_multi_event_backtest(sample_strategy, events)
# 参数压力测试
print('\n=== 参数压力测试 ===')
stress_results = backtester.stress_test(
sample_strategy,
events[0], # 用第一个事件测试
param_ranges={
'stop_loss': [0.05, 0.07, 0.10, 0.12],
'position_size': [0.05, 0.10, 0.15, 0.20]
}
)
# 找出最优参数
best_row = stress_results.loc[stress_results['total_return'].idxmax()]
print(f"最优参数组合: stop_loss={best_row['stop_loss']}, position_size={best_row['position_size']}")
print(f"对应收益: {best_row['total_return']*100:.2f}%")
# 保存结果
results_df.to_csv('backtest_results.csv', index=False)
print('\n回测结果已保存到 backtest_results.csv')
回测系统有两个核心要点需要特别说明。第一,由于供应链黑天鹅事件的样本量极少,我们采用了模拟事件的方法,通过参数化控制模拟不同类型的冲击(火灾、地震、制裁、罢工),每种类型有不同的波动率放大倍数和恢复速度。这种方法虽然不能完全复制真实市场,但能够测试策略在极端情况下的表现。第二,参数压力测试非常关键——它能告诉我们策略对哪些参数最敏感,从而在实盘时知道应该在哪些方面保持警惕。
05 用MACD+布林带识别罢工引发的板块异动

未来科技城市天际线
技术分析不是万能的,但在事件驱动策略中,技术指标可以作为"确认信号",帮助我们判断什么时候该进场、什么时候该出场。MACD和布林带是识别板块异动的经典组合,尤其适合在三星罢工这类供应链冲击发生后,寻找板块的启动点。
# -*- coding: utf-8 -*-
"""
MACD + 布林带 板块异动识别系统
识别三星罢工引发的半导体板块异动信号
"""
import numpy as np
import pandas as pd
import urllib.request
import ssl
import json
from collections import deque
class BollingerBands:
"""布林带指标"""
def __init__(self, window=20, num_std=2):
self.window = window
self.num_std = num_std
def calculate(self, prices: np.ndarray) -> dict:
"""计算布林带"""
if len(prices) < self.window:
return None
# 使用移动平均
ma = np.convolve(prices, np.ones(self.window)/self.window, mode='valid')
# 计算标准差
rolling_std = []
for i in range(self.window-1, len(prices)):
window_data = prices[i-self.window+1:i+1]
rolling_std.append(np.std(window_data))
rolling_std = np.array(rolling_std)
# 上轨、下轨、中轨
upper = ma + self.num_std * rolling_std
lower = ma - self.num_std * rolling_std
return {
'ma': ma,
'upper': upper,
'lower': lower,
'bandwidth': upper - lower, # 布林带宽度(波动率指标)
'position': (prices[self.window-1:] - lower) / (upper - lower + 1e-10) # %B指标
}
class MACD:
"""MACD指标"""
def __init__(self, fast=12, slow=26, signal=9):
self.fast = fast
self.slow = slow
self.signal = signal
def calculate(self, prices: np.ndarray) -> dict:
"""计算MACD"""
if len(prices) < self.slow:
return None
# 计算EMA
def ema(data, period):
alpha = 2 / (period + 1)
ema_arr = np.zeros(len(data))
ema_arr[0] = data[0]
for i in range(1, len(data)):
ema_arr[i] = alpha * data[i] + (1 - alpha) * ema_arr[i-1]
return ema_arr
ema_fast = ema(prices, self.fast)
ema_slow = ema(prices, self.slow)
# DIF(快线)
dif = ema_fast - ema_slow
# DEA(慢线/信号线)
dea = ema(dif, self.signal)
# MACD柱
macd_hist = 2 * (dif - dea)
return {
'dif': dif,
'dea': dea,
'hist': macd_hist,
'divergence': self._detect_divergence(dif, prices)
}
def _detect_divergence(self, dif: np.ndarray, prices: np.ndarray) -> str:
"""
检测MACD背离
- 顶背离:价格创新高但MACD没创新高 → 下跌信号
- 底背离:价格创新低但MACD没创新低 → 上涨信号
"""
if len(dif) < 20:
return 'none'
# 检查最近20个点
recent_dif = dif[-20:]
recent_prices = prices[-20:]
# 找最近的价格高点
max_price_idx = np.argmax(recent_prices)
max_price = recent_prices[max_price_idx]
# 找对应的DIF值
dif_at_price_max = recent_dif[max_price_idx]
# 找之前的价格高点
prev_price_max_idx = np.argmax(recent_prices[:max_price_idx]) if max_price_idx > 0 else -1
if prev_price_max_idx < 0:
return 'none'
prev_price_max = recent_prices[prev_price_max_idx]
prev_dif_at_max = recent_dif[prev_price_max_idx]
# 顶背离检测
if max_price > prev_price_max and dif_at_price_max < prev_dif_at_max:
return 'bearish_divergence' # 顶背离,卖出信号
# 底背离检测
min_price_idx = np.argmin(recent_prices)
min_price = recent_prices[min_price_idx]
dif_at_price_min = recent_dif[min_price_idx]
prev_min_idx = np.argmin(recent_prices[:min_price_idx]) if min_price_idx > 0 else -1
if prev_min_idx >= 0:
prev_price_min = recent_prices[prev_min_idx]
prev_dif_at_min = recent_dif[prev_min_idx]
if min_price < prev_price_min and dif_at_price_min > prev_dif_at_min:
return 'bullish_divergence' # 底背离,买入信号
return 'none'
class Sector异动Detector:
"""板块异动检测器"""
def __init__(self):
self.bb = BollingerBands(window=20, num_std=2)
self.macd = MACD(fast=12, slow=26, signal=9)
self.price_history = deque(maxlen=100) # 保留最近100个价格
def add_price(self, price: float):
"""添加新价格"""
self.price_history.append(price)
def analyze(self) -> dict:
"""综合分析异动信号"""
if len(self.price_history) < 30:
return {'status': 'insufficient_data'}
prices = np.array(self.price_history)
# 计算布林带
bb_result = self.bb.calculate(prices)
# 计算MACD
macd_result = self.macd.calculate(prices)
# 综合信号判断
signals = []
# 信号1:布林带收窄后爆发(波动率压缩后的突破)
if bb_result is not None:
bandwidth = bb_result['bandwidth']
if len(bandwidth) >= 5:
recent_bandwidth = bandwidth[-1]
avg_bandwidth = np.mean(bandwidth[-20:])
if recent_bandwidth < avg_bandwidth * 0.5:
# 布林带极度收窄,预示突破
position = bb_result['position'][-1]
if position > 0.95:
signals.append(('BOLL_BREAKOUT_UP', 0.8, '布林带突破上轨'))
elif position < 0.05:
signals.append(('BOLL_BREAKOUT_DOWN', 0.8, '布林带突破下轨'))
# 信号2:MACD金叉/死叉
if macd_result is not None:
dif = macd_result['dif']
dea = macd_result['dea']
if len(dif) >= 2 and len(dea) >= 2:
# 金叉( DIF 上穿 DEA )
if dif[-1] > dea[-1] and dif[-2] <= dea[-2]:
signals.append(('MACD_GOLDEN_CROSS', 0.75, 'MACD金叉'))
# 死叉( DIF 下穿 DEA )
if dif[-1] < dea[-1] and dif[-2] >= dea[-2]:
signals.append(('MACD_DEAD_CROSS', 0.75, 'MACD死叉'))
# MACD柱由绿转红
if macd_result['hist'][-1] > 0 and macd_result['hist'][-2] <= 0:
signals.append(('MACD_HIST_TURN_POSITIVE', 0.6, 'MACD柱转正'))
# MACD背离信号
divergence = macd_result['divergence']
if divergence == 'bullish_divergence':
signals.append(('MACD_BULLISH_DIVERGENCE', 0.85, 'MACD底背离'))
elif divergence == 'bearish_divergence':
signals.append(('MACD_BEARISH_DIVERGENCE', 0.85, 'MACD顶背离'))
# 信号3:价格异动(当日涨幅超过3%)
if len(prices) >= 2:
daily_return = (prices[-1] - prices[-2]) / prices[-2]
if daily_return > 0.03:
signals.append(('PRICE_JUMP_UP', 0.7, f'价格当日上涨{daily_return*100:.1f}%'))
elif daily_return < -0.03:
signals.append(('PRICE_JUMP_DOWN', 0.7, f'价格当日下跌{abs(daily_return)*100:.1f}%'))
# 综合评分
bullish_signals = [s for s in signals if 'DOWN' not in s[0] and 'BEARISH' not in s[0] and 'DEAD' not in s[0]]
bearish_signals = [s for s in signals if any(x in s[0] for x in ['UP', 'BULLISH', 'GOLDEN'])
and 'DOWN' not in s[0] and 'BEARISH' not in s[0]]
# 重新计算看多和看空
bullish_score = sum(s[1] for s in signals if 'BREAKOUT_UP' in s[0] or 'GOLDEN' in s[0] or 'BULLISH' in s[0] or 'HIST_TURN_POSITIVE' in s[0] or 'JUMP_UP' in s[0])
bearish_score = sum(s[1] for s in signals if 'BREAKOUT_DOWN' in s[0] or 'DEAD' in s[0] or 'BEARISH' in s[0] or 'JUMP_DOWN' in s[0])
if bullish_score > bearish_score + 0.5:
overall = 'STRONG_BULLISH'
elif bullish_score > bearish_score:
overall = 'WEAK_BULLISH'
elif bearish_score > bullish_score + 0.5:
overall = 'STRONG_BEARISH'
elif bearish_score > bullish_score:
overall = 'WEAK_BEARISH'
else:
overall = 'NEUTRAL'
return {
'status': 'analyzed',
'overall_signal': overall,
'signals': signals,
'bullish_score': bullish_score,
'bearish_score': bearish_score,
'bb_position': bb_result['position'][-1] if bb_result else None,
'macd_hist': macd_result['hist'][-1] if macd_result else None,
'divergence': macd_result['divergence'] if macd_result else 'none'
}
def generate_trading_signal(self, volatility: float = 0.20) -> dict:
"""
生成交易信号
结合技术分析和波动率环境
"""
analysis = self.analyze()
if analysis['status'] != 'analyzed':
return {'action': 'WAIT', 'reason': '数据不足'}
signal = analysis['overall_signal']
vol_adjusted = volatility > 0.30 # 高波动环境
# 交易逻辑
if signal in ['STRONG_BULLISH', 'WEAK_BULLISH']:
if vol_adjusted:
# 高波动环境下,信号强度降低
action = 'BUY' if analysis['bullish_score'] > 1.5 else 'WAIT'
reason = f'看多信号 | 但波动率偏高({volatility*100:.0f}%),谨慎参与'
else:
action = 'BUY'
reason = f'看多信号 | 布林%位:{analysis["bb_position"]:.2f} | MACD柱:{analysis["macd_hist"]:.4f}'
elif signal in ['STRONG_BEARISH', 'WEAK_BEARISH']:
action = 'SELL'
reason = f'看空信号 | MACD背离:{analysis["divergence"]}'
else:
action = 'WAIT'
reason = '技术面中性,等待明确信号'
return {
'action': action,
'confidence': max(analysis['bullish_score'], analysis['bearish_score']),
'reason': reason,
'details': analysis
}
# 使用示例
if __name__ == '__main__':
detector = Sector异动Detector()
# 模拟半导体板块价格数据(三星罢工期间)
np.random.seed(2026)
# 正常期
base_prices = [100 + np.random.normal(0, 1) for _ in range(20)]
# 罢工消息发酵期(波动加剧)
event_prices = base_prices[-1:] + [base_prices[-1] * (1 + np.random.normal(-0.015, 0.035)) for _ in range(15)]
# 恐慌下跌期
crash_prices = event_prices[-1:] + [event_prices[-1] * (1 + np.random.normal(-0.02, 0.04)) for _ in range(10)]
# 反弹期
recovery_prices = crash_prices[-1:] + [crash_prices[-1] * (1 + np.random.normal(0.01, 0.03)) for _ in range(20)]
all_prices = base_prices + event_prices + crash_prices + recovery_prices
print('=== 半导体板块异动检测 ===')
print(f'数据点: {len(all_prices)}')
signals_history = []
for i, price in enumerate(all_prices):
detector.add_price(price)
if i >= 30: # 至少30个数据点才开始分析
result = detector.analyze()
signals_history.append({
'day': i,
'price': price,
'signal': result['overall_signal'],
'bullish': result.get('bullish_score', 0),
'bearish': result.get('bearish_score', 0)
})
# 打印关键信号点
print('\n关键信号点:')
for s in signals_history:
if s['signal'] not in ['NEUTRAL', 'WEAK_BULLISH', 'WEAK_BEARISH']:
print(f"Day {s['day']}: {s['signal']} (看多:{s['bullish']:.2f} 看空:{s['bearish']:.2f})")
# 生成当前交易信号
current_signal = detector.generate_trading_signal(volatility=0.35)
print(f'\n当前操作建议: {current_signal["action"]}')
print(f'置信度: {current_signal["confidence"]:.2f}')
print(f'原因: {current_signal["reason"]}')
【红色警示】技术指标在极端行情中的有效性会大幅下降。三星罢工期间,半导体板块可能出现连续涨停或跌停,这种"流动性危机"状态下,布林带可能完全失效。MACD的金叉死叉也可能被日内剧烈波动反复触发。技术分析应该作为"确认工具",而非"主要决策依据"。在黑天鹅事件中,消息面和风控规则才是第一位的,技术分析只能锦上添花,不能雪中送炭。
06 构建完整的Python量化风控系统
前面我们分别讲解了消息面监控、波动率预测、事件驱动策略、回测方法和技术指标识别。现在,我们把这些模块整合成一个完整的量化风控系统,并演示如何在三星罢工这样的黑天鹅事件中运转。
# -*- coding: utf-8 -*-
"""
完整量化风控系统
整合消息监控 + 波动率预测 + 事件驱动策略 + 技术确认
"""
import numpy as np
import pandas as pd
import time
import ssl
import urllib.request
from datetime import datetime, timedelta
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import json
import threading
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# ============== 组件1: 消息情绪监控 ==============
class NewsSentimentMonitor:
"""消息情绪监控组件"""
def __init__(self):
self.sentiment_history = deque(maxlen=100)
self.alert_thresholds = {
'extreme_fear': 80, # 极度恐慌
'fear': 65,
'greed': 35,
'extreme_greed': 20
}
def update_sentiment(self, sentiment_score: float):
self.sentiment_history.append({
'timestamp': datetime.now(),
'score': sentiment_score
})
def get_alert_level(self) -> str:
if len(self.sentiment_history) == 0:
return 'neutral'
current = self.sentiment_history[-1]['score']
if current >= self.alert_thresholds['extreme_fear']:
return 'EXTREME_FEAR'
elif current >= self.alert_thresholds['fear']:
return 'FEAR'
elif current <= self.alert_thresholds['extreme_greed']:
return 'EXTREME_GREED'
elif current <= self.alert_thresholds['greed']:
return 'GREED'
return 'NEUTRAL'
# ============== 组件2: 波动率预测 ==============
class VolatilityPredictor:
"""波动率预测组件"""
def __init__(self, baseline_vol=0.20):
self.baseline_vol = baseline_vol
def predict(self, event_type: str, duration_days: int,
production_impact: float) -> float:
"""预测未来波动率"""
multipliers = {
'strike': 2.5,
'earthquake': 3.0,
'sanction': 2.2,
'factory_fire': 1.8,
'unknown': 1.5
}
base_mult = multipliers.get(event_type, 1.5)
duration_factor = 1 + 0.05 * np.log1p(duration_days)
impact_factor = 1 + production_impact * 1.5
mult = base_mult * duration_factor * impact_factor
predicted = self.baseline_vol * mult
return min(predicted, self.baseline_vol * 5)
# ============== 组件3: 仓位管理器 ==============
class PositionManager:
"""仓位管理器"""
def __init__(self, total_capital=1_000_000):
self.total_capital = total_capital
self.positions = {}
self.stop_loss_pct = 0.07
self.max_positions = 5
def calculate_position(self, price: float, volatility: float,
event_risk: str) -> int:
"""计算买入数量"""
vol_factor = 0.5 if volatility > 0.30 else (0.75 if volatility > 0.20 else 1.0)
risk_factor = {'extreme': 0.4, 'high': 0.6, 'medium': 0.8, 'low': 1.0}.get(event_risk, 1.0)
target_value = self.total_capital * 0.10 * vol_factor * risk_factor
shares = int(target_value / price)
return min(shares, self.total_capital * 0.25 // price)
def should_stop_loss(self, code: str, current_price: float) -> bool:
if code not in self.positions:
return False
entry = self.positions[code]['entry_price']
return (current_price - entry) / entry <= -self.stop_loss_pct
# ============== 组件4: 技术信号确认 ==============
class TechnicalConfirmator:
"""技术指标确认器"""
def __init__(self):
self.price_history = deque(maxlen=60)
def add_price(self, price: float):
self.price_history.append(price)
def get_signal(self) -> Dict:
if len(self.price_history) < 30:
return {'signal': 'WAIT', 'reason': '数据不足'}
prices = np.array(self.price_history)
# 简化布林带计算
ma = np.mean(prices[-20:])
std = np.std(prices[-20:])
upper = ma + 2 * std
lower = ma - 2 * std
position = (prices[-1] - lower) / (upper - lower + 1e-10)
# 简化MACD
ema12 = np.mean(prices[-12:])
ema26 = np.mean(prices[-26:])
dif = ema12 - ema26
dea = (dif + (ema12 - ema26)) / 2 # 简化
hist = 2 * (dif - dea)
# 信号判断
if position > 0.95 and hist > 0:
return {'signal': 'BUY', 'reason': '布林突破+MACD正向', 'confidence': 0.8}
elif position < 0.05 and hist < 0:
return {'signal': 'SELL', 'reason': '布林突破+MACD负向', 'confidence': 0.8}
elif abs(position - 0.5) < 0.1 and abs(hist) < 0.01:
return {'signal': 'NEUTRAL', 'reason': '技术面无明显方向', 'confidence': 0.5}
else:
return {'signal': 'WAIT', 'reason': '等待进一步确认', 'confidence': 0.3}
# ============== 主系统: 量化风控中枢 ==============
class QuantRiskControlSystem:
"""量化风控系统主控"""
def __init__(self, initial_capital=1_000_000):
self.capital = initial_capital
self.monitor = NewsSentimentMonitor()
self.vol_predictor = VolatilityPredictor()
self.position_manager = PositionManager(initial_capital)
self.tech_confirmator = TechnicalConfirmator()
self.active_positions = {}
self.trade_history = []
self.risk_alerts = []
self.is_running = False
def on_event_detected(self, event_type: str, duration_days: int,
production_impact: float):
"""当检测到供应链事件时触发"""
print(f'\n{"="*60}')
print(f'🚨 事件触发: {event_type} | 预计持续{duration_days}天 | 产能影响{production_impact*100:.0f}%')
print(f'{"="*60}')
# 1. 更新情绪
sentiment_score = 70 + production_impact * 30 # 估算情绪
self.monitor.update_sentiment(sentiment_score)
alert_level = self.monitor.get_alert_level()
print(f'情绪监控: {sentiment_score:.1f} ({alert_level})')
# 2. 预测波动率
predicted_vol = self.vol_predictor.predict(event_type, duration_days, production_impact)
print(f'波动率预测: {predicted_vol*100:.1f}% (基准: 20%)')
# 3. 确定事件风险等级
risk_map = {
'extreme_fear': 'extreme',
'fear': 'high',
'neutral': 'medium'
}
event_risk = risk_map.get(alert_level, 'medium')
# 4. 生成操作建议
print(f'\n--- 操作建议 ---')
tech_signal = self.tech_confirmator.get_signal()
if alert_level in ['EXTREME_FEAR', 'FEAR'] and tech_signal['signal'] == 'BUY':
# 恐慌 + 技术看多 = 逆向买入机会
print(f'建议: 逆向买入半导体板块')
print(f'理由: 情绪过度恐慌({alert_level}) + 技术面确认({tech_signal["reason"]})')
print(f'风险: 波动率偏高({predicted_vol*100:.0f}%),控制仓位')
self.risk_alerts.append({
'time': datetime.now(),
'type': 'BUY_OPPORTUNITY',
'alert_level': alert_level,
'volatility': predicted_vol,
'tech_signal': tech_signal
})
elif alert_level in ['EXTREME_FEAR', 'FEAR']:
# 恐慌但技术面不确认
print(f'建议: 观望等待技术确认')
print(f'理由: 情绪极度恐慌({alert_level}),但技术面未发出信号')
print(f'建议: 等待技术指标企稳后再操作')
elif alert_level in ['EXTREME_GREED', 'GREED']:
print(f'建议: 获利了结,锁定收益')
else:
print(f'建议: 持仓观察')
def on_price_update(self, code: str, price: float):
"""价格更新时检查持仓状态"""
self.tech_confirmator.add_price(price)
if code in self.active_positions:
# 检查止损
if self.position_manager.should_stop_loss(code, price):
self._execute_sell(code, price, '触发止损')
def _execute_buy(self, code: str, price: float, quantity: int, reason: str):
"""执行买入"""
cost = price * quantity
self.capital -= cost
self.active_positions[code] = {
'entry_price': price,
'quantity': quantity,
'entry_time': datetime.now()
}
self.trade_history.append({
'time': datetime.now(),
'action': 'BUY',
'code': code,
'price': price,
'quantity': quantity,
'reason': reason
})
print(f'✅ 买入 {code} | 价格:{price} | 数量:{quantity} | 剩余资金:{self.capital:,.0f}')
def _execute_sell(self, code: str, price: float, reason: str):
"""执行卖出"""
if code not in self.active_positions:
return
pos = self.active_positions[code]
revenue = price * pos['quantity']
pnl = revenue - pos['entry_price'] * pos['quantity']
self.capital += revenue
self.trade_history.append({
'time': datetime.now(),
'action': 'SELL',
'code': code,
'price': price,
'quantity': pos['quantity'],
'pnl': pnl,
'reason': reason
})
print(f'❌ 卖出 {code} | 价格:{price} | 收益:{pnl:,.0f} | 剩余资金:{self.capital:,.0f}')
del self.active_positions[code]
def get_risk_report(self) -> Dict:
"""生成风控报告"""
total_value = self.capital
for code, pos in self.active_positions.items():
total_value += pos['entry_price'] * pos['quantity']
return {
'timestamp': datetime.now(),
'total_value': total_value,
'cash': self.capital,
'positions': len(self.active_positions),
'active_codes': list(self.active_positions.keys()),
'return_pct': (total_value - 1_000_000) / 1_000_000 * 100,
'recent_alerts': self.risk_alerts[-5:],
'trade_count': len(self.trade_history)
}
# ============== 运行示例 ==============
if __name__ == '__main__':
print('初始化量化风控系统...')
system = QuantRiskControlSystem(initial_capital=1_000_000)
print('\n场景模拟: 三星电子工会宣布罢工')
system.on_event_detected(
event_type='strike',
duration_days=15,
production_impact=0.35
)
print('\n场景模拟: 半导体板块价格更新')
test_prices = [100, 98, 95, 97, 103, 108, 112, 115]
for i, price in enumerate(test_prices):
system.on_price_update('688981', price)
print(f'Day {i+1}: 价格={price}')
print('\n--- 风控报告 ---')
report = system.get_risk_report()
print(f"总资产: {report['total_value']:,.0f}元")
print(f"收益率: {report['return_pct']:.2f}%")
print(f"持仓数: {report['positions']}')
print(f"交易次数: {report['trade_count']}")
【蓝色理解】上述完整系统展示了"模块化量化风控"的设计思路:消息情绪监控负责第一时间感知黑天鹅事件;波动率预测负责量化风险敞口;仓位管理负责控制风险暴露;技术确认负责验证交易时机。四个模块相互独立又紧密协作,通过统一的风险中枢协调运作。这种架构的优势在于:每个模块都可以单独升级或替换,而不影响整体系统的稳定性。当你有新的数据源或新的策略思路时,只需替换对应模块即可。
结语:在不确定性中寻找确定性
三星电子工会的全面罢工,是2026年半导体行业最大的黑天鹅事件之一。对于量化交易者而言,这类事件既是挑战,也是机会。挑战在于——传统风控模型在极端行情中可能失效;机会在于——当市场陷入恐慌时,往往存在被错误定价的洼地。
本文从Python编程的角度,完整构建了一套针对供应链黑天鹅事件的量化风控系统。从实时消息监控到情绪量化、从波动率预测到仓位管理、从回测验证到技术确认,每个环节都有关键代码实现。这套系统的核心理念是:在不确定性中寻找确定性——用数据驱动决策,用模型控制风险,用纪律约束行为。
记住,任何量化系统都不是万能的。在三星罢工这样的极端事件中,市场可能因为恐慌情绪而出现非理性下跌,也可能在消息平息后出现剧烈反弹。量化系统能做的,是帮助你在混乱中保持清醒,在波动中控制风险,在机会出现时果断出击。
最后,用一句交易界的名言收尾:"在别人贪婪时恐惧,在别人恐惧时贪婪。"但这句话说起来容易,做起来难。没有一套完整的量化系统和严格的纪律约束,"逆向投资"往往变成"接飞刀"。希望本文的Python实战体系,能帮助你在下一次黑天鹅事件来临时,有准备、有信心、有纪律地应对。
风险提示:本文仅供参考,不构成投资建议。量化策略存在固有风险,历史表现不代表未来收益。投资有风险,入市需谨慎。本文涉及的所有Python代码均为教学示例,实际使用时请根据自身情况调整参数并充分回测验证。