用代码量化筹码分布,让选股策略更精准
在上一篇文章中,我们详细介绍了通达信WINNER函数的原理和应用。但很多读者反馈:如何在Python中实现这个函数? 毕竟,对于量化交易者来说,能把通达信的经典指标迁移到Python环境中,意味着可以自动化选股、批量回测、实盘交易。
今天,我们就来一步步讲解如何用Python实现WINNER函数,并搭建完整的筹码分析系统。
一、WINNER函数的本质理解
在动手写代码之前,先明确WINNER函数的数学含义:
WINNER(CLOSE) = 当前价格以下的筹码总量 / 总流通筹码
简单说,就是计算有多少比例的筹码成本低于当前股价,这些筹码处于盈利状态。
但要准确计算这个值,我们需要理解筹码分布的核心模型——成本分布的概率密度模型。
筹码分布的底层逻辑
每笔成交可以看作筹码在不同价格之间的转移。模型的核心假设是:当日的成交量会按照一定的概率分布,分配到当日的最高价到最低价之间。
常见的分布模型有:
均匀分布:假设成交量在价格区间内均匀分布
三角分布:假设成交量集中在均价附近,向两端递减
正态分布:假设成交量在均价附近呈正态分布
通达信官方算法未完全公开,但通过逆向工程和实践验证,业界已经总结出了较准确的复现方法。
二、最简实现:滑窗比例法
如果只是需要快速实现,最简单的理解方式是:统计过去一段时间内,收盘价在当前价格以下的占比。
import pandas as pdimport numpy as npdef winner_simple(close_prices, current_price, window=250): """ 最简版WINNER函数 - 滑窗比例法 参数: close_prices: 历史收盘价序列 current_price: 当前价格 window: 统计窗口期(默认250个交易日) 返回: 获利盘比例 (0-1) """ # 取最近window天的数据 recent_prices = close_prices[-window:] # 计算低于当前价格的比例 winner_ratio = (recent_prices <= current_price).sum() / len(recent_prices) return winner_ratio# 使用示例# 假设df是包含收盘价的数据框df['winner_simple'] = df['close'].rolling(250).apply( lambda x: (x <= x.iloc[-1]).sum() / len(x))
优点:代码简单,容易理解
缺点:精度较低,因为没有考虑成交量的分布
三、进阶实现:基于量价分布的WINNER
为了提高精度,需要把成交量因素纳入考虑。核心思路是:
将历史价格区间划分为多个小格子
每个格子的筹码密度 = 落入该价格区间的成交量 × 衰减系数
WINNER = 当前价格以下的筹码总量 / 总筹码
import pandas as pdimport numpy as npfrom typing import Tupleclass ChipAnalysis: """筹码分布分析类""" def __init__(self, df: pd.DataFrame, decay_coeff: float = 1.0): """ 初始化 参数: df: 包含open, high, low, close, volume的DataFrame decay_coeff: 换手衰减系数,默认1.0 """ self.df = df.copy() self.decay_coeff = decay_coeff self.chip_distribution = None # 存储筹码分布 def distribute_volume(self, row: pd.Series, n_bins: int = 100) -> np.ndarray: """ 将单日成交量分配到价格区间(三角分布模型) 参数: row: 单日数据(含high, low, close, volume) n_bins: 划分的格子数(精度) 返回: 各价格区间的筹码分配数组 """ high, low, close, volume = row['high'], row['low'], row['close'], row['volume'] # 生成价格格子 price_bins = np.linspace(low, high, n_bins + 1) bin_width = (high - low) / n_bins # 三角分布: 在close处密度最高,向high和low线性递减 # 计算每个格子的概率密度 densities = np.zeros(n_bins) for i in range(n_bins): bin_center = (price_bins[i] + price_bins[i+1]) / 2 if bin_center <= close: # 左半边:线性增加 density = (bin_center - low) / (close - low) if close > low else 1 else: # 右半边:线性减少 density = (high - bin_center) / (high - close) if high > close else 1 densities[i] = max(0, min(1, density)) # 归一化并乘以成交量 densities = densities / densities.sum() if densities.sum() > 0 else densities return densities * volume * self.decay_coeff def build_chip_distribution(self, lookback: int = 250, n_bins: int = 100): """ 构建筹码分布模型 参数: lookback: 回溯周期(天) n_bins: 价格区间划分精度 """ # 初始化筹码分布数组 price_range = self.df['high'].max() - self.df['low'].min() min_price = self.df['low'].min() - price_range * 0.1 max_price = self.df['high'].max() + price_range * 0.1 price_edges = np.linspace(min_price, max_price, n_bins + 1) # 累积筹码 total_chips = np.zeros(n_bins) for idx, row in self.df.tail(lookback).iterrows(): daily_chips = self.distribute_volume(row, n_bins) # 应用衰减(可选) total_chips = total_chips * 0.998 + daily_chips self.chip_distribution = total_chips self.price_edges = price_edges self.bin_centers = (price_edges[:-1] + price_edges[1:]) / 2 def winner(self, price: float) -> float: """ 计算获利盘比例 WINNER(price) 参数: price: 目标价格(如当前收盘价) 返回: 获利盘比例 (0-1) """ if self.chip_distribution is None: raise ValueError("请先调用 build_chip_distribution() 构建筹码分布") # 找到价格所在的区间索引 idx = np.searchsorted(self.bin_centers, price) # 累计获利筹码 winner_chips = self.chip_distribution[:idx].sum() total_chips = self.chip_distribution.sum() return winner_chips / total_chips if total_chips > 0 else 0 def cost(self, percentile: float) -> float: """ 计算成本分布百分位 COST(percentile) 参数: percentile: 百分位 (0-100) 返回: 对应百分位的价格 """ if self.chip_distribution is None: raise ValueError("请先调用 build_chip_distribution() 构建筹码分布") target_volume = self.chip_distribution.sum() * percentile / 100 cumsum = np.cumsum(self.chip_distribution) idx = np.searchsorted(cumsum, target_volume) idx = min(idx, len(self.bin_centers) - 1) return self.bin_centers[idx]# 使用示例# 准备数据(需包含OHLCV)# df = yf.download('000001.SS', period='1y')# 创建分析器# chip = ChipAnalysis(df)# chip.build_chip_distribution(lookback=250)# 计算当前获利盘比例# winner_value = chip.winner(df['close'].iloc[-1])# print(f"获利盘比例: {winner_value:.2%}")# 计算90%成本分布# cost_90 = chip.cost(90)# print(f"90%成本线: {cost_90:.2f}")
这个实现采用了三角分布模型,更接近通达信的实际算法。
四、生产级方案:使用fengwo模块
如果你追求最高精度和性能,强烈推荐使用fengwo模块。它通过逆向工程直接对齐通达信算法,底层用C/C++编写,速度和准确性都有保障。
安装
基本使用
import fengwoimport pandas as pdimport yfinance as yf# 获取数据df = yf.download('AAPL', period='6mo', interval='1d')# 方法1:直接计算WINNERwinner_values = fengwo.WINNER(df['Close'].values)# 方法2:计算成本分布cost_50 = fengwo.COST(df, 50) # 50%成本线print(f"最新获利盘比例: {winner_values[-1]:.2%}")
多股票并行计算
from concurrent.futures import ThreadPoolExecutorimport fengwodef calc_winner(symbol): df = yf.download(symbol, period='6mo') return fengwo.WINNER(df['Close'].values)[-1]symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']with ThreadPoolExecutor(max_workers=4) as executor: results = dict(zip(symbols, executor.map(calc_winner, symbols)))for symbol, winner in results.items(): print(f"{symbol}: 获利盘比例 {winner:.2%}")
fengwo模块破除了Python的GIL限制,可以充分利用多核CPU并行计算。
五、实战案例:基于WINNER的选股策略
将WINNER函数与策略逻辑结合,构建可量化的选股系统:
def winner_pick_stocks(df_dict, winner_threshold_low=0.1, winner_threshold_high=0.9): """ 基于WINNER的选股策略 参数: df_dict: {股票代码: DataFrame} 字典 winner_threshold_low: 底部阈值(超卖) winner_threshold_high: 顶部阈值(超买) 返回: 信号字典 """ signals = {} for symbol, df in df_dict.items(): # 计算WINNER chip = ChipAnalysis(df) chip.build_chip_distribution(lookback=250) winner = chip.winner(df['close'].iloc[-1]) # 信号判断 if winner < winner_threshold_low: signals[symbol] = {'signal': 'BUY', 'winner': winner, 'reason': '底部区域'} elif winner > winner_threshold_high: signals[symbol] = {'signal': 'SELL', 'winner': winner, 'reason': '顶部区域'} else: signals[symbol] = {'signal': 'HOLD', 'winner': winner, 'reason': '中性区域'} return signals# 实战示例# stock_data = {}# for symbol in ['000001.SS', '000002.SZ', '600036.SS']:# stock_data[symbol] = yf.download(symbol, period='1y')# # results = winner_pick_stocks(stock_data)# for symbol, info in results.items():# print(f"{symbol}: {info['signal']} ({info['reason']}) - 获利盘:{info['winner']:.2%}")
六、注意事项与使用技巧
1. 数据完整性的重要性
WINNER函数的准确性高度依赖完整的价量数据,缺少任何一天的数据都可能影响筹码累积的准确性。
2. 衰减系数的调整
不同股票的换手率特性不同,可以根据实际调整衰减系数:
3. 与其他指标配合
单一指标的信号可靠性有限,建议组合使用:
策略类型 | 指标组合 | 说明 |
底部确认 | WINNER < 0.1 + VOL放量 | 恐慌释放+资金进场 |
趋势确认 | WINNER < 0.3 + MA金叉 | 底部回升确认 |
风险预警 | WINNER > 0.9 + MACD顶背离 | 获利盘集中+动能衰竭 |
4. 局限性认知
筹码分布模型存在几个固有缺陷:
因此,在使用时要结合实际情况理性判断。
七、总结
本文为你提供了三种Python实现WINNER函数的方法:
方法 | 精度 | 性能 | 难度 | 推荐场景 |
滑窗比例法 | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐ | |
自定义筹码模型 | | | | 策略研究、个性化定制 |
fengwo模块 | | | | 生产环境、专业量化 |
无论选择哪种方案,关键是理解WINNER函数的核心原理——它将价格、成交量、时间三个维度的信息融合成一个可量化的指标,帮助我们透视市场的真实成本分布。
把通达信的经典指标搬到Python,意味着你可以: