用Python实时监控量化新规合规状态:
三大核心指标的计算与预警
文 | 程飞 | 2026年4月7日 | Python量化合规实战
4月5日凌晨,我把我的量化交易终端关了。不是因为要休息,是因为我要重写它。新规出来之后,原来的交易终端有一个致命缺陷:它不知道新规的存在。它只知道下单、撤单、成交,不知道自己每天下了多少单,不知道这个数字距离新规的红线还有多远。
今天这篇文章,是我在过去48小时内写的一个合规监控模块的完整说明。内容覆盖:交易频率实时统计、波动市触发条件检测、以及一个完整的预警系统。代码可以直接嵌入到你现有的量化交易系统里,也可以作为独立模块运行。
我不是在教你如何绕过新规。我是在教你如何在新规允许的范围内,安全地运行你的策略。这两者的区别,是能不能睡得着觉的区别。
一、核心数据结构:交易日志的实时积累
任何合规监控系统的核心,都是交易日志。但原始的交易日志是无结构的,需要转换成有意义的统计指标。我的做法是用一个类来封装所有交易记录和统计指标。
from collections import deque
from datetime import datetime, time as dtime
import pandas as pd
import numpy as np
class ComplianceMonitor:
def __init__(self, account_id: str, strategy_name: str,
daily_order_limit: int = 5000,
freq_multiplier: float = 3.0):
self.account_id = account_id
self.strategy_name = strategy_name
# 新规核心参数
self.daily_order_limit = daily_order_limit
self.freq_multiplier = freq_multiplier
# 当日历史均值(从数据库加载或手动设置)
self.hist_avg_daily_orders = 1200 # 需要根据实际情况填写
self.current_limit = self.hist_avg_daily_orders * self.freq_multiplier
# 交易记录:deque自动维护滑动窗口
self.order_log = deque(maxlen=10000)
self.trade_log = deque(maxlen=5000)
# 市场状态
self.market_volatility = "NORMAL" # NORMAL / 5%_TRIGGER / 7%_TRIGGER
self.last_reset()
这里用了 deque(双端队列)而不是 list,主要是因为 deque 的 append 和 pop 操作都是 O(1),而且 maxlen 参数可以自动丢弃超过长度限制的最老记录,不需要手动维护。对于高频交易日志来说,这个细节很重要——10万条记录的 deque 在内存占用和操作速度上都很稳定。
二、交易频率的实时统计与动态上限计算
新规第二条的核心约束是"单日最高下单频率不得超过历史平均的3倍"。这个约束有两个关键点:第一,历史平均需要从真实的交易数据中计算,不能拍脑袋;第二,这个上限会随着市场状态动态变化(新规第三条:波动市期间降频)。
def record_order(self, order_id: str, timestamp: datetime,
symbol: str, side: str, quantity: int):
self.order_log.append({
"order_id": order_id,
"timestamp": timestamp,
"symbol": symbol,
"side": side,
"quantity": quantity,
"status": "PENDING"
})
self.order_count_today += 1
self.last_order_time = timestamp
self.check_compliance() # 每次下单后立即检查
def check_compliance(self):
today_orders = self.get_today_order_count()
limit = self.get_current_limit()
usage_rate = today_orders / limit if limit > 0 else 0
self.compliance_status = {
"order_count": today_orders,
"limit": limit,
"usage_rate": usage_rate,
"remaining": max(0, limit - today_orders),
"is_compliant": today_orders <= limit,
"warning_level": self.get_warning_level(usage_rate)
}
return self.compliance_status
def get_warning_level(self, usage_rate):
if usage_rate >= 1.0:
return "RED" # 超限
elif usage_rate >= 0.85:
return "ORANGE" # 即将超限(剩余15%以内)
elif usage_rate >= 0.70:
return "YELLOW" # 注意(剩余30%以内)
return "GREEN" # 安全
实盘经验:ORANGE级别(剩余15%以内)是关键节点。我的做法是在达到ORANGE时自动降低下单频率——把6分钟K线改成12分钟K线,信号精度降低一半,但频率消耗速度也相应减半。这个简单的调整,让我的策略在4月7日市场大幅波动那天,最终没有触碰到红线。
三、波动市触发条件的自动检测
新规第三条:在沪深300指数日内涨跌幅超过5%时,所有量化策略的下单频率上限下调至50%;超过7%时,下调至25%。这个触发条件需要实时监控。实现方式是:用akshare获取指数的实时数据,然后判断当前涨跌幅是否触发了降频条件。
import akshare as ak
import time
class MarketVolatilityMonitor:
def __init__(self):
self.threshold_5pct = 0.05
self.threshold_7pct = 0.07
self.volatility_status = "NORMAL"
self.current_change = 0.0
def check_volatility(self):
try:
# 获取沪深300实时数据
df = ak.stock_zh_index_spot_em(symbol="000300")
# 提取当前涨跌幅(百分比)
change_pct = float(df[df["名称"]=="沪深300"]["涨跌幅"].values[0])
self.current_change = change_pct / 100.0 # 转为小数
# 判断触发条件
if abs(self.current_change) >= self.threshold_7pct:
self.volatility_status = "7%_TRIGGER"
elif abs(self.current_change) >= self.threshold_5pct:
self.volatility_status = "5%_TRIGGER"
else:
self.volatility_status = "NORMAL"
return self.volatility_status
except Exception as e:
print(f"Failed to get volatility: {e}")
return self.volatility_status
在实际使用中,这个检查函数需要放在一个后台线程里,每30秒运行一次,而不是在每次下单的时候才调用。因为下单的判断必须基于最新的市场状态,而每次获取实时数据的网络延迟可能是几百毫秒到几秒,不适合放在下单的关键路径里。
四、动态频率上限:把三条规则整合在一起
现在把前三个模块整合起来,形成一个完整的合规监控系统。核心逻辑是:基础上限 × 波动市系数 = 当前可用下单频率。
def get_current_limit(self):
# 基础上限 = 历史均值 × 3倍
base_limit = self.hist_avg_daily_orders * self.freq_multiplier
# 波动市降频系数
if self.market_volatility == "7%_TRIGGER":
return base_limit * 0.25 # 降至25%
elif self.market_volatility == "5%_TRIGGER":
return base_limit * 0.50 # 降至50%
return base_limit # 正常状态
def get_compliance_report(self) -> dict:
status = self.check_compliance()
current_limit = self.get_current_limit()
report = {
"account_id": self.account_id,
"strategy": self.strategy_name,
"report_time": datetime.now().isoformat(),
"market_volatility": self.market_volatility,
"current_limit": current_limit,
"today_orders": status["order_count"],
"remaining_quota": status["remaining"],
"usage_rate": f"{status['usage_rate']:.1%}",
"warning_level": status["warning_level"],
"is_compliant": status["is_compliant"],
"estimated_full_at": self.estimate_full_time()
}
return report
def estimate_full_time(self):
# 估算配额用完的时间
if not self.order_log:
return None
today_orders = self.get_today_order_count()
remaining = self.get_current_limit() - today_orders
if remaining <= 0:
return "IMMEDIATELY"
# 计算最近N笔订单的平均间隔
recent_orders = list(self.order_log)[-50:]
if len(recent_orders) < 2:
return "UNKNOWN"
intervals = [(recent_orders[i+1]["timestamp"] -
recent_orders[i]["timestamp"]).total_seconds()
for i in range(len(recent_orders)-1)]
avg_interval = np.mean(intervals)
hours_left = remaining * avg_interval / 3600
return f"~{hours_left:.1f} hours"
重要提醒:这个估算时间是参考值,不是精确预测。因为交易频率本身会随着市场机会的变化而变化,不可能用一个线性外推来精确预测。但这个估算对于安排当天的交易计划非常有用——如果系统告诉你"剩余配额大约还能支撑2小时",你就可以提前决定今天剩余的交易怎么做,而不是等到最后一刻才发现配额用完了。
五、集成到交易系统:拦截器的实现
把合规监控集成到交易系统的最佳方式,是用装饰器模式(Decorator Pattern)把下单函数包裹起来。这样原来的下单逻辑不需要任何改动,只需要在函数定义前加一行装饰器代码。
from functools import wraps
def compliance_guard(monitor: ComplianceMonitor):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 下单前检查
status = monitor.check_compliance()
if status["warning_level"] == "RED":
raise Exception(f"ORDER_REJECTED: Frequency limit reached. "
f"Limit={status['limit']}, Used={status['order_count']}")
if status["warning_level"] in ["ORANGE", "YELLOW"]:
print(f"WARNING [{status['warning_level']}]: "
f"Usage={status['usage_rate']:.1%}, "
f"Remaining={status['remaining']} orders")
# 执行下单
result = func(*args, **kwargs)
# 下单后记录
if isinstance(result, dict) and "order_id" in result:
monitor.record_order(
order_id=result["order_id"],
timestamp=datetime.now(),
symbol=kwargs.get("symbol", args[0] if args else ""),
side=kwargs.get("side", "BUY"),
quantity=kwargs.get("quantity", 0)
)
return result
return wrapper
return decorator
# 使用方法
monitor = ComplianceMonitor(account_id="ACC001", strategy_name="高频量价V2")
@compliance_guard(monitor)
def send_order(symbol: str, side: str, quantity: int) -> dict:
# 这里放你的实际下单逻辑
return {"order_id": "ORD12345", "status": "SUBMITTED"}
装饰器的优势在于它是非侵入式的——你的交易系统不需要因为合规要求而重写任何一行核心逻辑。装饰器只是一个"外壳",在原有函数执行之前和之后加上合规检查的逻辑。如果有一天新规取消了,或者规则变了,只需要换一个装饰器,原来的下单逻辑完全不动。
六、合规日志与定期报告生成
新规第一条要求报备策略类型和参数区间。我建议在实际报备之前,先把自己的合规日志整理清楚。报备时监管层很可能会要求你提供策略的历史运行数据,包括日均下单频率、峰值频率、波动市的频率变化等。这些数据如果平时没有记录,临时整理会很被动。
def generate_daily_report(self) -> pd.DataFrame:
# 生成日报:每日各时间段的下单分布
df = pd.DataFrame(list(self.order_log))
if df.empty:
return pd.DataFrame()
df["hour"] = df["timestamp"].dt.hour
hourly_stats = df.groupby("hour").agg(
order_count=("order_id", "count"),
total_quantity=("quantity", "sum"),
unique_symbols=("symbol", "nunique")
).reset_index()
# 计算当日频率统计
stats = {
"total_orders": len(df),
"unique_symbols": df["symbol"].nunique(),
"avg_orders_per_hour": len(df) / max(df["hour"].nunique(), 1),
"peak_hour": hourly_stats.loc[hourly_stats["order_count"].idxmax(), "hour"],
"compliance_status": self.check_compliance()["is_compliant"],
"market_volatility_events": self.count_volatility_events()
}
return hourly_stats, stats
实盘经验:合规日志不只是用来应付监管的,它本身也是优化策略的重要数据来源。我每天收盘后会看这份日报,重点关注的是:下单频率的峰值出现在哪个时间段,这个时间段和市场流动性之间的关系如何,以及有没有出现异常的频率波动(比如某天突然频率翻倍但市场机会没有相应增加)。这些数据对于调整策略参数非常有价值。
总结:合规不是负担,是护城河
写完这个合规监控模块,我有一个很深的感触:很多量化从业者对"合规"两个字有抵触情绪,觉得这是给交易增加负担,是妨碍赚钱的条条框框。但我的实际感受是相反的——一个设计良好的合规监控系统,本质上是一个交易风险管理系统。合规检查帮你做的事情,和止损线帮你做的事情,本质上是同一件事:让你在失控之前停下来。
在新规落地之前,我的交易系统是没有这个功能的。策略在下单的时候,只考虑信号强度,不考虑还剩多少下单配额。新规来了之后,我被迫把这套合规监控系统做出来了。现在我回看这套系统,它帮我避免了好几次在市场最极端的时候透支下单配额的情况。如果没有新规,我可能永远不会意识到这个问题。
所以我想说,合规不是给策略增加的成本,而是给策略增加的一层保护。这层保护让你在极端市场环境下有足够的缓冲空间,不会因为一时的冲动而把整个策略的职业生涯葬送掉。在市场里活着,比在任何一次交易里赚多少钱,都重要得多。
本文仅供参考,不构成投资建议。Python代码需根据实际数据接口调整。