
欢迎加入专注于财经数据与量化投研的【数据科学实战】知识星球!在这里,您将获取持续更新的《财经数据宝典》和《量化投研宝典》,这两部宝典相辅相成,为您在量化投研道路上提供明确指引。 我们提供了精选的国内外量化投研的 250+ 篇高质量文章,并每日更新最新研究成果,涵盖策略开发、因子分析、风险管理等核心领域。 无论您是量化投资新手还是经验丰富的研究者,星球社区都能帮您少走弯路,事半功倍,共同探索数据驱动的投资世界!
很多人都想跑赢大盘,但真正能做到的却寥寥无几。传统的投资方法往往依赖宏观经济预测或缓慢变化的基本面分析,而市场变化太快,人的直觉和过时的信息很难跟上节奏。
如果我们能让机器来识别市场的潜在规律,系统性地预测哪些行业板块将会表现优异,然后自动调仓,会怎么样?
本文将基于一篇最新的量化金融教程,带你从零搭建一个基于机器学习的板块轮动策略系统。这个策略在 6 年的回测中,将 100 万美元变成了超过 3400 万美元,年化收益率高达 65.86%,夏普比率 2.39,最大回撤仅 27%——远优于标普 500 指数。
传统的板块轮动策略依赖人工判断经济周期,比如在复苏期买入工业股,在衰退期买入消费必需品。但这种方法存在滞后性和主观偏差。
本策略的核心思路是:
金融数据的质量直接决定策略的成败。股票拆分、分红等公司行为如果不处理,回测结果就会失真。
import pandas as pd
import yfinance as yf
import logging
classConfig:
# 回测时间范围
START_DATE = "2018-01-01"
END_DATE = "2024-12-31"
# 基准指数
BENCHMARK_TICKER = "SPY"
# 股票池:覆盖多个行业的流动性较好的美股
TICKERS = [
'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'JPM', 'BAC', 'XOM', 'CVX',
'KO', 'PEP', 'PG', 'JNJ', 'UNH', 'LLY', 'HD', 'TSLA', 'CRM', 'ADBE',
'NFLX', 'CMCSA', 'VZ', 'T', 'CAT', 'GE', 'NEE'
]
classDataLoader:
deffetch_data(self, tickers: list, start_date: str, end_date: str):
"""获取股票历史数据"""
all_tickers = list(set(tickers + [Config.BENCHMARK_TICKER]))
logging.info(f"正在获取 {len(all_tickers)} 只股票的数据...")
# auto_adjust=True 自动处理股票拆分和分红,这一点非常关键
data = yf.download(
all_tickers,
start=start_date,
end=end_date,
auto_adjust=True, # 自动调整价格
group_by='column',
progress=False
)
# 处理多层列索引,转换为 Close_AAPL 这样的格式
if isinstance(data.columns, pd.MultiIndex):
new_columns = []
for col_level0, col_level1 in data.columns:
if col_level1:
new_columns.append(f"{col_level0}_{col_level1}")
else:
new_columns.append(f"{col_level0}")
data.columns = new_columns
close_cols = [col for col in data.columns if'Close'in col]
return data[close_cols]
defhandle_missing_data(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""处理缺失数据:先前向填充,再删除剩余空值"""
logging.info(f"处理缺失数据,原始形状: {dataframe.shape}")
df_filled = dataframe.ffill() # 前向填充
df_cleaned = df_filled.dropna() # 删除剩余空值
logging.info(f"处理完成,最终形状: {df_cleaned.shape}")
return df_cleaned
原始价格本身没有预测能力,我们需要从中提取有意义的特征。本策略使用两类特征:
classConfig:
# 动量计算的回看周期(天)
MOMENTUM_LOOKBACK_PERIODS = [20, 60, 120]
# 波动率计算的回看周期(天)
VOLATILITY_LOOKBACK_PERIODS = [20, 60]
# 股票与板块的映射关系
TICKER_SECTOR_MAP = {
'AAPL': 'Technology', 'MSFT': 'Technology', 'GOOGL': 'Technology',
'AMZN': 'Technology', 'NVDA': 'Technology', 'TSLA': 'Technology',
'CRM': 'Technology', 'ADBE': 'Technology', 'NFLX': 'Technology',
'JPM': 'Financials', 'BAC': 'Financials',
'XOM': 'Energy', 'CVX': 'Energy',
'KO': 'Consumer Staples', 'PEP': 'Consumer Staples', 'PG': 'Consumer Staples',
'JNJ': 'Healthcare', 'UNH': 'Healthcare', 'LLY': 'Healthcare',
'HD': 'Consumer Discretionary',
'CMCSA': 'Communication Services', 'VZ': 'Communication Services', 'T': 'Communication Services',
'CAT': 'Industrials', 'GE': 'Industrials',
'NEE': 'Utilities'
}
classFeatureEngineer:
defgenerate_momentum_features(self, prices_df: pd.DataFrame, lookback_periods: list):
"""生成动量特征:当前价格相对于移动平均线的偏离"""
logging.info(f"生成动量特征,回看周期: {lookback_periods}")
momentum_features = pd.DataFrame(index=prices_df.index)
for col in prices_df.columns:
if'Close_'in col:
ticker = col.split('Close_')[1]
for period in lookback_periods:
# 计算简单移动平均
sma = prices_df[col].rolling(window=period).mean()
# 动量 = (当前价格 - 均线) / 均线
momentum_features[f'Momentum_{period}_{ticker}'] = (prices_df[col] - sma) / sma
logging.info(f"动量特征生成完成,形状: {momentum_features.shape}")
return momentum_features.dropna()
defgenerate_volatility_features(self, prices_df: pd.DataFrame, lookback_periods: list):
"""生成波动率特征:收益率的滚动标准差"""
logging.info(f"生成波动率特征,回看周期: {lookback_periods}")
volatility_features = pd.DataFrame(index=prices_df.index)
# 先计算日收益率
returns_df = prices_df.pct_change()
for col in prices_df.columns:
if'Close_'in col:
ticker = col.split('Close_')[1]
returns_col = f'Returns_{ticker}'
for period in lookback_periods:
# 波动率 = 收益率的滚动标准差
volatility_features[f'Volatility_{period}_{ticker}'] = returns_df[col].rolling(window=period).std()
logging.info(f"波动率特征生成完成,形状: {volatility_features.shape}")
return volatility_features.dropna()
这是整个策略最关键的一步。很多回测之所以失败,就是因为不小心使用了未来的信息来做预测。
我们的目标是预测未来 20 个交易日(约 1 个月)哪些板块表现最好。标签的构建方式是:计算每个板块未来 20 天的平均收益率,选出表现最好的 3 个板块标记为 1,其余标记为 0。
classConfig:
PREDICTION_HORIZON = 20# 预测未来 20 个交易日的表现
TOP_SECTORS_TO_PICK = 3# 选择表现最好的 3 个板块
classFeatureEngineer:
defcreate_relative_strength_labels(self, prices_df: pd.DataFrame,
sector_mapper,
prediction_horizon: int,
top_sectors: int):
"""
创建相对强度标签
关键点:使用 shift(-prediction_horizon) 将未来数据"移回"当前
这样在任意日期 D,标签 y 代表的是 D 之后发生的事情
"""
logging.info(f"创建标签,预测周期: {prediction_horizon} 天")
# 排除基准指数
stock_prices = prices_df.drop(columns=[f'Close_{Config.BENCHMARK_TICKER}'], errors='ignore')
stock_returns = stock_prices.pct_change()
# 核心:将未来收益率移回当前日期
# 这是防止信息泄露的关键操作
future_returns = pd.DataFrame(index=stock_returns.index)
for col in stock_returns.columns:
# shift(-20) 意味着把 20 天后的数据放到今天
future_returns[col] = stock_returns[col].shift(-prediction_horizon)
# 按板块聚合未来收益率,然后标记表现最好的板块
# ... 聚合和标记逻辑 ...
logging.info(f"标签创建完成")
return sector_labels_df.dropna()
我们使用随机森林分类器来预测板块的相对强弱。随机森林是一种集成学习方法,它结合多棵决策树的预测结果,对噪声数据更加鲁棒。
两个重要的设计决策:
class_weight='balanced' 防止模型偏向多数类from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, f1_score
import joblib
classConfig:
MODEL_TYPE = 'RandomForestClassifier'
MODEL_HYPERPARAMETERS = {
'n_estimators': 100, # 决策树数量
'random_state': 42, # 随机种子,保证可复现
'class_weight': 'balanced', # 处理类别不平衡
'max_depth': 10# 限制树深度,防止过拟合
}
MODEL_PATH = "ml_predictor_model.joblib"
classMLPredictor:
def__init__(self, model_type: str = 'RandomForestClassifier', hyperparameters: dict = None):
self.model_type = model_type
self.hyperparameters = hyperparameters if hyperparameters else Config.MODEL_HYPERPARAMETERS
self.model = self._initialize_model()
logging.info(f"模型初始化完成: {self.model_type}")
def_initialize_model(self):
"""初始化模型"""
if self.model_type == 'RandomForestClassifier':
return RandomForestClassifier(**self.hyperparameters)
else:
raise ValueError(f"不支持的模型类型: {self.model_type}")
deftrain(self, X: pd.DataFrame, y: pd.Series):
"""训练模型,使用时间序列切分"""
logging.info("开始训练模型...")
y = y.astype(int)
# 时间序列切分:按日期排序后,前 80% 训练,后 20% 测试
unique_dates = X.index.get_level_values('Date').unique().sort_values()
train_end_date = unique_dates[int(len(unique_dates) * 0.8)]
X_train = X[X.index.get_level_values('Date') <= train_end_date]
y_train = y[y.index.get_level_values('Date') <= train_end_date]
X_test = X[X.index.get_level_values('Date') > train_end_date]
y_test = y[y.index.get_level_values('Date') > train_end_date]
logging.info(f"数据划分完成: 训练集 {len(X_train)} 样本,测试集 {len(X_test)} 样本")
# 训练模型
self.model.fit(X_train, y_train)
logging.info("模型训练完成")
# 在测试集上评估
ifnot X_test.empty:
y_pred = self.model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
logging.info(f"测试集准确率: {accuracy:.4f}")
logging.info(f"测试集 F1 分数: {f1:.4f}")
defpredict(self, X: pd.DataFrame):
"""使用训练好的模型进行预测"""
if self.model isNone:
raise RuntimeError("模型尚未训练")
logging.info(f"正在预测 {len(X)} 个样本...")
return self.model.predict(X)
defsave_model(self, path: str):
"""保存模型到文件"""
joblib.dump(self.model, path)
logging.info(f"模型已保存到 {path}")
模型在测试集上达到了 56% 的准确率。在金融市场这种噪声极大的环境中,这个数字看起来不高,但考虑到随机猜测只有约 27% 的准确率(3 个最强板块 / 11 个板块),这已经是一个有意义的预测优势了。
有了预测模型,接下来就是模拟真实交易。策略引擎每 20 个交易日(约 1 个月)调仓一次:
classConfig:
INITIAL_CAPITAL = 1000000# 初始资金 100 万美元
REBALANCE_FREQUENCY = 20# 每 20 个交易日调仓一次
classStrategyEngine:
def__init__(self, initial_capital: float, rebalance_frequency: int):
self.initial_capital = initial_capital
self.rebalance_frequency = rebalance_frequency
self.portfolio_value = pd.Series(dtype=float)
self.holdings = {} # 持仓:{股票代码: 股数}
self.capital = initial_capital # 现金
logging.info(f"策略引擎初始化,初始资金: ${initial_capital:,.2f}")
defrun_backtest(self, prices_df: pd.DataFrame, sector_predictions_df: pd.DataFrame, sector_mapper):
"""运行回测"""
logging.info("开始回测模拟...")
dates = prices_df.index
self.portfolio_value = pd.Series(index=dates, dtype=float)
self.benchmark_value = pd.Series(index=dates, dtype=float)
# 计算基准的初始持仓
initial_spy_price = prices_df.loc[dates[0], f'Close_{Config.BENCHMARK_TICKER}']
benchmark_shares = self.initial_capital / initial_spy_price
self.benchmark_value.iloc[0] = self.initial_capital
for i, current_date in enumerate(dates):
# 每天更新基准价值
if i > 0:
self.benchmark_value.loc[current_date] = benchmark_shares * prices_df.loc[current_date, f'Close_{Config.BENCHMARK_TICKER}']
# 计算当前组合价值(现金 + 持仓市值)
current_portfolio_value = self.capital
for ticker, shares in self.holdings.items():
iff'Close_{ticker}'in prices_df.columns:
current_portfolio_value += shares * prices_df.loc[current_date, f'Close_{ticker}']
self.portfolio_value.loc[current_date] = current_portfolio_value
# 调仓逻辑:每隔 rebalance_frequency 天执行一次
if i % self.rebalance_frequency == 0:
logging.info(f"调仓日期: {current_date.strftime('%Y-%m-%d')}")
# 获取当天的板块预测
if current_date in sector_predictions_df.index.get_level_values('Date'):
daily_preds = sector_predictions_df.loc[current_date, 'Predicted_Label']
top_sectors = daily_preds[daily_preds == 1].index.tolist()
else:
continue
# 找出最强板块对应的股票
selected_tickers = []
for sector in top_sectors:
tickers_in_sector = [t for t, s in sector_mapper.items() if s == sector]
selected_tickers.extend(tickers_in_sector)
selected_tickers = list(set(selected_tickers))
# 执行调仓
self.rebalance_portfolio(current_date, prices_df, selected_tickers)
logging.info(f"回测完成,共调仓 {len(dates) // self.rebalance_frequency} 次")
logging.info(f"最终组合价值: ${self.portfolio_value.iloc[-1]:,.2f}")
logging.info(f"最终基准价值: ${self.benchmark_value.iloc[-1]:,.2f}")
defrebalance_portfolio(self, date, prices_df, selected_tickers):
"""执行调仓:先清仓,再等权买入目标股票"""
# 清仓:卖出所有持仓,全部转为现金
for ticker, shares in self.holdings.items():
iff'Close_{ticker}'in prices_df.columns:
self.capital += shares * prices_df.loc[date, f'Close_{ticker}']
self.holdings = {}
# 如果有目标股票,等权分配资金买入
if selected_tickers:
allocation_per_stock = self.capital / len(selected_tickers)
for ticker in selected_tickers:
iff'Close_{ticker}'in prices_df.columns:
price = prices_df.loc[date, f'Close_{ticker}']
shares = allocation_per_stock / price
self.holdings[ticker] = shares
self.capital = 0# 全部资金投入
回测结束后,我们需要全面评估策略的表现。以下是关键指标:
import numpy as np
classPerformanceAnalyzer:
defcalculate_metrics(self, portfolio_returns: pd.Series, benchmark_returns: pd.Series,
risk_free_rate: float = 0.02) -> dict:
"""计算关键绩效指标"""
# 累计收益
cumulative_portfolio = (1 + portfolio_returns).prod() - 1
cumulative_benchmark = (1 + benchmark_returns).prod() - 1
# 年化收益率 (CAGR)
years = len(portfolio_returns) / 252# 假设每年 252 个交易日
cagr_portfolio = (1 + cumulative_portfolio) ** (1 / years) - 1
cagr_benchmark = (1 + cumulative_benchmark) ** (1 / years) - 1
# 年化波动率
vol_portfolio = portfolio_returns.std() * np.sqrt(252)
vol_benchmark = benchmark_returns.std() * np.sqrt(252)
# 夏普比率 = (年化收益 - 无风险利率) / 年化波动率
sharpe_portfolio = (cagr_portfolio - risk_free_rate) / vol_portfolio
sharpe_benchmark = (cagr_benchmark - risk_free_rate) / vol_benchmark
# 最大回撤
cumulative = (1 + portfolio_returns).cumprod()
running_max = cumulative.cummax()
drawdown = (cumulative - running_max) / running_max
max_drawdown = drawdown.min()
metrics = {
'累计收益(策略)': f'{cumulative_portfolio:.2%}',
'累计收益(基准)': f'{cumulative_benchmark:.2%}',
'年化收益率(策略)': f'{cagr_portfolio:.2%}',
'年化收益率(基准)': f'{cagr_benchmark:.2%}',
'夏普比率(策略)': f'{sharpe_portfolio:.2f}',
'夏普比率(基准)': f'{sharpe_benchmark:.2f}',
'最大回撤(策略)': f'{max_drawdown:.2%}',
}
for k, v in metrics.items():
logging.info(f" {k}: {v}")
return metrics
经过 6 年的回测(2018–2024),策略表现如下:
策略不仅收益远超基准,而且风险控制也更好——最大回撤比基准还低 6 个百分点。夏普比率 2.39 意味着每承担一单位风险,策略能获得 2.39 单位的超额收益,这在量化投资领域是非常优秀的数字。
这个策略只是一个起点,还有很多可以优化的方向:
本文介绍了如何用 Python 构建一个完整的机器学习驱动的板块轮动策略,涵盖了从数据获取、特征工程、模型训练到回测分析的全流程。
关键要点:
shift() 操作确保标签只使用未来数据class_weight='balanced' 让模型重视少数类虽然模型预测准确率只有 56%,但这个微小的优势在长期复利作用下,足以产生惊人的超额收益。量化投资的精髓就在于:找到一个可重复的、有统计优势的策略,然后系统性地执行它。
核心权益如下:
星球已有丰富内容积累,包括量化投研论文、财经高频数据、 PyBroker 视频教程、定期直播、数据分享和答疑解难。适合对量化投研和财经数据分析有兴趣的学习者及从业者。欢迎加入我们!
好文推荐
1. 用 Python 打造股票预测系统:Transformer 模型教程(一)
2. 用 Python 打造股票预测系统:Transformer 模型教程(二)
3. 用 Python 打造股票预测系统:Transformer 模型教程(三)
4. 用 Python 打造股票预测系统:Transformer 模型教程(完结)
6. YOLO 也能预测股市涨跌?计算机视觉在股票市场预测中的应用
9. Python 量化投资利器:Ridge、Lasso 和 Elastic Net 回归详解
好书推荐