def analyze_factor_correlation(performance_dict, threshold=0.7):
    """Compute, plot and report correlations between factor return series.

    Args:
        performance_dict: Mapping of factor name -> performance record. Each
            record must contain ``'period_returns'``, a sequence of dicts that
            each hold a ``'long_short_return'`` entry.
        threshold: A factor pair is reported as highly correlated when the
            absolute value of its correlation exceeds this value.

    Returns:
        pandas.DataFrame: The pairwise correlation matrix of the factors'
        long-short return series. An empty DataFrame is returned when
        ``performance_dict`` is empty.
    """
    if not performance_dict:
        # Nothing to correlate; skip the first-key lookup and the heatmap.
        print("\n没有发现高度相关的因子对")
        return pd.DataFrame()

    # The first factor defines the reference number of periods.
    reference_factor = next(iter(performance_dict))
    num_periods = len(performance_dict[reference_factor]['period_returns'])

    # Collect the per-period long-short return series of every factor.
    factor_returns = {}
    for factor_name, perf in performance_dict.items():
        monthly_returns = [
            p['long_short_return'] for p in perf['period_returns']
        ]
        if len(monthly_returns) == num_periods:
            factor_returns[factor_name] = monthly_returns
        else:
            # Report the exclusion instead of dropping the factor silently.
            print(f"警告: 因子 {factor_name} 的期数({len(monthly_returns)})"
                  f"与基准期数({num_periods})不一致,已跳过")

    # Build the correlation matrix.
    returns_df = pd.DataFrame(factor_returns)
    correlation_matrix = returns_df.corr()

    # Visualise the correlation matrix.
    plt.figure(figsize=(10, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
                fmt='.2f', square=True)
    plt.title('因子收益相关性矩阵', fontsize=15, pad=20)
    plt.tight_layout()
    plt.show()

    # Identify highly correlated pairs (upper triangle only, so each pair
    # is visited once and the diagonal is excluded).
    columns = list(correlation_matrix.columns)
    high_corr_pairs = []
    for i, factor1 in enumerate(columns):
        for j in range(i + 1, len(columns)):
            corr = correlation_matrix.iloc[i, j]
            # NaN correlations compare False and are therefore not reported.
            if abs(corr) > threshold:
                high_corr_pairs.append((factor1, columns[j], corr))

    if high_corr_pairs:
        print(f"\n高度相关的因子对(相关系数 > {threshold}):")
        for factor1, factor2, corr in high_corr_pairs:
            print(f"  {factor1} 与 {factor2}: {corr:.3f}")
    else:
        print("\n没有发现高度相关的因子对")

    return correlation_matrix


# Analyze factor correlation
corr_matrix = analyze_factor_correlation(tester.factor_performance)
class MultiFactorModel:
    """Multi-factor model: combines factor scores, fits weights, backtests."""

    def __init__(self, factors_weights=None):
        """Initialise the model.

        Args:
            factors_weights: Optional mapping of factor name -> weight. When
                omitted the model starts without weights; scoring then falls
                back to equal weights and the backtest fits weights first.
        """
        # Copy so that later weight updates never touch the caller's dict.
        self.factors_weights = (
            {} if factors_weights is None else dict(factors_weights)
        )
        self.combined_scores = {}
        self.model_performance = {}

    def _effective_weights(self, factor_scores):
        """Return configured weights, or equal weights if none are usable."""
        if sum(self.factors_weights.values()) != 0:
            return self.factors_weights
        if not factor_scores:
            return {}
        equal_weight = 1.0 / len(factor_scores)
        return {factor: equal_weight for factor in factor_scores}

    def calculate_combined_score(self, factor_scores, method='weighted'):
        """Combine per-factor scores into a single composite score.

        Args:
            factor_scores: Mapping of factor name -> score. For ``'weighted'``
                and ``'zscore'`` each score is either a value supporting
                ``* weight`` (e.g. a number) or a dict of stock -> value.
                For ``'rank'`` each score must be a dict of stock -> value.
            method: ``'weighted'`` (weighted sum), ``'zscore'`` (weighted sum
                of scores the caller has already standardised) or ``'rank'``
                (weighted sum of within-factor ranks).

        Returns:
            A dict of stock -> composite score when the inputs are per-stock
            dicts (always for ``'rank'``); otherwise the weighted sum itself.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in ('weighted', 'zscore', 'rank'):
            raise ValueError(f"Unknown combination method: {method!r}")

        weights = self._effective_weights(factor_scores)

        if method == 'rank':
            # Rank every factor's values, then accumulate weighted ranks.
            total_rank = {}
            for factor, weight in weights.items():
                if factor not in factor_scores:
                    continue
                scores = factor_scores[factor]
                ranks = pd.Series(list(scores.values())).rank()
                for stock, rank in zip(scores.keys(), ranks):
                    total_rank[stock] = total_rank.get(stock, 0) + rank * weight
            return total_rank

        # 'weighted' and 'zscore' share the same arithmetic; 'zscore' only
        # differs in that the inputs are expected to be standardised already.
        values = list(factor_scores.values())
        per_stock = bool(values) and all(isinstance(v, dict) for v in values)
        if per_stock:
            # Per-stock inputs (as produced by the backtest): accumulate the
            # weighted value of every factor a stock appears in. Multiplying
            # the dicts themselves would raise TypeError.
            combined = {}
            for factor, weight in weights.items():
                if factor not in factor_scores:
                    continue
                for stock, value in factor_scores[factor].items():
                    combined[stock] = combined.get(stock, 0) + value * weight
            return combined

        return sum(
            factor_scores[factor] * weight
            for factor, weight in weights.items()
            if factor in factor_scores
        )

    def _sharpe_weights(self, factor_performance, factor_names):
        """Fit long-only weights (summing to 1) that maximise the Sharpe ratio."""
        from scipy.optimize import minimize

        monthly_returns = {
            name: [p['long_short_return'] for p in perf['period_returns']]
            for name, perf in factor_performance.items()
        }
        # Truncate every series to the shortest one so they align.
        min_length = min(len(returns) for returns in monthly_returns.values())
        returns_df = pd.DataFrame(
            {name: returns[:min_length]
             for name, returns in monthly_returns.items()}
        )

        def negative_sharpe(weights):
            portfolio_returns = returns_df.dot(weights)
            std_return = portfolio_returns.std()
            # Penalise zero or undefined volatility (e.g. a single period).
            if not np.isfinite(std_return) or std_return == 0:
                return 999
            return -portfolio_returns.mean() / std_return

        n_factors = len(factor_names)
        init_weights = np.ones(n_factors) / n_factors
        result = minimize(
            negative_sharpe,
            init_weights,
            method='SLSQP',
            bounds=tuple((0, 1) for _ in range(n_factors)),
            constraints={'type': 'eq', 'fun': lambda x: np.sum(x) - 1},
        )
        if result.success:
            return dict(zip(factor_names, result.x))
        print("权重优化失败,使用等权重")
        return dict(zip(factor_names, init_weights))

    def optimize_weights(self, factor_performance, method='sharpe'):
        """Fit factor weights and store them on the model.

        Args:
            factor_performance: Mapping of factor name -> performance record.
                ``'sharpe'`` reads ``record['period_returns']`` (dicts with a
                ``'long_short_return'`` entry); ``'ic_weighted'`` reads
                ``record['mean_ic']``.
            method: ``'sharpe'`` (maximise the per-period Sharpe ratio of the
                weighted factor returns), ``'equal'`` or ``'ic_weighted'``
                (proportional to the absolute mean IC).

        Returns:
            dict: factor name -> weight; also assigned to
            ``self.factors_weights``.

        Raises:
            ValueError: If ``factor_performance`` is empty or ``method`` is
                not supported.
        """
        factor_names = list(factor_performance.keys())
        n_factors = len(factor_names)
        if n_factors == 0:
            raise ValueError("factor_performance is empty; nothing to optimise")
        if method not in ('sharpe', 'equal', 'ic_weighted'):
            raise ValueError(f"Unknown weighting method: {method!r}")

        equal_weights = np.ones(n_factors) / n_factors

        if method == 'sharpe':
            optimized_weights = self._sharpe_weights(
                factor_performance, factor_names
            )
        elif method == 'equal':
            optimized_weights = dict(zip(factor_names, equal_weights))
        else:  # 'ic_weighted'
            ic_abs = np.abs(
                [perf['mean_ic'] for perf in factor_performance.values()]
            )
            # Fall back to equal weights when the IC total is not positive.
            if ic_abs.sum() > 0:
                weights = ic_abs / ic_abs.sum()
            else:
                weights = equal_weights
            optimized_weights = dict(zip(factor_names, weights))

        self.factors_weights = optimized_weights

        print("\n优化后的因子权重:")
        for factor, weight in optimized_weights.items():
            print(f"  {factor}: {weight:.3f}")

        return optimized_weights

    def backtest_multi_factor(self, factor_performance, top_n=50,
                              data_source=None):
        """Backtest the multi-factor model period by period.

        For every period the per-stock factor values are fetched, combined
        with the model weights, and the ``top_n`` highest-scoring stocks are
        selected. NOTE: the period return is a simplification: it is the
        plain mean of the factors' ``'long_short_return'`` for that period,
        not the realised return of the selected stocks.

        Args:
            factor_performance: Mapping of factor name -> performance record
                whose ``'period_returns'`` entries hold ``'start_date'`` and
                ``'long_short_return'``.
            top_n: Number of top-scoring stocks to select each period.
            data_source: Object exposing
                ``prepare_factor_data(factor_name, start_date)`` that returns
                a mapping of stock -> dict with a ``'factor_value'`` entry.
                Defaults to the module-level ``tester``.

        Returns:
            dict with the performance summary (also stored in
            ``self.model_performance``), or ``None`` when no period produced
            a result.
        """
        print("\n开始回测多因子模型")
        print("=" * 50)

        if not factor_performance:
            print("没有有效的回测结果")
            return None

        if data_source is None:
            data_source = tester

        # Fit weights first if none are configured.
        if not self.factors_weights:
            print("正在优化因子权重...")
            self.optimize_weights(factor_performance)

        all_returns = []
        portfolio_compositions = []

        # Only iterate over periods that every factor actually has.
        num_periods = min(
            len(perf['period_returns']) for perf in factor_performance.values()
        )

        for period_idx in range(num_periods):
            # Best-effort per period: a failure is reported and the period
            # skipped so the remaining periods can still be evaluated.
            try:
                period_scores = {}
                start_date = None
                for factor_name, perf in factor_performance.items():
                    start_date = perf['period_returns'][period_idx]['start_date']
                    factor_values = data_source.prepare_factor_data(
                        factor_name, start_date
                    )
                    if factor_values:
                        period_scores[factor_name] = {
                            stock: data['factor_value']
                            for stock, data in factor_values.items()
                        }

                if not period_scores:
                    continue

                combined_scores = self.calculate_combined_score(
                    period_scores, method='weighted'
                )

                # Select the top_n highest-scoring stocks.
                sorted_stocks = sorted(
                    combined_scores.items(), key=lambda x: x[1], reverse=True
                )
                selected_stocks = [stock for stock, _ in sorted_stocks[:top_n]]

                portfolio_return = 0
                if selected_stocks:
                    # Simplification: average factor return stands in for
                    # the real holding-period return of the selection.
                    portfolio_return = np.mean([
                        perf['period_returns'][period_idx]['long_short_return']
                        for perf in factor_performance.values()
                    ])

                all_returns.append(portfolio_return)
                portfolio_compositions.append({
                    'period': period_idx,
                    'date': start_date,
                    'selected_stocks': selected_stocks[:10],  # first 10 only
                    'portfolio_return': portfolio_return
                })

                print(f"  第{period_idx+1}期: 组合收益 = {portfolio_return:.4%}")

            except Exception as e:
                print(f"  第{period_idx+1}期出错: {e}")
                continue

        if not all_returns:
            print("没有有效的回测结果")
            return None

        returns_array = np.array(all_returns)
        mean_return = returns_array.mean()
        std_return = returns_array.std()
        sharpe_ratio = (
            mean_return / std_return * np.sqrt(12) if std_return > 0 else 0
        )

        # Cumulative growth of 1 unit and its running peak. The running
        # maximum uses NumPy because ndarrays have no ``.expanding()``.
        cumulative_returns = (1 + returns_array).cumprod()
        peak = np.maximum.accumulate(cumulative_returns)
        drawdown = (cumulative_returns - peak) / peak
        max_drawdown = drawdown.min()

        self.model_performance = {
            'mean_return': mean_return,
            'std_return': std_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'cumulative_returns': cumulative_returns,
            'portfolio_compositions': portfolio_compositions,
            'period_returns': returns_array
        }

        print("\n多因子模型表现:")
        print(f"  平均月度收益: {mean_return:.4%}")
        print(f"  收益波动率: {std_return:.4%}")
        print(f"  年化夏普比率: {sharpe_ratio:.3f}")
        print(f"  最大回撤: {max_drawdown:.2%}")
        print(f"  总期数: {len(all_returns)}")

        return self.model_performance


# Build the multi-factor model
multi_factor = MultiFactorModel()
model_perf = multi_factor.backtest_multi_factor(tester.factor_performance)
def _rolling_sharpe(returns, window):
    """Return the annualised Sharpe ratio of every full trailing window."""
    sharpes = []
    if window <= 0:
        return sharpes
    # ``len(returns) + 1`` so the window ending at the last period is included.
    for end in range(window, len(returns) + 1):
        window_returns = returns[end - window:end]
        std = np.std(window_returns)
        if std > 0:
            sharpes.append(np.mean(window_returns) / std * np.sqrt(12))
        else:
            # NaN keeps later points aligned on the x-axis (drawn as a gap).
            sharpes.append(np.nan)
    return sharpes


def visualize_multi_factor_performance(model_performance, factor_performance):
    """Plot the multi-factor model's performance and print risk metrics.

    Draws a 2x2 figure (cumulative return curve, return histogram, Sharpe
    comparison against the single factors, rolling Sharpe ratio) and prints
    a table of risk-adjusted metrics.

    Args:
        model_performance: Performance dict providing ``'cumulative_returns'``,
            ``'period_returns'``, ``'sharpe_ratio'`` and ``'max_drawdown'``.
            A falsy value prints a notice and returns immediately.
        factor_performance: Mapping of factor name -> record providing
            ``'sharpe_ratio'``.

    Returns:
        None.
    """
    if not model_performance:
        print("没有模型性能数据")
        return

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    # Cumulative return curve.
    cumulative_returns = model_performance['cumulative_returns']
    axes[0, 0].plot(range(len(cumulative_returns)), cumulative_returns,
                    linewidth=2, color='darkblue', label='多因子组合')
    axes[0, 0].axhline(y=1, color='gray', linestyle='--', alpha=0.5)
    axes[0, 0].set_xlabel('调仓周期')
    axes[0, 0].set_ylabel('累计收益')
    axes[0, 0].set_title('多因子模型累计收益曲线', fontsize=14)
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)

    # Distribution of period returns. Coerce to an ndarray so the scaling
    # and boolean masking below also work for plain sequences.
    monthly_returns = np.asarray(model_performance['period_returns'],
                                 dtype=float)
    axes[0, 1].hist(monthly_returns * 100, bins=20, edgecolor='black',
                    alpha=0.7, color='steelblue')
    axes[0, 1].axvline(x=np.mean(monthly_returns) * 100, color='red',
                       linestyle='--', linewidth=2,
                       label=f'均值: {np.mean(monthly_returns)*100:.2f}%')
    axes[0, 1].set_xlabel('月度收益 (%)')
    axes[0, 1].set_ylabel('频数')
    axes[0, 1].set_title('月度收益分布', fontsize=14)
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)

    # Sharpe ratio: single factors vs. the multi-factor model (last bar).
    factor_names = list(factor_performance.keys())
    factor_sharpes = [
        perf['sharpe_ratio'] for perf in factor_performance.values()
    ]
    factor_names.append('多因子组合')
    factor_sharpes.append(model_performance['sharpe_ratio'])

    axes[1, 0].barh(factor_names, factor_sharpes,
                    color=['steelblue'] * len(factor_names))
    axes[1, 0].axvline(x=0, color='red', linestyle='--', alpha=0.5)
    axes[1, 0].set_xlabel('年化夏普比率')
    axes[1, 0].set_title('多因子与单因子夏普比率对比', fontsize=14)
    axes[1, 0].grid(True, alpha=0.3, axis='x')

    # Rolling Sharpe ratio (up to 12 periods per window).
    rolling_window = min(12, len(monthly_returns))
    rolling_sharpes = _rolling_sharpe(monthly_returns, rolling_window)

    axes[1, 1].plot(range(len(rolling_sharpes)), rolling_sharpes,
                    linewidth=2, color='green')
    axes[1, 1].axhline(y=0, color='gray', linestyle='--', alpha=0.5)
    axes[1, 1].set_xlabel('滚动窗口')
    axes[1, 1].set_ylabel('年化夏普比率')
    axes[1, 1].set_title(f'{rolling_window}个月滚动夏普比率', fontsize=14)
    axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Detailed performance analysis.
    print("\n" + "="*60)
    print("多因子模型详细性能分析")
    print("="*60)

    returns = monthly_returns
    winning_returns = returns[returns > 0]
    losing_returns = returns[returns < 0]

    # Sortino ratio: volatility measured on the negative returns only.
    downside_std = np.std(losing_returns) if len(losing_returns) > 0 else 0
    sortino_ratio = (
        np.mean(returns) / downside_std * np.sqrt(12)
        if downside_std > 0 else 0
    )

    # Calmar ratio: annualised return over the absolute maximum drawdown.
    max_drawdown_abs = abs(model_performance['max_drawdown'])
    calmar_ratio = (
        np.mean(returns) * 12 / max_drawdown_abs
        if max_drawdown_abs > 0 else 0
    )

    # Win rate and profit/loss ratio.
    win_rate = np.sum(returns > 0) / len(returns) if len(returns) > 0 else 0
    avg_win = np.mean(winning_returns) if len(winning_returns) > 0 else 0
    avg_loss = abs(np.mean(losing_returns)) if len(losing_returns) > 0 else 0
    profit_ratio = avg_win / avg_loss if avg_loss > 0 else 0

    performance_metrics = {
        '年化收益': f"{np.mean(returns) * 12:.2%}",
        '年化波动': f"{np.std(returns) * np.sqrt(12):.2%}",
        '夏普比率': f"{model_performance['sharpe_ratio']:.3f}",
        '索提诺比率': f"{sortino_ratio:.3f}",
        '卡玛比率': f"{calmar_ratio:.3f}",
        '最大回撤': f"{model_performance['max_drawdown']:.2%}",
        '胜率': f"{win_rate:.2%}",
        '平均盈利': f"{avg_win:.4%}",
        '平均亏损': f"{avg_loss:.4%}",
        '盈亏比': f"{profit_ratio:.2f}"
    }

    print("\n风险调整后收益指标:")
    for metric, value in performance_metrics.items():
        print(f"  {metric}: {value}")


# Visualize the multi-factor model performance
if model_perf:
    visualize_multi_factor_performance(model_perf, tester.factor_performance)