核心目标是对接股票、期货等多资产数据,让模型自动学习不同资产间的联动规律,并最终输出可用于交易的信号。
方案设计思路
- 1. 数据预处理层:统一多资产数据格式,构建时间序列特征矩阵,处理缺失值和异常值,并进行标准化
- 2. CNN特征提取层:利用卷积层捕捉跨资产的空间联动特征,池化层降维,提取关键联动模式
- 3. 全连接决策层:整合特征并输出交易信号(做多/做空/观望)
- 4. 信号输出层:标准化交易信号格式,便于对接交易系统
完整实现代码
"""CNN-based multi-asset trend model that turns OHLCV data into trading signals.

Pipeline: raw long-format OHLCV frame -> (time, asset, feature) matrix ->
sliding windows -> CNN -> probabilities over {hold, long, short} -> signal.
"""
import warnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset, Subset

warnings.filterwarnings('ignore')

# Fix the random seeds so that runs are reproducible.
torch.manual_seed(42)
np.random.seed(42)

# Feature order along the last axis of the data matrix.
FEATURE_COLUMNS = ['open', 'high', 'low', 'close', 'volume']
CLOSE_FEATURE_IDX = FEATURE_COLUMNS.index('close')

# Network class index -> trading signal (0=hold, 1=long, 2=short).
CLASS_TO_SIGNAL = {0: 0, 1: 1, 2: -1}


class MultiAssetDataset(Dataset):
    """Sliding-window dataset over a (time, asset, feature) matrix.

    Sample ``i`` is the window ``data_matrix[i:i + seq_len]`` paired with
    ``labels[i + seq_len - 1]``, i.e. the label stored at the last time step
    of the window.
    """

    def __init__(self, data_matrix, labels, seq_len=20):
        """
        Args:
            data_matrix: array-like of shape (time, assets, features).
            labels: array-like with one label per time step
                (1 = long, 0 = hold, -1 = short).
            seq_len: number of time steps per window.

        Raises:
            ValueError: if ``labels`` and ``data_matrix`` differ in length.
        """
        self.data_matrix = torch.tensor(data_matrix, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32)
        self.seq_len = seq_len
        if len(self.labels) != len(self.data_matrix):
            raise ValueError(
                f"labels has {len(self.labels)} entries but data_matrix has "
                f"{len(self.data_matrix)} time steps"
            )

    def __len__(self):
        # Clamp at zero: a negative length would make len() itself raise.
        return max(0, len(self.data_matrix) - self.seq_len)

    def __getitem__(self, idx):
        if idx < 0:
            idx += len(self)
        if not 0 <= idx < len(self):
            raise IndexError(f"window index {idx} out of range")
        end = idx + self.seq_len
        return self.data_matrix[idx:end, :, :], self.labels[end - 1]


class CNNMultiAssetTrendModel(nn.Module):
    """Single-conv-layer CNN classifying a window as hold / long / short."""

    def __init__(self, num_assets, num_features, seq_len=20, num_filters=64, kernel_size=3):
        """
        Args:
            num_assets: number of assets in each time step.
            num_features: number of features per asset.
            seq_len: window length the model will be fed.
            num_filters: number of convolution filters.
            kernel_size: kernel extent along the time axis.
        """
        super().__init__()

        # The kernel spans `kernel_size` time steps and `num_assets` columns of
        # the flattened (asset, feature) axis; 'same' padding keeps H and W.
        self.conv1 = nn.Conv2d(
            in_channels=1,
            out_channels=num_filters,
            kernel_size=(kernel_size, num_assets),
            padding='same',
        )
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d((2, 1))  # halves the time axis only

        # After pooling the map is (filters, seq_len // 2, assets * features).
        # The width is assets * features because forward() flattens both axes.
        pooled_seq_len = seq_len // 2
        self.fc1_input_dim = num_filters * pooled_seq_len * num_assets * num_features

        self.fc1 = nn.Linear(self.fc1_input_dim, 128)
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 3)  # classes: [hold, long, short]
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, return_logits=False):
        """Run the network.

        Args:
            x: tensor of shape (batch, seq_len, num_assets, num_features).
            return_logits: when True return raw scores (what
                ``nn.CrossEntropyLoss`` expects); otherwise return softmax
                probabilities.

        Returns:
            Tensor of shape (batch, 3).
        """
        batch_size = x.size(0)
        # Add a channel axis and merge assets/features into the width axis.
        x = x.reshape(batch_size, 1, x.size(1), -1)
        x = self.pool(self.relu(self.conv1(x)))
        x = x.reshape(batch_size, -1)
        x = self.dropout(self.relu(self.fc1(x)))
        logits = self.fc2(x)
        if return_logits:
            return logits
        return self.softmax(logits)


class MultiAssetTradingSystem:
    """Bundles preprocessing, label generation, training and signal output."""

    def __init__(self, seq_len=20, num_filters=64, lr=0.001):
        self.seq_len = seq_len
        self.num_filters = num_filters
        self.lr = lr
        self.scaler = StandardScaler()
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Unscaled close prices (time, assets) kept by preprocess_data so that
        # returns are never computed on z-scored prices.
        self.raw_close_prices = None

    def preprocess_data(self, raw_data, fit=True):
        """Reshape long-format OHLCV rows into a standardized 3-D matrix.

        Args:
            raw_data: DataFrame indexed by time with columns ``asset_code``,
                ``open``, ``high``, ``low``, ``close``, ``volume``.
            fit: fit the scaler on this data (True) or reuse the already
                fitted scaler (False, e.g. for new data at inference time).

        Returns:
            data_matrix: standardized array of shape (time, assets, features).
            assets: asset codes in order of first appearance.

        Raises:
            ValueError: if ``raw_data`` is empty or lacks a required column.
        """
        required = ['asset_code', *FEATURE_COLUMNS]
        missing = [col for col in required if col not in raw_data.columns]
        if missing:
            raise ValueError(f"raw_data is missing columns: {missing}")
        if raw_data.empty:
            raise ValueError("raw_data is empty")

        assets = raw_data['asset_code'].unique()
        timestamps = raw_data.index.unique().sort_values()

        # One row per (timestamp, asset); on duplicates keep the first row.
        long_frame = raw_data.set_index('asset_code', append=True)[FEATURE_COLUMNS]
        long_frame = long_frame[~long_frame.index.duplicated(keep='first')]
        full_index = pd.MultiIndex.from_product([timestamps, assets])
        wide = long_frame.reindex(full_index)

        # Fill gaps with the asset's previous observation; a price of 0 would
        # distort both the scaling statistics and the returns. Only gaps
        # before an asset's first observation fall back to 0.
        wide = wide.groupby(level=1).ffill().fillna(0)

        num_features = len(FEATURE_COLUMNS)
        raw_matrix = wide.to_numpy(dtype=float).reshape(
            len(timestamps), len(assets), num_features
        )
        self.raw_close_prices = raw_matrix[:, :, CLOSE_FEATURE_IDX].copy()

        # Standardize each feature over all time steps and assets. Scaling a
        # (time * assets, features) matrix fits every feature at once and
        # leaves self.scaler usable for transforming new data.
        flat = raw_matrix.reshape(-1, num_features)
        scaled = self.scaler.fit_transform(flat) if fit else self.scaler.transform(flat)
        data_matrix = scaled.reshape(raw_matrix.shape)

        return data_matrix, assets

    def generate_labels(self, data_matrix, target_asset_idx=0, threshold=0.005,
                        close_prices=None):
        """Label every time step by the target asset's next-step return.

        ``labels[t]`` describes the move from ``t`` to ``t + 1``, so a window
        ending at ``t`` is paired with a return it has not seen. The final
        step has no known future return and is labelled 0.

        Args:
            data_matrix: feature matrix of shape (time, assets, features).
            target_asset_idx: index of the asset to label.
            threshold: absolute return above which a move counts as a signal.
            close_prices: optional 1-D array of unscaled closes for the target
                asset. When omitted, the raw closes stored by
                ``preprocess_data`` are used if their shape matches
                ``data_matrix``; otherwise the close column of ``data_matrix``
                is used as-is, so pass ``close_prices`` explicitly when
                ``data_matrix`` is standardized or sliced.

        Returns:
            1-D float array of length ``time`` (1 = long, 0 = hold, -1 = short).

        Raises:
            ValueError: if ``close_prices`` does not have one value per step.
        """
        data_matrix = np.asarray(data_matrix)
        if close_prices is None:
            stored = getattr(self, 'raw_close_prices', None)
            if stored is not None and stored.shape == data_matrix.shape[:2]:
                close_prices = stored[:, target_asset_idx]
            else:
                close_prices = data_matrix[:, target_asset_idx, CLOSE_FEATURE_IDX]
        close_prices = np.asarray(close_prices, dtype=float)
        if close_prices.ndim != 1 or len(close_prices) != data_matrix.shape[0]:
            raise ValueError("close_prices must be 1-D with one value per time step")
        if len(close_prices) == 0:
            return np.zeros(0)

        with np.errstate(divide='ignore', invalid='ignore'):
            returns = np.diff(close_prices) / close_prices[:-1]
        # A zero previous close yields inf/nan; treat such steps as "hold".
        returns = np.where(np.isfinite(returns), returns, 0.0)

        labels = np.zeros_like(returns)
        labels[returns > threshold] = 1    # long
        labels[returns < -threshold] = -1  # short

        # Pad at the end so labels[t] is the forward-looking return of step t.
        return np.pad(labels, (0, 1), mode='constant')

    @staticmethod
    def _to_class_index(y):
        """Map signal labels {0, 1, -1} to class indices {0, 1, 2}."""
        y = y.reshape(-1).long()
        return torch.where(y < 0, torch.full_like(y, 2), y)

    def _run_epoch(self, loader, criterion, optimizer=None):
        """Run one pass over ``loader``; train when ``optimizer`` is given.

        Returns the mean batch loss (0.0 for an empty loader).
        """
        training = optimizer is not None
        # Set the mode on every pass so dropout is re-enabled after validation.
        self.model.train(training)
        total_loss = 0.0
        with torch.set_grad_enabled(training):
            for x, y in loader:
                x = x.to(self.device)
                target = self._to_class_index(y.to(self.device))
                loss = criterion(self.model(x, return_logits=True), target)
                if training:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                total_loss += loss.item()
        return total_loss / max(len(loader), 1)

    def train_model(self, data_matrix, labels, epochs=50, batch_size=32):
        """Train the CNN on sliding windows of ``data_matrix``.

        The last 20% of the windows (in time order) are held out for
        validation; a random split would leak overlapping windows.

        Args:
            data_matrix: feature matrix of shape (time, assets, features).
            labels: one label per time step (1 / 0 / -1).
            epochs: number of training epochs.
            batch_size: mini-batch size.

        Raises:
            ValueError: if fewer than two windows can be built.
        """
        dataset = MultiAssetDataset(data_matrix, labels, self.seq_len)
        if len(dataset) < 2:
            raise ValueError(
                "need at least seq_len + 2 time steps to build train/validation windows"
            )
        train_idx, val_idx = train_test_split(
            np.arange(len(dataset)), test_size=0.2, shuffle=False
        )
        train_loader = DataLoader(
            Subset(dataset, train_idx.tolist()), batch_size=batch_size, shuffle=True
        )
        val_loader = DataLoader(Subset(dataset, val_idx.tolist()), batch_size=batch_size)

        num_assets, num_features = data_matrix.shape[1], data_matrix.shape[2]
        self.model = CNNMultiAssetTrendModel(
            num_assets=num_assets,
            num_features=num_features,
            seq_len=self.seq_len,
            num_filters=self.num_filters,
        ).to(self.device)

        # CrossEntropyLoss applies log-softmax itself, so it is fed logits.
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

        for epoch in range(epochs):
            train_loss = self._run_epoch(train_loader, criterion, optimizer)
            val_loss = self._run_epoch(val_loader, criterion)
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        self.model.eval()
        print("模型训练完成!")

    def generate_trading_signal(self, latest_data):
        """Predict a trading signal for the most recent window.

        Args:
            latest_data: array-like of shape (seq_len, num_assets, num_features).

        Returns:
            signal: 1 = long, 0 = hold, -1 = short.
            confidence: probability of the predicted class, in [0, 1].

        Raises:
            ValueError: if the model has not been trained yet.
        """
        if self.model is None:
            raise ValueError("模型未训练,请先调用train_model方法")

        batch = torch.tensor(latest_data, dtype=torch.float32).unsqueeze(0).to(self.device)

        self.model.eval()  # make sure dropout is off for inference
        with torch.no_grad():
            probabilities = self.model(batch)
            pred_idx = torch.argmax(probabilities, dim=1).item()
            confidence = torch.max(probabilities).item()

        return CLASS_TO_SIGNAL[pred_idx], confidence


# ------------------------------
# Usage example
# ------------------------------
def _simulate_raw_data():
    """Build a random OHLCV frame for four assets (replace with real data)."""
    dates = pd.date_range(start='2020-01-01', end='2024-01-01', freq='D')
    assets = ['STOCK001', 'FUTURE001', 'STOCK002', 'FUTURE002']
    rows = []
    for date in dates:
        for asset in assets:
            open_price = np.random.uniform(10, 100)
            high_price = open_price * np.random.uniform(1.0, 1.05)
            low_price = open_price * np.random.uniform(0.95, 1.0)
            close_price = np.random.uniform(low_price, high_price)
            volume = np.random.randint(1000000, 10000000)
            rows.append({
                'date': date,
                'asset_code': asset,
                'open': open_price,
                'high': high_price,
                'low': low_price,
                'close': close_price,
                'volume': volume,
            })
    return pd.DataFrame(rows).set_index('date')


def main():
    """Run the demo: simulate data, train, and print one trading signal."""
    # 1. Simulated multi-asset data.
    raw_df = _simulate_raw_data()

    # 2. Trading system.
    trading_system = MultiAssetTradingSystem(seq_len=20, num_filters=64, lr=0.001)

    # 3. Preprocessing.
    data_matrix, asset_list = trading_system.preprocess_data(raw_df)

    # 4. Labels for the first asset.
    labels = trading_system.generate_labels(data_matrix, target_asset_idx=0, threshold=0.005)

    # 5. Training.
    trading_system.train_model(data_matrix, labels, epochs=50, batch_size=32)

    # 6. Signal from the most recent window.
    latest_data = data_matrix[-trading_system.seq_len:]
    signal, confidence = trading_system.generate_trading_signal(latest_data)

    # 7. Report.
    signal_desc = {1: "做多", 0: "观望", -1: "做空"}
    print(f"\n交易信号: {signal_desc[signal]} (置信度: {confidence:.4f})")
    print(f"目标资产: {asset_list[0]}")


if __name__ == "__main__":
    main()
代码关键部分解释
- 1. MultiAssetDataset类:专门处理多资产时间序列数据,按指定窗口长度构建训练样本,确保输入格式符合CNN要求
- 2. CNNMultiAssetTrendModel类:
- • 卷积层:使用 `(kernel_size, num_assets)` 的卷积核,专门捕捉跨资产的空间联动特征
- 3. MultiAssetTradingSystem类:
- • 数据预处理:统一多资产数据格式,标准化特征,处理缺失值
- • 标签生成:基于目标资产收益率自动生成做多/做空/观望标签
- • 模型训练:封装训练流程,包含训练/验证拆分和损失计算
- • 信号生成:输入最新数据,输出带置信度的交易信号
实际使用说明
- 1. 数据对接:将使用示例中的模拟数据替换为真实的股票/期货数据,确保包含 `asset_code`(资产代码)、`open`/`high`/`low`/`close`/`volume` 字段
- • `seq_len`:时间窗口长度(建议20-60)
- • `num_filters`:卷积核数量(建议32-128)
- • `threshold`:收益率阈值(根据资产波动率调整)
- • 训练完成后保存模型:`torch.save(trading_system.model.state_dict(), 'cnn_multi_asset_model.pth')`
- • 实际交易时加载模型并调用 `generate_trading_signal` 方法
总结
- 1. 该模板通过CNN的空间特征提取能力,有效捕捉多资产间的联动模式,相比传统单资产模型更具全局视角
- 2. 完整的数据流处理流程:从原始多资产数据→标准化特征矩阵→CNN特征提取→交易信号输出,可直接对接真实交易数据
- 3. 核心优势是自动化特征学习,无需人工设计跨资产特征,模型可自适应不同市场环境下的资产联动规律
使用时建议先用历史数据充分回测,验证信号有效性后再接入实盘交易系统。