
用 Python 揭秘均值回归策略:你的收益从何而来?
2026年重磅升级已全面落地!欢迎加入专注财经数据与量化投研的【数据科学实战】知识星球!您将获取持续更新的《财经数据宝典》与《量化投研宝典》,双典协同提供系统化指引;星球内含 350 篇以上独有高质量文章,深度覆盖策略开发、因子分析、风险管理等核心领域,内容基本每日更新;同步推出的「量化因子专题教程」系列(含完整可运行代码与实战案例),系统详解因子构建、回测与优化全流程,并实现日更迭代。我们持续扩充独家内容资源,全方位赋能您的投研效率与专业成长。无论您是量化新手还是资深研究者,这里都是助您少走弯路、事半功倍的理想伙伴,携手共探数据驱动的投资未来!
随着 AI 技术在金融市场的深度应用,越来越多的开发者开始尝试用 Python 构建智能交易系统。但你有没有想过,与其让一个「全能型」AI 单打独斗,不如组建一支分工明确的「智能体军团」?
今天这篇文章,我们就来聊聊 Sub-Agent(子智能体)架构 在量化交易中的应用。通过把复杂任务拆解给多个专精的小智能体,我们可以实现并行处理、降低成本、提升鲁棒性。无论你是 Python 初学者还是 AI 爱好者,相信看完都能有所收获。
把一个庞大的交易系统拆成多个子智能体,主要有 4 大好处:
简单说,就像组建一支交易团队:有人盯技术面,有人看基本面,有人管风险,最后由经理拍板。
我们先用 Python 搭一个最基础的交易智能体框架。这里用到了 asyncio 异步编程和 dataclass 数据类。
import asyncio
import pandas as pd
from typing import Dict, List, Any
from dataclasses import dataclass
# 定义交易信号的数据结构
@dataclass
class TradeSignal:
symbol: str # 股票代码
action: str # 操作类型:BUY、SELL、HOLD
confidence: float # 信号置信度
reason: str # 决策原因
# 基础交易智能体(所有智能体的父类)
class BaseTradingAgent:
def __init__(self, name: str):
self.name = name
self.context = {}
async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal:
"""分析市场数据,子类需要重写这个方法"""
raise NotImplementedError
async def execute_trade(self, signal: TradeSignal) -> bool:
"""根据信号执行交易"""
print(f"[{self.name}] 执行 {signal.action} 操作,标的:{signal.symbol}")
# 真实场景中,这里会调用券商 API
return True接下来我们实现一个简单移动平均线(SMA)策略智能体:
class SimpleMovingAverageAgent(BaseTradingAgent):
def __init__(self, name: str, short_window: int = 20, long_window: int = 50):
super().__init__(name)
self.short_window = short_window # 短期均线窗口
self.long_window = long_window # 长期均线窗口
async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal:
"""基于均线交叉的简单策略"""
df = pd.DataFrame(market_data['prices'])
# 计算短期和长期均线
df['short_ma'] = df['close'].rolling(window=self.short_window).mean()
df['long_ma'] = df['close'].rolling(window=self.long_window).mean()
current_short = df['short_ma'].iloc[-1]
current_long = df['long_ma'].iloc[-1]
previous_short = df['short_ma'].iloc[-2]
previous_long = df['long_ma'].iloc[-2]
# 金叉(看涨信号)
if previous_short <= previous_long and current_short > current_long:
return TradeSignal(
symbol=market_data['symbol'],
action="BUY",
confidence=0.8,
reason="检测到金叉信号"
)
# 死叉(看跌信号)
elif previous_short >= previous_long and current_short < current_long:
return TradeSignal(
symbol=market_data['symbol'],
action="SELL",
confidence=0.8,
reason="检测到死叉信号"
)
else:
return TradeSignal(
symbol=market_data['symbol'],
action="HOLD",
confidence=0.5,
reason="未出现明显的交叉信号"
)单一策略往往不够稳健,我们可以同时跑多个智能体,比如风险评估智能体和新闻情绪智能体,最后由一个投资组合经理汇总投票。
class RiskAssessmentAgent(BaseTradingAgent):
"""风险评估智能体:基于波动率判断风险"""
def __init__(self, name: str):
super().__init__(name)
self.max_risk_per_trade = 0.02 # 单笔最大风险敞口 2%
async def analyze(self, market_data: Dict[str, Any]) -> TradeSignal:
prices = market_data['prices']['close']
# 计算年化波动率
volatility = pd.Series(prices).pct_change().std() * (252 ** 0.5)
if volatility > 0.3: # 高波动率
return TradeSignal(
symbol=market_data['symbol'],
action="HOLD",
confidence=0.9,
reason=f"波动率过高({volatility:.2f}),超过风险阈值"
)
else:
return TradeSignal(
symbol=market_data['symbol'],
action="PROCEED",
confidence=0.8,
reason=f"波动率({volatility:.2f})在可接受范围内"
)
class PortfolioManagerAgent(BaseTradingAgent):
"""投资组合经理:协调多个子智能体"""
def __init__(self, name: str):
super().__init__(name)
self.agents = []
def add_agent(self, agent: BaseTradingAgent):
"""添加一个子智能体"""
self.agents.append(agent)
async def coordinate_agents(self, market_data: Dict[str, Any]) -> List[TradeSignal]:
"""并行调度所有子智能体"""
# 关键:使用 asyncio.gather 实现并行
tasks = [agent.analyze(market_data) for agent in self.agents]
signals = await asyncio.gather(*tasks)
return signals
async def make_decision(self, market_data: Dict[str, Any]) -> TradeSignal:
"""基于投票机制做最终决策"""
signals = await self.coordinate_agents(market_data)
# 统计投票
buy_votes = sum(1 for s in signals if s.action == "BUY")
sell_votes = sum(1 for s in signals if s.action == "SELL")
if buy_votes > sell_votes and buy_votes >= len(signals) // 2:
return TradeSignal(
symbol=market_data['symbol'],
action="BUY",
confidence=sum(s.confidence for s in signals if s.action == "BUY") / buy_votes,
reason=f"{buy_votes}/{len(signals)} 个智能体推荐买入"
)
elif sell_votes > buy_votes:
return TradeSignal(
symbol=market_data['symbol'],
action="SELL",
confidence=0.7,
reason=f"{sell_votes}/{len(signals)} 个智能体推荐卖出"
)
else:
return TradeSignal(
symbol=market_data['symbol'],
action="HOLD",
confidence=0.5,
reason="未达成明确共识"
)💡 小提示:
asyncio.gather是 Python 异步编程的核武器,能让多个协程同时执行,效率比串行高得多。
更高级的玩法是让智能体之间互相通信,模拟一个真实的交易团队工作流:
class TeamMemberAgent(BaseTradingAgent):
"""支持团队通信的智能体"""
def __init__(self, name: str, specialty: str):
super().__init__(name)
self.specialty = specialty # 专业领域
self.messages = [] # 消息收件箱
async def send_message(self, recipient, message: str):
"""向其他成员发送消息"""
await recipient.receive_message(self, message)
async def receive_message(self, sender, message: str):
"""接收消息"""
self.messages.append({"from": sender.name, "message": message})
print(f"[{self.name}] 收到来自 {sender.name} 的消息:{message}")
class TradingTeamManager:
"""团队经理:协调技术分析师、基本面分析师、风险经理"""
def __init__(self):
self.technical_agent = TechnicalAnalysisAgent("技术分析师")
self.fundamental_agent = FundamentalAnalysisAgent("基本面分析师")
self.risk_agent = RiskManagementAgent("风险经理")
async def coordinate_team(self, market_data):
# 第一步:技术分析
tech_analysis = await self.technical_agent.analyze(market_data)
# 第二步:基本面分析
fund_analysis = await self.fundamental_agent.analyze(market_data)
# 第三步:成员之间共享分析结果
await self.technical_agent.share_analysis(self.fundamental_agent, tech_analysis)
# 第四步:风险评估
risk_assessment = await self.risk_agent.assess_risk(tech_analysis, fund_analysis)
# 第五步:综合决策
# ……(具体逻辑见原文)
return final_signal调用大模型 API 是要花钱的。一个聪明的做法是:根据任务复杂度,选择不同价位的模型。
class ModelSelector:
"""根据任务复杂度选择合适的模型"""
# 不同模型的价格(每千 tokens)
MODEL_COSTS = {
"gpt-4": 0.03, # 最贵、能力最强
"gpt-3.5-turbo": 0.002, # 中等价位
"claude-haiku": 0.001, # 最便宜
}
@staticmethod
def select_model(task_complexity: str, data_size: int) -> str:
"""根据任务需求选择模型"""
if task_complexity == "high" or data_size > 10000:
return "gpt-4" # 复杂任务用顶级模型
elif task_complexity == "medium" or data_size > 1000:
return "gpt-3.5-turbo" # 中等任务用中端模型
else:
return "claude-haiku" # 简单任务用便宜模型这个思路非常实用,比如:
按需调用,能省下不少 API 费用。
通过这篇文章,我们学到了 4 个核心要点:
asyncio.gather 让多个智能体同时跑,性能拉满虽然示例围绕量化交易展开,但这套架构同样适用于:客服机器人、内容生成、数据分析、自动化办公等场景。只要任务可以拆解,子智能体架构都能派上用场。
如果你正在学习 Python 或对 AI 应用开发感兴趣,建议动手把这些代码跑一遍,把 SMA、风险评估、情绪分析这几个智能体串起来,相信你会对异步编程和面向对象设计有更深的理解。
🔧 动手挑战:试着加一个「成交量分析智能体」加入投票机制,看看决策结果会有什么变化?
2026年全面升级已落地!【数据科学实战】知识星球核心权益如下:
星球已沉淀丰富内容生态——涵盖量化文章专题教程库、因子日更系列、高频数据集、PyBroker实战课程、专家深度分享与实时答疑服务。无论您是初探量化的学习者,还是深耕领域的从业者,这里都是助您少走弯路、高效成长的理想平台。诚邀加入,共探数据驱动的投资未来!
好文推荐
1. 用 Python 打造股票预测系统:Transformer 模型教程(一)
2. 用 Python 打造股票预测系统:Transformer 模型教程(二)
3. 用 Python 打造股票预测系统:Transformer 模型教程(三)
4. 用 Python 打造股票预测系统:Transformer 模型教程(完结)
6. YOLO 也能预测股市涨跌?计算机视觉在股票市场预测中的应用
9. Python 量化投资利器:Ridge、Lasso 和 Elastic Net 回归详解
好书推荐