
欢迎加入专注于财经数据与量化投研的【数据科学实战】知识星球!在这里,您将获取持续更新的《财经数据宝典》和《量化投研宝典》,这两部宝典相辅相成,为您在量化投研道路上提供明确指引。 我们提供了精选的国内外量化投研的 250+ 篇高质量文章,并每日更新最新研究成果,涵盖策略开发、因子分析、风险管理等核心领域。 无论您是量化投资新手还是经验丰富的研究者,星球社区都能帮您少走弯路,事半功倍,共同探索数据驱动的投资世界!
在量化交易领域,有一句话广为流传:"垃圾进,垃圾出"(Garbage in, garbage out)。无论你的交易策略多么精妙,模型多么复杂,如果输入的市场数据存在问题,最终的回测结果和实盘表现都会大打折扣。
数据质量问题往往是"沉默的杀手"——它不会报错,不会崩溃,只会悄悄地让你的策略学习到错误的模式,让你的回测结果充满幻觉。本文将基于实战经验,带你构建一套完整的市场数据清洗流程,让你的量化研究建立在坚实的数据基础之上。
市场数据天生就是混乱的:交易所会调整交易时间,不同数据供应商的数据存在差异,服务器时钟会漂移,股票会有拆股分红,加密货币市场 24 小时不停歇。如果不在前期处理好这些问题,你的模型会学习到数据伪影,回测会产生信息泄露,实盘系统会追逐虚假信号。
常见的数据问题包括以下几类:
缺失数据问题:K 线或 Tick 数据缺失会导致技术指标计算错误,交易信号对价格跳空视而不见。
异常值问题:错误报价、乌龙指等噪声数据会让特征变得不稳定,止损逻辑失效。
时间不同步问题:多资产数据时间不对齐会在配对交易或组合模型中产生虚假相关性。
公司行为问题:拆股、分红等公司行为如不处理,会产生虚假的价格跳跃,破坏风险指标。
时区问题:时区和夏令时处理不当会导致 K 线逻辑错位,交易时段规则失效。
一套完整的数据清洗流程应包含以下环节:
第一步:标准化数据结构。统一字段命名为 ts、open、high、low、close、volume、symbol、venue。
第二步:统一时区。解析时间戳,先定位到交易所本地时间,再转换为 UTC 时间。
第三步:排序去重。按 symbol 和 ts 稳定排序,对重复数据保留最后一条或加权处理。
第四步:验证 OHLCV 数据。确保 low ≤ min(open, close),high ≥ max(open, close),low ≤ high,volume ≥ 0。
第五步:处理缺失数据。事件驱动模型保留缺口并编码为特征;基于 K 线的指标需明确选择删除、插值或前向填充策略。
第六步:检测异常值。使用稳健的 z-score/MAD 或滚动 IQR 方法,对异常值进行截断或删除,并保留审计记录。
第七步:处理公司行为。对股票进行拆股和分红调整,同时保留原始价格和调整后价格。
第八步:期货连续合约处理。定义换月规则,进行后向调整或比率调整。
第九步:重采样与规整化。对齐到交易日历,创建预期时间索引,按容差重新索引。
第十步:多资产对齐。将不同资产的 K 线对齐到统一时间网格,填充时避免使用未来数据。
下面是一套可直接使用的 Pandas 数据清洗代码,涵盖了上述核心步骤。
import pandas as pdimport numpy as npdefload_raw(csv_path, symbol):""" 加载原始 CSV 数据并标准化格式 参数: csv_path: CSV 文件路径 symbol: 股票或资产代码 返回: 标准化后的 DataFrame """ df = pd.read_csv( csv_path, usecols=["timestamp", "open", "high", "low", "close", "volume"], dtype={"open": "float64","high": "float64","low": "float64","close": "float64","volume": "float64" } )# 解析时间戳,假设数据源为交易所本地时间 df["ts"] = pd.to_datetime(df["timestamp"], utc=False, errors="coerce")# 示例:将美东时间转换为 UTC df["ts"] = ( df["ts"] .dt.tz_localize("America/New_York", nonexistent="shift_forward") .dt.tz_convert("UTC") )# 整理列名和顺序 df = ( df.drop(columns=["timestamp"]) .assign(symbol=symbol) .rename(columns=str.lower) [["ts", "open", "high", "low", "close", "volume", "symbol"]] )return dfdefsort_dedup(df):""" 对数据进行排序并去除重复记录 参数: df: 原始 DataFrame 返回: 排序去重后的 DataFrame """# 按 symbol 和时间戳排序 df = df.sort_values(["symbol", "ts"])# 对于重复时间戳,保留最后一条(通常是供应商修正后的数据) df = df.drop_duplicates(subset=["symbol", "ts"], keep="last")return dfdefohlcv_checks(df):""" 检查 OHLCV 数据是否符合逻辑约束 返回不符合约束的异常记录 """ bad = (# low 应该小于等于 open 和 close 的最小值 (df["low"] > df[["open", "close"]].min(axis=1)) |# high 应该大于等于 open 和 close 的最大值 (df["high"] < df[["open", "close"]].max(axis=1)) |# low 应该小于等于 high (df["low"] > df["high"]) |# 成交量不能为负 (df["volume"] < 0) )return df.loc[bad]deffix_bad_bars(df):""" 修复不符合逻辑约束的 K 线数据 采用软修复策略:尽可能修正边界值,无法修正时标记为 NaN """# 修正 low 值:取 open、close、high 中的最小值 df["low"] = np.minimum( df["low"], df[["open", "close", "high"]].min(axis=1) )# 修正 high 值:取 open、close、low 中的最大值 df["high"] = np.maximum( df["high"], df[["open", "close", "low"]].max(axis=1) )# 如果 low 仍然大于 high,标记为 NaN 待人工审核 df.loc[df["low"] > df["high"], ["low", "high"]] = np.nan# 负成交量标记为 NaN df.loc[df["volume"] < 0, "volume"] = np.nanreturn dfdefreport_missing(df, freq="1min"):""" 统计每个 symbol 的缺失 K 线数量 """ out = []for sym, g in df.groupby("symbol", group_keys=False):# 构建预期的时间索引 rng = pd.date_range( g["ts"].min().floor(freq), g["ts"].max().ceil(freq), freq=freq )# 计算缺失的时间点 missing = pd.Index(rng).difference(g["ts"]) out.append({"symbol": sym, "missing_count": len(missing)})return pd.DataFrame(out)deffill_policy(df, method="ffill", max_gap="5min"):""" 按指定策略填充缺失数据 参数: method: 填充方法,'ffill' 为前向填充,'interpolate' 为插值 max_gap: 最大允许填充的时间间隔,超过此间隔不填充 """ df = df.sort_values(["symbol", "ts"])def_fill(g): g = g.set_index("ts")if method == "ffill":# 计算时间间隔,对大间隔进行分块处理 dt = g.index.to_series().diff() mask = (dt.notna()) & (dt > pd.to_timedelta(max_gap)) block = mask.cumsum().values# 按块进行前向填充,避免跨大间隔填充 g[["open", "high", "low", "close", "volume"]] = ( g.groupby(block)[["open", "high", "low", "close", "volume"]] .ffill() )elif method == "interpolate":# 线性插值,限制最大插值范围 g[["open", "high", "low", "close", "volume"]] = ( g[["open", "high", "low", "close", "volume"]] .interpolate(limit=5) )return g.reset_index()return df.groupby("symbol", group_keys=False).apply(_fill)defcap_outliers(df, col="close", window=50, z=6.0):""" 使用稳健统计方法检测并截断异常值 参数: col: 要检测的列名 window: 滚动窗口大小 z: 异常值阈值(以 MAD 倍数计) """def_cap(g): x = g[col]# 计算滚动中位数 med = x.rolling(window, min_periods=20).median()# 计算滚动 MAD(中位数绝对偏差) mad = (x - med).abs().rolling(window, min_periods=20).median()# 计算稳健 z-score(1.4826 是使 MAD 与标准差可比的常数) robust_z = (x - med) / (1.4826 * mad.replace(0, np.nan))# 计算截断边界 upper = med + z * 1.4826 * mad lower = med - z * 1.4826 * mad# 将超出边界的值截断到边界 g[col] = np.where( robust_z > z, upper, np.where(robust_z < -z, lower, x) )return greturn df.groupby("symbol", group_keys=False).apply(_cap)defalign_symbols(dfs, freq="1min"):""" 将多个资产的数据对齐到统一的时间网格 参数: dfs: 字典,格式为 {symbol: DataFrame} 返回: 对齐后的字典 """ aligned = {}# 构建所有资产的时间并集 idx = Nonefor sym, g in dfs.items(): sidx = pd.DatetimeIndex(g["ts"]).tz_convert("UTC") idx = sidx if idx isNoneelse idx.union(sidx) idx = idx.sort_values()# 将每个资产重新索引到统一时间网格for sym, g in dfs.items(): gg = g.set_index("ts").reindex(idx) gg.index.name = "ts" aligned[sym] = gg.reset_index()return aligneddefclean_market_csv(csv_path, symbol, bar_freq="1min"):""" 端到端的市场数据清洗流程 参数: csv_path: 数据文件路径 symbol: 资产代码 bar_freq: K 线频率 返回: 清洗后的 DataFrame """# 加载并标准化数据 df = load_raw(csv_path, symbol)# 排序去重 df = sort_dedup(df)# 检查并修复异常 K 线 bad = ohlcv_checks(df)ifnot bad.empty: print(f"发现 {len(bad)} 条异常 K 线,正在修复...") df = fix_bad_bars(df)# 规整化时间索引 df = regularize(df, freq=bar_freq)# 按策略填充缺失数据 df = fill_policy(df, method="ffill", max_gap="10min")# 检测并截断异常值 df = cap_outliers(df, col="close", window=100, z=8.0)# 最终数据质量断言assert df["ts"].is_monotonic_increasing, "时间戳必须单调递增"assertnot df.duplicated(subset=["symbol", "ts"]).any(), "不允许存在重复记录"return df处理缺失数据时,需要根据具体场景选择合适的策略:
删除策略适用于严谨的研究场景,虽然更安全但会损失流动性较差资产的信息。
有界前向填充适用于需要稳定指标的场景,但必须进行间隔感知的掩码处理。
插值策略适用于需要平滑特征的场景,但要避免在收盘到开盘之间进行插值。
基于模型的填补是最后的手段,必须记录方法和不确定性。
在应用填充策略前,建议进行以下诊断分析:按小时、交易时段和 symbol 统计缺失情况,发现流动性问题或供应商故障;统计连续缺失的长度,长时间连续缺失通常意味着日历或交易时间表问题;分析缺失与成交量、波动率的相关性,警惕选择性偏差。
幸存者偏差:不要用当前的成分股列表来构建历史数据,否则会高估策略表现。
前视偏差:永远不要用未来的数据填充缺失值,即使是 reindex 的默认行为也可能导致这个问题。
时区混淆:磁盘存储使用 UTC 时间,只在必要时转换为交易所本地时间进行分析。
隐藏的不完整交易日:节假日和临时停牌会导致异常收益,需要显式标记。
盲目信任供应商标记:验证供应商标记为"已修正"的数据,并存储自己的质量控制标记。
数据清洗工作虽然不够光鲜,却是值得信赖的量化研究和实盘交易的基石。通过建立清晰的缺失数据处理策略、稳健的异常值处理方法、日历感知的时间对齐机制以及透明的调整记录,你的特征和回测结果将反映真实的市场行为,而非数据的怪癖。
记住这条核心原则:永远不要前向填充收益率或标签。如果必须为指标填充收盘价,请对大间隔进行掩码处理并记录在案。
建议将每个清洗步骤封装为小型、可测试的函数,持久化数据时附带元数据(供应商、日历、调整规则),对流程进行版本控制,并生成每日质量报告。只有这样,你的量化交易系统才能建立在坚实可靠的数据基础之上。
加入专注于财经数据与量化投研的知识星球【数据科学实战】,获取本文完整研究解析、代码实现细节。
核心权益如下:
星球已有丰富内容积累,包括量化投研论文、财经高频数据、 PyBroker 视频教程、定期直播、数据分享和答疑解难。适合对量化投研和财经数据分析有兴趣的学习者及从业者。欢迎加入我们!
好文推荐
1. 用 Python 打造股票预测系统:Transformer 模型教程(一)
2. 用 Python 打造股票预测系统:Transformer 模型教程(二)
3. 用 Python 打造股票预测系统:Transformer 模型教程(三)
4. 用 Python 打造股票预测系统:Transformer 模型教程(完结)
6. YOLO 也能预测股市涨跌?计算机视觉在股票市场预测中的应用
9. Python 量化投资利器:Ridge、Lasso 和 Elastic Net 回归详解
好书推荐