从零构建自己的多因子模型,找到超额收益的秘密
你有没有想过,为什么有些基金无论市场涨跌都能稳定跑赢指数?为什么有些量化策略能够持续创造超额收益?这背后往往隐藏着一个共同的秘密:因子投资。今天,我们将揭开这个神秘的面纱,教你如何用Python挖掘和构建自己的Alpha因子。
先来看一个真实的案例。2000年,两位经济学家Eugene Fama和Kenneth French提出了著名的三因子模型,他们发现:价值股(低市净率)表现优于成长股(高市净率)、小盘股表现优于大盘股。基于这个发现,很多量化基金构建了价值因子和小盘因子策略,在接下来的20年里获得了显著的超额收益。
但故事没有结束。2013年,同样的两位学者更新了他们的模型,加入了盈利因子和投资因子,形成了五因子模型。这个演变告诉我们:市场在变化,有效的因子也在变化。今天有效的因子,明天可能就会失效。这就是因子投资的本质——持续挖掘、验证、迭代。
今天我们要深入探讨的,就是如何用Python系统性地进行因子研究,构建属于自己的多因子模型。我们将从最基础的单因子测试开始,逐步深入到多因子组合优化。
一、什么是因子投资?
因子投资的核心思想是:股票的收益可以被一组共同的风险因子解释。这些因子包括:
市场因子:市场整体涨跌带来的收益
价值因子:便宜的公司比贵的公司表现更好
规模因子:小公司比大公司表现更好
动量因子:过去表现好的股票未来继续表现好
质量因子:优质公司比劣质公司表现更好
通过暴露在这些因子上,我们可以获取相应的风险溢价。但更重要的是,我们可以通过因子择时和因子组合来获取超额收益。
让我们从一个简单的价值因子开始。价值因子的核心逻辑是:买入便宜的股票,卖出昂贵的股票。但"便宜"如何定义?是低市盈率(PE)、低市净率(PB)、还是低市销率(PS)?不同的定义可能导致完全不同的结果。
二、环境准备与数据获取
首先,我们需要准备因子研究所需的环境和数据。与之前的策略不同,因子研究需要更全面的数据,包括财务数据、估值数据等。
import numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsfrom scipy import statsimport statsmodels.api as smimport tushare as tsfrom datetime import datetime, timedeltaimport warningswarnings.filterwarnings('ignore')# 设置中文显示plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False# 初始化tushare prots.set_token('你的token')pro = ts.pro_api()# 设置研究参数START_DATE = '20180101'END_DATE = '20231231'UNIVERSE_SIZE = 500 # 研究股票池大小def get_stock_universe(date, top_n=UNIVERSE_SIZE): """ 获取股票池:选择流动性好、市值大的股票 """ # 获取指定日期的所有A股 stocks = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,area,industry,list_date') # 获取市值数据 trade_date = (datetime.strptime(date, '%Y%m%d') - timedelta(days=1)).strftime('%Y%m%d') daily_basic = pro.daily_basic(trade_date=trade_date, fields='ts_code,total_mv') # 合并数据 df = pd.merge(stocks, daily_basic, on='ts_code') # 按市值排序,选择前top_n df = df.sort_values('total_mv', ascending=False).head(top_n) return df['ts_code'].tolist()def get_factor_data(ts_code, start_date=START_DATE, end_date=END_DATE): """ 获取单只股票的因子数据 """ try: # 获取日行情数据 daily = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date) daily['trade_date'] = pd.to_datetime(daily['trade_date']) daily.set_index('trade_date', inplace=True) # 获取日度基本面数据 daily_basic = pro.daily_basic(ts_code=ts_code, start_date=start_date, end_date=end_date, fields='trade_date,pe,pe_ttm,pb,ps,ps_ttm,dv_ratio,total_mv,circ_mv') daily_basic['trade_date'] = pd.to_datetime(daily_basic['trade_date']) daily_basic.set_index('trade_date', inplace=True) # 获取季度财务数据 income = pro.income(ts_code=ts_code, start_date=start_date, end_date=end_date, fields='end_date,total_revenue,n_income') balance = pro.balancesheet(ts_code=ts_code, start_date=start_date, end_date=end_date, fields='end_date,total_assets,total_hldr_eqy_exc_min_int') cashflow = pro.cashflow(ts_code=ts_code, start_date=start_date, end_date=end_date, fields='end_date,n_cashflow_oper_act') # 合并数据 df = daily[['open', 'high', 'low', 'close', 'vol', 'amount']].copy() df = df.merge(daily_basic, left_index=True, right_index=True, how='left') # 计算日收益率 df['returns'] = df['close'].pct_change() # 向前填充缺失的财务数据 df = df.ffill() return df except Exception as e: print(f"获取{ts_code}数据失败: {e}") return None# 获取基准日期(每月最后一个交易日)def get_month_end_dates(start_date, end_date): """获取月度调仓日期""" dates = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1') dates['cal_date'] = pd.to_datetime(dates['cal_date']) dates = dates[dates['is_open'] == 1] # 获取每月最后交易日 month_ends = dates.groupby(dates['cal_date'].dt.to_period('M'))['cal_date'].max() return month_ends.tolist()# 获取调仓日期rebalance_dates = get_month_end_dates(START_DATE, END_DATE)print(f"回测期间调仓日期数量: {len(rebalance_dates)}")print(f"首期调仓日: {rebalance_dates[0]}, 末期调仓日: {rebalance_dates[-1]}")
三、单因子研究与测试
1. 计算常见因子
def calculate_factors(df): """ 计算各种因子值 """ df = df.copy() # 估值因子 df['PE'] = df['pe_ttm'] # 市盈率 df['PB'] = df['pb'] # 市净率 df['PS'] = df['ps_ttm'] # 市销率 # 反转因子:过去N个月的收益率 for months in [1, 3, 6, 12]: df[f'REV_{months}M'] = df['close'].pct_change(months * 21) # 假设每月21个交易日 # 波动率因子 df['VOL_20D'] = df['returns'].rolling(20).std() df['VOL_60D'] = df['returns'].rolling(60).std() # 换手率因子 df['TURNOVER'] = df['vol'] / df['circ_mv'] * df['close'] # 市值因子(取对数) df['LOG_MV'] = np.log(df['total_mv']) # 质量因子(需要财务数据,这里简化处理) # 实际应用中需要从财务报表中提取ROE、ROA等指标 # 标准化处理 for factor in ['PE', 'PB', 'PS', 'LOG_MV', 'VOL_20D', 'TURNOVER']: if factor in df.columns: # 去除极端值(1%和99%分位数) lower = df[factor].quantile(0.01) upper = df[factor].quantile(0.99) df[factor] = df[factor].clip(lower, upper) # Z-score标准化 df[f'{factor}_Z'] = (df[factor] - df[factor].mean()) / df[factor].std() return df# 测试单只股票的因子计算sample_stock = '600519.SH'sample_data = get_factor_data(sample_stock)if sample_data is not None: sample_data = calculate_factors(sample_data) print(f"\n{sample_stock} 因子数据示例:") print(sample_data[['close', 'PE', 'PB', 'LOG_MV', 'VOL_20D']].tail())
2. 单因子回测框架
class SingleFactorTester: """ 单因子测试器 """ def __init__(self, universe_size=UNIVERSE_SIZE): self.universe_size = universe_size self.factor_data = {} self.factor_performance = {} def prepare_factor_data(self, factor_name, rebalance_date): """ 准备全市场因子数据 """ # 获取当期的股票池 stock_universe = get_stock_universe(rebalance_date.strftime('%Y%m%d'), self.universe_size) factor_values = {} for ts_code in stock_universe: try: # 获取股票数据 df = get_factor_data(ts_code) if df is None or len(df) < 100: # 要求至少100个交易日数据 continue # 计算因子 df = calculate_factors(df) # 获取最新因子值 latest_data = df.loc[rebalance_date] # 存储因子值 if factor_name in latest_data: factor_values[ts_code] = { 'factor_value': latest_data[factor_name], 'market_cap': latest_data['total_mv'], 'price': latest_data['close'] } except Exception as e: # 部分股票可能没有数据 continue return factor_values def calculate_factor_returns(self, factor_values, next_rebalance_date): """ 计算因子收益 """ returns_data = {} for ts_code, data in factor_values.items(): try: # 获取下一期的收益率 df = get_factor_data(ts_code) if df is None: continue # 计算持有期收益率 start_price = data['price'] # 找到调仓日后第一个交易日 available_dates = df.index[df.index >= next_rebalance_date] if len(available_dates) == 0: continue end_date = available_dates[0] end_price = df.loc[end_date, 'close'] holding_return = (end_price - start_price) / start_price returns_data[ts_code] = { 'factor_value': data['factor_value'], 'market_cap': data['market_cap'], 'return': holding_return } except Exception as e: continue return returns_data def test_single_factor(self, factor_name, quantiles=5): """ 测试单个因子 """ print(f"\n开始测试因子: {factor_name}") print("=" * 50) all_returns = [] for i in range(len(rebalance_dates) - 1): current_date = rebalance_dates[i] next_date = rebalance_dates[i + 1] print(f" 周期 {i+1}/{len(rebalance_dates)-1}: {current_date.date()} -> {next_date.date()}") # 准备因子数据 factor_values = self.prepare_factor_data(factor_name, current_date) if not factor_values: continue # 计算因子收益 returns_data = self.calculate_factor_returns(factor_values, next_date) if not returns_data: continue # 构建DataFrame df_returns = pd.DataFrame.from_dict(returns_data, orient='index') # 按因子值分组 df_returns['quantile'] = pd.qcut(df_returns['factor_value'], q=quantiles, labels=range(1, quantiles+1)) # 计算各分位数的平均收益 quantile_returns = df_returns.groupby('quantile')['return'].mean() # 计算多空组合收益(第一分位做多,第末分位做空) if len(quantile_returns) >= 2: long_short_return = quantile_returns.iloc[0] - quantile_returns.iloc[-1] else: long_short_return = 0 all_returns.append({ 'period': i, 'start_date': current_date, 'end_date': next_date, 'quantile_returns': quantile_returns, 'long_short_return': long_short_return }) # 汇总结果 if not all_returns: print("没有有效数据") return None # 计算因子表现 long_short_returns = [r['long_short_return'] for r in all_returns] mean_return = np.mean(long_short_returns) std_return = np.std(long_short_returns) sharpe_ratio = mean_return / std_return * np.sqrt(12) # 月度年化夏普 # 计算IC(信息系数) ic_values = [] for period_data in all_returns: # 这里简化处理,实际需要计算每期的Rank IC ic_values.append(period_data['long_short_return']) mean_ic = np.mean(ic_values) ic_ir = mean_ic / np.std(ic_values) if np.std(ic_values) > 0 else 0 performance = { 'factor_name': factor_name, 'mean_return': mean_return, 'std_return': std_return, 'sharpe_ratio': sharpe_ratio, 'mean_ic': mean_ic, 'ic_ir': ic_ir, 'num_periods': len(all_returns), 'period_returns': all_returns } self.factor_performance[factor_name] = performance print(f"\n因子 {factor_name} 测试结果:") print(f" 平均月度收益: {mean_return:.4%}") print(f" 收益标准差: {std_return:.4%}") print(f" 年化夏普比率: {sharpe_ratio:.3f}") print(f" 平均IC: {mean_ic:.4f}") print(f" IC信息比率: {ic_ir:.3f}") print(f" 有效期数: {len(all_returns)}") return performance# 测试几个常见因子tester = SingleFactorTester()# 测试估值因子factors_to_test = ['PE', 'PB', 'LOG_MV', 'REV_1M', 'VOL_20D']for factor in factors_to_test: tester.test_single_factor(factor)
3. 可视化因子表现
def visualize_factor_performance(performance_dict): """ 可视化因子表现 """ if not performance_dict: print("没有性能数据可显示") return # 提取数据 factor_names = [] sharpe_ratios = [] mean_returns = [] mean_ics = [] for factor_name, perf in performance_dict.items(): factor_names.append(factor_name) sharpe_ratios.append(perf['sharpe_ratio']) mean_returns.append(perf['mean_return'] * 100) # 转换为百分比 mean_ics.append(perf['mean_ic']) # 创建图表 fig, axes = plt.subplots(2, 2, figsize=(15, 12)) # 夏普比率对比 axes[0, 0].barh(factor_names, sharpe_ratios, color='steelblue') axes[0, 0].axvline(x=0, color='red', linestyle='--', alpha=0.5) axes[0, 0].set_xlabel('年化夏普比率') axes[0, 0].set_title('各因子夏普比率对比', fontsize=14) axes[0, 0].grid(True, alpha=0.3, axis='x') # 平均收益对比 axes[0, 1].barh(factor_names, mean_returns, color='forestgreen') axes[0, 1].axvline(x=0, color='red', linestyle='--', alpha=0.5) axes[0, 1].set_xlabel('平均月度收益 (%)') axes[0, 1].set_title('各因子平均收益对比', fontsize=14) axes[0, 1].grid(True, alpha=0.3, axis='x') # IC对比 axes[1, 0].barh(factor_names, mean_ics, color='darkorange') axes[1, 0].axvline(x=0, color='red', linestyle='--', alpha=0.5) axes[1, 0].set_xlabel('平均IC') axes[1, 0].set_title('各因子信息系数对比', fontsize=14) axes[1, 0].grid(True, alpha=0.3, axis='x') # 因子收益曲线(以夏普最高的因子为例) best_factor = max(performance_dict.items(), key=lambda x: x[1]['sharpe_ratio']) best_factor_name = best_factor[0] best_performance = best_factor[1] cumulative_returns = [] cum_return = 1.0 for period in best_performance['period_returns']: cum_return *= (1 + period['long_short_return']) cumulative_returns.append(cum_return) axes[1, 1].plot(range(len(cumulative_returns)), cumulative_returns, linewidth=2, color='purple') axes[1, 1].axhline(y=1, color='gray', linestyle='--', alpha=0.5) axes[1, 1].set_xlabel('调仓周期') axes[1, 1].set_ylabel('累计收益') axes[1, 1].set_title(f'最佳因子 ({best_factor_name}) 累计收益曲线', fontsize=14) axes[1, 1].grid(True, alpha=0.3) plt.tight_layout() plt.show() # 打印详细分析 print("\n" + "="*60) print("因子表现详细分析") print("="*60) # 创建汇总表格 summary_data = [] for factor_name, perf in performance_dict.items(): summary_data.append({ '因子': factor_name, '夏普比率': f"{perf['sharpe_ratio']:.3f}", '月均收益': f"{perf['mean_return']:.3%}", '收益波动': f"{perf['std_return']:.3%}", '平均IC': f"{perf['mean_ic']:.4f}", 'IC_IR': f"{perf['ic_ir']:.3f}" }) summary_df = pd.DataFrame(summary_data) print(summary_df.to_string(index=False))# 可视化因子表现visualize_factor_performance(tester.factor_performance)