import pandas as pdimport numpy as npfrom scipy import statsimport talibclass WashingPatternAnalyzer: def __init__(self, df): """ df需包含列: ['open', 'high', 'low', 'close', 'volume'] """ self.df = df.copy() self.n = len(df) self.features = {} def calculate_basic_features(self): """基础技术指标计算""" self.df['ma5'] = talib.MA(self.df['close'], 5) self.df['ma10'] = talib.MA(self.df['close'], 10) self.df['ma20'] = talib.MA(self.df['close'], 20) self.df['ma30'] = talib.MA(self.df['close'], 30) # 波动率 self.df['atr'] = talib.ATR(self.df['high'], self.df['low'], self.df['close'], 14) # 量比 self.df['vol_ma5'] = talib.MA(self.df['volume'], 5) self.df['vol_ratio'] = self.df['volume'] / self.df['vol_ma5'] # 涨跌幅 self.df['pct_change'] = self.df['close'].pct_change() def platform_washing(self, lookback=30, recent_days=10): """ 1. 平台式洗盘 特征:长期缩量 + 窄幅横盘 + 近期大阳突破 """ if self.n < lookback + recent_days: return 0 # 取前lookback天 prev_data = self.df.iloc[-(lookback+recent_days):-recent_days] recent_data = self.df.iloc[-recent_days:] # 平台期特征:波动极小,均线粘合,极度缩量 price_range = (prev_data['high'].max() - prev_data['low'].min()) / prev_data['close'].mean() ma_spread = (prev_data['ma5'].std() + prev_data['ma10'].std() + prev_data['ma20'].std()) / prev_data['close'].mean() vol_shrink = prev_data['volume'].mean() / self.df['volume'].rolling(60).mean().iloc[-lookback-1:-1].mean() # 近期突破特征:大阳线,放量 recent_up = recent_data['close'] > recent_data['open'] big_body = (recent_data['close'] - recent_data['open']) / recent_data['open'] > 0.05 volume_surge = recent_data['volume'].iloc[-1] / prev_data['volume'].mean() > 2 # 综合评分 platform_score = 0 if price_range < 0.08 and ma_spread < 0.02: # 窄幅+均线粘合 platform_score += 0.3 if vol_shrink < 0.6: # 极度缩量 platform_score += 0.3 if big_body.any() and volume_surge: # 大阳突破+放量 platform_score += 0.4 return min(platform_score, 1.0) def震荡_washing(self, lookback=40): """ 2. 震荡式洗盘 特征:大幅波动,无规律,量能时大时小 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 计算波动率和方向变化 returns = data['close'].pct_change().dropna() volatility = returns.std() * np.sqrt(252) # 年化波动率 # 方向变化次数(震荡特征) direction_changes = ((returns > 0) != (returns.shift(1) > 0)).sum() # 量能变化率 vol_cv = data['volume'].std() / data['volume'].mean() # 无序度:K线阴阳交错 yin_yang_alternate = ((data['close'] > data['open']) != (data['close'].shift(1) > data['open'].shift(1))).mean() score = 0 if volatility > 0.25: # 高波动 score += 0.3 if direction_changes > lookback * 0.3: # 频繁变向 score += 0.3 if vol_cv > 0.8: # 量能变化大 score += 0.2 if yin_yang_alternate > 0.6: # 阴阳交错频繁 score += 0.2 return min(score, 1.0) def pullback_washing(self, lookback=20): """ 3. 边拉边洗式洗盘 特征:上升趋势,回调不破关键均线,快速收复 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 上升趋势:收盘价逐步抬高 uptrend = data['close'].iloc[-1] > data['close'].iloc[0] * 1.1 # 涨幅>10% # 回调不破MA10 ma10_breaks = (data['low'] < data['ma10']).sum() # 快速收复失地:回调后3日内回到前高附近 max_close = data['close'][:lookback//2].max() recovery = data['close'].iloc[-1] > max_close * 0.95 # 回调幅度小 drawdown = (data['close'].cummax() - data['close']) / data['close'].cummax() max_drawdown = drawdown.max() score = 0 if uptrend: score += 0.3 if ma10_breaks <= 2: # 很少破10日线 score += 0.3 if recovery: score += 0.2 if max_drawdown < 0.05: # 最大回撤<5% score += 0.2 return min(score, 1.0) def w_bottom_washing(self, lookback=60): """ 4. W底洗盘 特征:双底形态,右底低于左底,突破颈线放量 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 找局部低点 lows = [] for i in range(5, len(data)-5): if data['low'].iloc[i] == data['low'].iloc[i-5:i+5].min(): lows.append((i, data['low'].iloc[i])) if len(lows) < 2: return 0 left_bottom_idx, left_bottom = lows[0] right_bottom_idx, right_bottom = lows[1] # 右底低于左底 right_lower = right_bottom < left_bottom # 两个底部相隔一定距离 bottom_distance = right_bottom_idx - left_bottom_idx > 10 # 突破颈线(两底中间高点) neckline = data['high'].iloc[left_bottom_idx:right_bottom_idx].max() breakout = data['close'].iloc[-1] > neckline # 突破时放量 breakout_vol = data['volume'].iloc[-1] > data['volume'].iloc[-5:].mean() * 1.5 score = 0 if right_lower and bottom_distance: score += 0.4 if breakout: score += 0.3 if breakout_vol: score += 0.3 return min(score, 1.0) def v_bottom_washing(self, lookback=20): """ 5. V底洗盘 特征:快速下跌→快速反转,底部放量 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 前半段快速下跌 half = lookback // 2 first_half_drop = (data['close'].iloc[half] - data['close'].iloc[0]) / data['close'].iloc[0] fast_drop = first_half_drop < -0.08 # 跌幅>8% # 后半段快速反转 second_half_rise = (data['close'].iloc[-1] - data['close'].iloc[half]) / abs(data['close'].iloc[half] - data['close'].iloc[0]) fast_rise = second_half_rise > 0.08 # 涨幅>8% # 整体V型:最低点在中间附近 min_idx = data['low'].argmin() v_shape = abs(min_idx - half) < 3 # 最低点接近中间 # 底部放量 bottom_vol = data['volume'].iloc[min_idx-2:min_idx+3].mean() > data['volume'].iloc[:half].mean() * 1.5 score = 0 if fast_drop and fast_rise: score += 0.4 if v_shape: score += 0.3 if bottom_vol: score += 0.3 return min(score, 1.0) def volume_break_washing(self, lookback=40): """ 6. 缩量突破式洗盘 特征:前期缩量,突破时放量,突破趋势线 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 前期(前70%)极度缩量 early_data = data.iloc[:int(lookback*0.7)] late_data = data.iloc[int(lookback*0.7):] early_vol_shrink = early_data['volume'].mean() / data['volume'].rolling(80).mean().iloc[-lookback-1:-1].mean() # 后期放量突破 volume_surge = late_data['volume'].iloc[-1] > early_data['volume'].mean() * 2 # 突破前高(假设前高在early_data中) prev_high = early_data['high'].max() breakout = data['close'].iloc[-1] > prev_high * 1.03 # 突破3% score = 0 if early_vol_shrink < 0.5: # 前期极度缩量 score += 0.4 if volume_surge: score += 0.3 if breakout: score += 0.3 return min(score, 1.0) def accumulation_washing(self, lookback=50): """ 7. 堆量拉涨式洗盘 特征:小阴小阳堆量缓慢上涨,建立仓位 """ if self.n < lookback: return 0 data = self.df.iloc[-lookback:] # 小阴小阳:实体小,涨跌幅小 body_size = abs(data['close'] - data['open']) / data['open'] small_body = (body_size < 0.03).mean() > 0.8 # 80%是小实体 # 堆量:成交量温和放大,阶梯状 vol_trend = np.polyfit(range(len(data)), data['volume'], 1)[0] > 0 # 量能上升 vol_consistent = data['volume'].std() / data['volume'].mean() < 0.5 # 量能稳定 # 缓慢上涨 slow_rise = (data['close'].iloc[-1] - data['close'].iloc[0]) / data['close'].iloc[0] > 0.15 # 涨幅>15% low_volatility = data['close'].pct_change().std() < 0.02 # 日波动率<2% score = 0 if small_body: score += 0.3 if vol_trend and vol_consistent: score += 0.3 if slow_rise and low_volatility: score += 0.4 return min(score, 1.0) def analyze_all(self): """分析所有形态""" self.calculate_basic_features() patterns = { 'platform': self.platform_washing(), '震荡': self.震荡_washing(), 'pullback': self.pullback_washing(), 'w_bottom': self.w_bottom_washing(), 'v_bottom': self.v_bottom_washing(), 'volume_break': self.volume_break_washing(), 'accumulation': self.accumulation_washing() } return patterns# 使用示例if __name__ == "__main__": # 生成模拟数据 np.random.seed(42) dates = pd.date_range('2023-01-01', periods=100, freq='D') # 模拟平台式洗盘数据 base_price = 10 noise = np.random.normal(0, 0.1, 100) noise[30:40] = np.random.normal(-0.5, 0.05, 10) # 打压 noise[40:60] = np.random.normal(0.05, 0.02, 20) # 平台 noise[60:] = np.random.normal(0.3, 0.1, 40) # 突破拉升 prices = base_price + np.cumsum(noise) volumes = np.ones(100) * 1000 volumes[60:] = volumes[60:] * 3 # 突破放量 df = pd.DataFrame({ 'date': dates, 'open': prices - 0.1, 'high': prices + 0.1, 'low': prices - 0.2, 'close': prices, 'volume': volumes }) # 分析 analyzer = WashingPatternAnalyzer(df) results = analyzer.analyze_all() for pattern, score in results.items(): print(f"{pattern}: {score:.2f}")