随着大语言模型技术的深入发展,越来越多的量化爱好者开始尝试用 Python 构建智能交易系统。但实际落地中,很多人会发现:让一个“全能型” AI 独立完成所有任务,不仅容易出错,而且 API 调用成本居高不下。
其实,与其让一个大模型单打独斗,不如利用 Sub-Agent(子智能体)架构 组建一支分工明确的“智能交易军团”。通过将复杂的研报分析、风控、信号生成拆解给多个专精的小智能体,能够大幅度提升系统的鲁棒性并降低运行成本。
1. 为什么要引入多智能体架构?
在量化交易系统中,把庞大的分析任务拆分成多个子智能体,主要具备四大核心优势:
- 并行处理提速: 技术面分析、新闻情绪捕捉、宏观数据读取可以同时跑,互不阻塞。
- 专业分工明确: 每个智能体各司其职,比如专门盯着基本面的智能体,和专门管下风控的智能体,分工极度细化。
- 极速优化成本: 简单的逻辑判断用便宜的小模型,只有在最终的关键决策环节才动用昂贵的顶级模型。
- 避免上下文遗忘: 海量的数据流可以分块交由专门的智能体处理,规避了单模型超长上下文导致的记忆丢失问题。
换句话说,这就是在代码里复刻了一支真实的华尔街交易团队:有人盯盘,有人看财报,有人风控,最后由基金经理拍板。
2. 核心实战:搭建基础智能体框架
我们首先使用 Python 的 asyncio 异步编程配合 dataclass 构建一个最基础的交易智能体父类:
import asyncioimport pandas as pdfrom typing import Dict, List, Anyfrom dataclasses import dataclass# 定义交易信号数据结构@dataclassclass TradeSignal: symbol: str # 股票代码 action: str # 操作:BUY、SELL、HOLD confidence: float # 信号置信度 reason: str # 决策依据# 基础交易智能体class BaseTradingAgent: def __init__(self, name: str): self.name = name async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal: raise NotImplementedError
基于此父类,我们可以轻松派生出特定的策略智能体。比如一个负责计算均线的简单移动平均线(SMA)智能体,它只专注于提取数据,判断金叉死叉,并产出带有置信度的 TradeSignal。
class SimpleMovingAverageAgent(BaseTradingAgent): def __init__(self, name: str, short_window: int = 20, long_window: int = 50): super().__init__(name) self.short_window = short_window # 短期均线窗口 self.long_window = long_window # 长期均线窗口 async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal: """基于均线交叉的简单策略""" df = pd.DataFrame(market_data['prices']) # 计算短期和长期均线 df['short_ma'] = df['close'].rolling(window=self.short_window).mean() df['long_ma'] = df['close'].rolling(window=self.long_window).mean() current_short = df['short_ma'].iloc[-1] current_long = df['long_ma'].iloc[-1] previous_short = df['short_ma'].iloc[-2] previous_long = df['long_ma'].iloc[-2] # 金叉(看涨信号) if previous_short <= previous_long and current_short > current_long: return TradeSignal( symbol=market_data['symbol'], action="BUY", confidence=0.8, reason="检测到金叉信号" ) # 死叉(看跌信号) elif previous_short >= previous_long and current_short < current_long: return TradeSignal( symbol=market_data['symbol'], action="SELL", confidence=0.8, reason="检测到死叉信号" ) else: return TradeSignal( symbol=market_data['symbol'], action="HOLD", confidence=0.5, reason="未出现明显的交叉信号" )
3. 并行调度:让分析跑得飞起
单一策略往往抗风险能力较差。我们可以同时运行“风险评估智能体”和“新闻情绪智能体”,最后由“投资组合经理”汇总他们的选票。
class RiskAssessmentAgent(BaseTradingAgent): """风险评估智能体:基于波动率判断风险""" def __init__(self, name: str): super().__init__(name) self.max_risk_per_trade = 0.02 # 单笔最大风险敞口 2% async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal: prices = market_data['prices']['close'] # 计算年化波动率 volatility = pd.Series(prices).pct_change().std() * (252 ** 0.5) if volatility > 0.3: # 高波动率 return TradeSignal( symbol=market_data['symbol'], action="HOLD", confidence=0.9, reason=f"波动率过高({volatility:.2f}),超过风险阈值" ) else: return TradeSignal( symbol=market_data['symbol'], action="PROCEED", confidence=0.8, reason=f"波动率({volatility:.2f})在可接受范围内" )class PortfolioManagerAgent(BaseTradingAgent): """投资组合经理:协调多个子智能体""" def __init__(self, name: str): super().__init__(name) self.agents = [] def add_agent(self, agent: BaseTradingAgent): """添加一个子智能体""" self.agents.append(agent) async def coordinate_agents(self, market_data: Dict[str, Any]) -> List[TradeSignal]: """并行调度所有子智能体""" # 关键:使用 asyncio.gather 实现并行 tasks = [agent.analyze(market_data) for agent in self.agents] signals = await asyncio.gather(*tasks) return signals async def make_decision(self, market_data: Dict[str, Any]) -> TradeSignal: """基于投票机制做最终决策""" signals = await self.coordinate_agents(market_data) # 统计投票 buy_votes = sum(1 for s in signals if s.action == "BUY") sell_votes = sum(1 for s in signals if s.action == "SELL") if buy_votes > sell_votes and buy_votes >= len(signals) // 2: return TradeSignal( symbol=market_data['symbol'], action="BUY", confidence=sum(s.confidence for s in signals if s.action == "BUY") / buy_votes, reason=f"{buy_votes}/{len(signals)} 个智能体推荐买入" ) elif sell_votes > buy_votes: return TradeSignal( symbol=market_data['symbol'], action="SELL", confidence=0.7, reason=f"{sell_votes}/{len(signals)} 个智能体推荐卖出" ) else: return TradeSignal( symbol=market_data['symbol'], action="HOLD", confidence=0.5, reason="未达成明确共识" )
重点提示:asyncio.gather 是并发调用的利器。它能让多个网络 I/O 密集型的大模型请求同时发出,把原本线性的等待时间压缩到极致。
4. 模拟真实工作流与成本切割
更高级的玩法,是为智能体引入消息收件箱(Message Inbox)机制。例如技术分析师将分析结果通过 send_message 发送给基本面分析师,双方交叉验证后再将统一口径推给风险经理。这种互相通信的机制几乎完美复刻了真实的交易桌。
好钢用在刀刃上
大模型的 API 费用是一笔不小的开支。我们可以专门写一个 ModelSelector:
- 简单均线判断、基础数据分类: 分配给价格极低的模型。
- 复杂的多因子综合分析: 分配给中端模型。
- 长文本研报解读、最终交易拍板: 交给推理能力极强的顶级模型(如 GPT-4o 或 DeepSeek-V3)。
这套“看碟下菜”的调度逻辑,可以帮你砍掉一大部分不必要的 API 账单。
5. 结语
将复杂系统拆解为专精的子智能体、利用 asyncio.gather 并发提速、建立智能体间的通信协作、实施阶梯化降本策略——这四点是构建高可用 AI 应用的通用心法。
这套多智能体架构不仅限于量化交易,无论是做自动化研报生成、还是企业级的办公助手,只要任务具备可拆解性,子智能体架构都能发挥出巨大的威力!
公众号:创新技术阁
感谢你看到这里,如果觉得文章对你有所收获,请在文末为我点个 【赞】 + 【推荐】 ,或者 【分享】 给身边更多有需要的人看,你的点赞就是对我莫大的支持与动力!