import timeimport datetimeimport pandas as pdfrom collections import defaultdictfrom threading import Lockfrom loguru import loggerfrom xtquant.xttrader import XtQuantTrader, XtQuantTraderCallbackfrom xtquant.xttype import StockAccountfrom xtquant import xtconstantimport xtquant.xtdata as xtdata# ========== 企业微信日志配置 ==========from Logger_wx import init_loginit_log('企业微信机器人id')# ========== 全局配置区(用户可修改) ==========PATH = 'D:/QMT实盘-交易端/userdata_mini/' # 极简版QMT的路径ACCOUNT_ID = '券商账号' # 券商客户号HUADIAN = 0.01 # 滑点(元)# 监控配置MONITOR_INTERVAL = 3 # 监控间隔(秒)# 交易时间配置AM_START = datetime.time(9, 30)AM_END = datetime.time(11, 30)PM_START = datetime.time(13, 0)PM_END = datetime.time(15, 0) # 15:00结束监控# 涨停封单阈值(股)LIMIT_UP_BID_VOL_THRESHOLD = 200_0000 # 2万手 = 200万股# ========== 交易回调类 ==========class MyXtQuantTraderCallback(XtQuantTraderCallback): # 状态码映射 ORDER_STATUS_MAP = { 48: '未报', 49: '待报', 50: '已报', 51: '已报待撤', 52: '部成待撤', 53: '部撤', 54: '已撤', 55: '部成', 56: '已成', 57: '废单', 255: '未知' } def _get_stock_name(self, code): """获取股票名称""" try: detail = xtdata.get_instrument_detail(code) if detail: return detail.get('InstrumentName', code) except: pass return code def on_disconnected(self): logger.warning("⚠️ QMT连接断开") def on_stock_order(self, order): name = self._get_stock_name(order.stock_code) status_str = self.ORDER_STATUS_MAP.get(order.order_status, str(order.order_status)) logger.debug(f"委托回报: {order.stock_code}{name}, 状态:{status_str}({order.order_status}), 系统号:{order.order_sysid}") def on_stock_trade(self, trade): name = self._get_stock_name(trade.stock_code) logger.info(f"成交回报: {trade.stock_code}{name}, 价格:{trade.traded_price}, 数量:{trade.traded_volume}") def on_order_error(self, order_error): logger.error(f"下单失败: 订单号:{order_error.order_id}, 错误码:{order_error.error_id}, 信息:{order_error.error_msg}") def on_cancel_error(self, cancel_error): logger.error(f"撤单失败: 订单号:{cancel_error.order_id}, 错误码:{cancel_error.error_id}, 信息:{cancel_error.error_msg}")# ========== 持仓监控管理器 ==========class PositionMonitor: def __init__(self, xt_trader, account): self.xt_trader = xt_trader self.account = account self.lock = Lock() # 监控状态: code -> { # 'high_price': float, # 当日最高价(从tick数据获取) # 'high_gain_pct': float, # 最高价涨幅%(相对昨日收盘价) # 'pre_close_price': float, # 昨日收盘价 # 'cost_price': float, # 持仓成本价 # 'triggered': bool, # 是否已触发卖出 # 'start_time': datetime # 开始监控时间 # } self.monitoring = {} # 正在监控的股票(满足最高价涨幅条件且未触发卖出) # 冷却区改为dict,记录触发时的高点信息 # cooled_down: code -> { # 'trigger_high_price': float, # 触发卖出时的最高价 # 'trigger_high_gain_pct': float, # 触发卖出时的最高价涨幅%(相对昨日收盘价) # 'trigger_pre_close_price': float, # 触发时的昨日收盘价 # 'trigger_time': datetime, # 触发时间 # 'cost_price': float # 成本价 # } self.cooled_down = {} # 已触发卖出、等待创新高的股票(冷却期) # 当日已卖出的股票记录 self.sold_records = defaultdict(list) logger.info("=" * 60) logger.info("✅ 持仓监控管理器初始化完成") logger.info("📊 策略参数: 最高价涨幅阈值=7.0%, 回撤阈值=2.0%") logger.info("=" * 60) def get_position_info(self): """获取当前持仓信息""" positions = self.xt_trader.query_stock_positions(self.account) if positions is None: return {} pos_dict = {} for pos in positions: if pos.volume > 0: pos_dict[pos.stock_code] = { 'volume': pos.volume, 'can_use_volume': pos.can_use_volume, 'open_price': pos.open_price, 'market_value': pos.market_value } return pos_dict def get_stock_name(self, code): """获取股票名称""" try: detail = xtdata.get_instrument_detail(code) if detail: return detail.get('InstrumentName', code) except: pass return code def is_limit_up(self, tick_data): #这个判断涨停的函数要改 """ 判断股票是否涨停 涨停判断:卖1价格为0或数量为0,且买1价格>0 """ try: ask_prices = tick_data.get('askPrice', []) ask_vols = tick_data.get('askVol', []) bid_prices = tick_data.get('bidPrice', []) sell_1_empty = (len(ask_prices) == 0 or ask_prices[0] == 0 or len(ask_vols) == 0 or ask_vols[0] == 0) buy_1_valid = len(bid_prices) > 0 and bid_prices[0] > 0 return sell_1_empty and buy_1_valid except Exception as e: logger.error(f"涨停判断出错: {e}") return False def calculate_sell_volume(self, can_use_volume): """计算卖出数量(一半),按100股取整""" half = can_use_volume // 2 sell_vol = (half // 100) * 100 if sell_vol < 100: return can_use_volume else: return sell_vol def sell_half_position(self, code, name, price, reason, tick_data=None): """ 卖出半仓 reason: 'drawback'(回撤) 或 'limit_up'(涨停开板) """ try: position = self.xt_trader.query_stock_position(self.account, code) if not position or position.can_use_volume <= 0: logger.warning(f"⚠️ {code} 无可卖持仓,跳过卖出") return False, -1 sell_volume = self.calculate_sell_volume(position.can_use_volume) if sell_volume <= 0: return False, -1 if reason == 'drawback': # 回撤卖出:卖1价格 - 滑点 if tick_data: ask_prices = tick_data.get('askPrice', []) base_price = ask_prices[0] if len(ask_prices) > 0 and ask_prices[0] > 0 else price else: base_price = price final_price = max(base_price - HUADIAN, 0.01) price_type = xtconstant.FIX_PRICE order_remark = f"回撤2%止盈_滑点{HUADIAN}" else: # limit_up # 涨停开板卖出:买1价格 if tick_data: bid_prices = tick_data.get('bidPrice', []) base_price = bid_prices[0] if len(bid_prices) > 0 and bid_prices[0] > 0 else price else: base_price = price final_price = base_price - HUADIAN price_type = xtconstant.FIX_PRICE order_remark = f"涨停开板_封单不足{LIMIT_UP_BID_VOL_THRESHOLD // 10000}万手" logger.info(f"📝 准备卖出 {name}({code}): 价{final_price:.3f}, 量{sell_volume}, 因:{reason}") order_id = self.xt_trader.order_stock( account=self.account, stock_code=code, order_type=xtconstant.STOCK_SELL, order_volume=sell_volume, price_type=price_type, price=final_price, strategy_name='DynamicTP', order_remark=order_remark ) success = order_id != -1 if success: self.sold_records[code].append({ 'time': datetime.datetime.now(), 'price': final_price, 'volume': sell_volume, 'reason': reason, 'order_id': order_id }) logger.success(f"✅ {name}({code}) 卖出委托成功,订单:{order_id}, 价:{final_price:.3f}, 量{sell_volume}") else: logger.error(f"❌ {name}({code}) 卖出委托失败") return success, order_id except Exception as e: logger.error(f"❌ 卖出异常 {code}: {e}") return False, -1 def update_monitoring_list(self, positions, tick_data_dict): """ 更新监控列表 核心逻辑: 1. 计算当日最高价涨幅 = (当日最高价 - 昨日收盘价) / 昨日收盘价 * 100% 2. 最高价涨幅 > 7.0% 则加入监控(不再判断浮盈) 3. 监控中的股票,从最高价回撤 > 2% 则卖出半仓,移入冷却区 4. 冷却区的股票,必须创新高(新的最高价 > 触发时的高点)才可重新监控 """ current_time = datetime.datetime.now() with self.lock: # 步骤1:检查cooled_down中的股票是否创新高,可以重新监控 reactivated = [] for code in list(self.cooled_down.keys()): if code not in positions: del self.cooled_down[code] continue tick = tick_data_dict.get(code) if not tick: continue current_price = tick.get('lastPrice', 0) cost_price = positions[code]['open_price'] today_high = tick.get('high', current_price) pre_close_price = tick.get('lastClose', 0) if pre_close_price <= 0: continue # 获取触发卖出时记录的高点信息 cooled_info = self.cooled_down[code] trigger_high = cooled_info['trigger_high_price'] trigger_gain_pct = cooled_info['trigger_high_gain_pct'] trigger_pre_close = cooled_info.get('trigger_pre_close_price', pre_close_price) # 计算当前最高价涨幅(相对昨日收盘价) current_high_gain_pct = (today_high - pre_close_price) / pre_close_price * 100 # 重新激活条件:必须严格创新高(新的最高价 > 触发时的最高价) # 且新的最高价涨幅 > 触发时的最高价涨幅(不再判断浮盈) is_new_high = today_high > trigger_high is_higher_gain = current_high_gain_pct > trigger_gain_pct if is_new_high and is_higher_gain: reactivated.append(code) del self.cooled_down[code] self.monitoring[code] = { 'high_price': today_high, 'high_gain_pct': current_high_gain_pct, 'pre_close_price': pre_close_price, 'cost_price': cost_price, 'triggered': False, 'start_time': current_time } name = self.get_stock_name(code) logger.info( f"🔄 {name}({code}) 重新激活监控: " f"新高点{today_high:.3f}(+{current_high_gain_pct:.2f}%, 昨收{pre_close_price:.3f}) > " f"原高点{trigger_high:.3f}(+{trigger_gain_pct:.2f}%)" ) else: # 记录调试信息,说明为什么没有激活 name = self.get_stock_name(code) if current_time.minute % 5 == 0 and current_time.second < 3: logger.debug( f"⏸️ {name}({code}) 仍在冷却: " f"当前高{today_high:.3f}(+{current_high_gain_pct:.2f}%, 昨收{pre_close_price:.3f}) vs " f"触发高{trigger_high:.3f}(+{trigger_gain_pct:.2f}%), " f"新高{is_new_high}, 更高涨幅{is_higher_gain}" ) # 步骤2:检查monitoring中的股票,更新最高价或触发卖出 triggered_codes = [] for code in list(self.monitoring.keys()): if code not in positions: del self.monitoring[code] continue tick = tick_data_dict.get(code) if not tick: continue current_price = tick.get('lastPrice', 0) info = self.monitoring[code] cost_price = info['cost_price'] pre_close_price = tick.get('lastClose', 0) # 更新当日最高价(从tick数据获取) today_high = tick.get('high', current_price) if today_high > info['high_price']: old_high = info['high_price'] info['high_price'] = today_high if pre_close_price > 0: info['high_gain_pct'] = (today_high - pre_close_price) / pre_close_price * 100 name = self.get_stock_name(code) logger.debug( f"📈 {name}({code}) 更新最高价: {old_high:.3f} -> {today_high:.3f}, " f"涨幅:{info['high_gain_pct']:.2f}%(昨收{pre_close_price:.3f})") # 检查触发条件 high_price = info['high_price'] drawdown_pct = (high_price - current_price) / high_price * 100 if high_price > 0 else 0 name = self.get_stock_name(code) # 条件A:从最高价回撤超过2%(不再判断浮盈) if drawdown_pct >= 2.0: logger.info( f"🎯 {name}({code}) 触发回撤卖出: " f"最高{high_price:.3f}, 当前{current_price:.3f}, " f"回撤{drawdown_pct:.2f}%(阈值2.0%)" ) success, order_id = self.sell_half_position(code, name, current_price, 'drawback', tick) if success: triggered_codes.append(code) info['triggered'] = True continue # 条件B:涨停且封单不足(不再判断浮盈) if self.is_limit_up(tick): bid_vols = tick.get('bidVol', []) buy_1_vol = bid_vols[0] if len(bid_vols) > 0 else 0 if buy_1_vol < LIMIT_UP_BID_VOL_THRESHOLD: limit_up_price = tick.get('bidPrice', [current_price])[0] logger.info( f"🎯 {name}({code}) 触发涨停开板: " f"买1封单{buy_1_vol / 10000:.1f}万手 < 阈值{LIMIT_UP_BID_VOL_THRESHOLD / 10000:.0f}万手" ) success, order_id = self.sell_half_position(code, name, limit_up_price, 'limit_up', tick) if success: triggered_codes.append(code) info['triggered'] = True # 步骤3:将触发卖出的股票移出monitoring,放入cooled_down(记录触发时的高点) for code in triggered_codes: if code in self.monitoring: trigger_info = self.monitoring[code] self.cooled_down[code] = { 'trigger_high_price': trigger_info['high_price'], 'trigger_high_gain_pct': trigger_info['high_gain_pct'], 'trigger_pre_close_price': trigger_info.get('pre_close_price', 0), 'trigger_time': current_time, 'cost_price': trigger_info['cost_price'] } del self.monitoring[code] name = self.get_stock_name(code) logger.info( f"🧊 {name}({code}) 移入冷却区: " f"记录高点{trigger_info['high_price']:.3f}(+{trigger_info['high_gain_pct']:.2f}%), " f"等待创新高后重新监控" ) # 步骤4:检查新持仓,加入监控 for code, pos in positions.items(): # 已经在监控中或冷却区,跳过 if code in self.monitoring or code in self.cooled_down: continue tick = tick_data_dict.get(code) if not tick: continue current_price = tick.get('lastPrice', 0) cost_price = pos['open_price'] today_high = tick.get('high', current_price) pre_close_price = tick.get('lastClose', 0) if pre_close_price <= 0 or cost_price <= 0: continue # 关键指标计算:最高价涨幅 = (当日最高价 - 昨日收盘价) / 昨日收盘价 * 100 high_gain_pct = (today_high - pre_close_price) / pre_close_price * 100 current_profit_pct = (current_price - cost_price) / cost_price * 100 # 监控条件:最高价涨幅 > 7.0%(不再判断浮盈) if high_gain_pct > 7.0: name = self.get_stock_name(code) self.monitoring[code] = { 'high_price': today_high, 'high_gain_pct': high_gain_pct, 'pre_close_price': pre_close_price, 'cost_price': cost_price, 'triggered': False, 'start_time': current_time } logger.info( f"🔍 新增监控 {name}({code}): " f"昨收{pre_close_price:.3f}, 最高{today_high:.3f}, " f"当日最高涨幅{high_gain_pct:.2f}%, 当前盈亏{current_profit_pct:.2f}%" ) return len(self.monitoring) def get_status_report(self): """获取当前状态报告""" with self.lock: return { 'monitoring_count': len(self.monitoring), 'cooled_down_count': len(self.cooled_down), 'monitoring_stocks': [ f"{code}(最高涨幅{info['high_gain_pct']:.1f}%)" for code, info in self.monitoring.items() ], 'cooled_down_stocks': [ f"{code}(高点{info['trigger_high_price']:.2f}/+{info['trigger_high_gain_pct']:.1f}%)" for code, info in self.cooled_down.items() ], 'today_sold_count': sum(len(v) for v in self.sold_records.values()) }# ========== 时间检查函数 ==========def is_in_trading_time(): """检查当前是否在交易时间内(9:30-11:30, 13:00-15:00)""" now = datetime.datetime.now().time() if AM_START <= now <= AM_END: return True if PM_START <= now <= PM_END: return True return Falsedef should_stop_trading(): """检查是否应该停止交易(15:00之后)""" now = datetime.datetime.now().time() return now > PM_END# ========== 主程序 ==========def main(): """主程序入口""" logger.info("=" * 70) logger.info("🚀 QMT动态止盈策略 V260318 启动") logger.info(f"📁 QMT路径: {PATH}") logger.info(f"💳 资金账号: {ACCOUNT_ID}") logger.info(f"📊 滑点: {HUADIAN}元") logger.info(f"🎯 策略参数: 最高价涨幅≥7.0%, 回撤≥2.0% 卖出") logger.info(f"⏰ 交易时段: 9:30-11:30, 13:00-15:00") logger.info("=" * 70) # 初始化QMT session_id = int(datetime.datetime.now().timestamp()) % 100000 xt_trader = XtQuantTrader(PATH, session_id) callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) xt_trader.start() connect_result = xt_trader.connect() if connect_result != 0: logger.error(f"❌ QMT连接失败: {connect_result}") return logger.info("✅ QMT连接成功") account = StockAccount(ACCOUNT_ID) subscribe_result = xt_trader.subscribe(account) if subscribe_result != 0: logger.error(f"❌ 账号订阅失败: {subscribe_result}") return logger.info(f"✅ 账号订阅成功: {ACCOUNT_ID}") monitor = PositionMonitor(xt_trader, account) last_status_time = None running = True try: while running: now = datetime.datetime.now() # 检查是否到达结束时间 if should_stop_trading(): logger.info("⏰ 到达15:00,结束监控") status = monitor.get_status_report() logger.success( f"📊 交易结束统计:\n" f" - 今日监控过: {status['monitoring_count'] + status['cooled_down_count']}只\n" f" - 冷却区: {status['cooled_down_count']}只\n" f" - 今日总卖出: {status['today_sold_count']}次" ) running = False break # 检查是否在交易时间 if not is_in_trading_time(): if last_status_time is None or (now - last_status_time).seconds >= 180: logger.debug(f"⏳ 非交易时间 {now.strftime('%H:%M:%S')},等待...") last_status_time = now time.sleep(10) continue # 交易时间内执行监控 try: positions = monitor.get_position_info() if not positions: logger.debug("📭 当前无持仓") time.sleep(MONITOR_INTERVAL) continue # 获取全推数据 stock_list = list(positions.keys()) tick_data = xtdata.get_full_tick(stock_list) if not tick_data: logger.warning("⚠️ 获取tick数据失败") time.sleep(MONITOR_INTERVAL) continue # 更新监控 monitoring_count = monitor.update_monitoring_list(positions, tick_data) # 定期输出状态(每2分钟) if now.minute % 2 == 0 and now.second < 3: status = monitor.get_status_report() logger.info( f"📊 状态 [{now.strftime('%H:%M:%S')}]: " f"监控{status['monitoring_count']}只{status['monitoring_stocks']}, " f"冷却{status['cooled_down_count']}只{status['cooled_down_stocks']}, " f"已卖{status['today_sold_count']}次" ) last_status_time = now time.sleep(MONITOR_INTERVAL) except Exception as e: logger.error(f"❌ 监控循环异常: {e}") time.sleep(MONITOR_INTERVAL) except KeyboardInterrupt: logger.info("🛑 用户中断") except Exception as e: logger.error(f"❌ 主程序异常: {e}") finally: logger.info("🧹 执行清理...") try: xt_trader.stop() logger.info("✅ QMT已停止") except: pass logger.info("👋 程序已退出")if __name__ == '__main__': main()