距离我们上次讨论事件驱动回测系统已有一段时间,该话题始于本系列前文。在之前的文章中,我介绍了如何编写一个适用于历史回测场景的ExecutionHandler替代实现。本文将完成对应的Interactive Brokers API处理器开发,以推动系统向实盘交易迈进。
在本文中,我也会将一些基础接口完整封装进事件驱动系统中,待与实时行情对接后,即可构成自动化执行系统的基础框架。
从功能完备性角度,该类本可加入执行优化逻辑与复杂错误处理机制,从而变得相当复杂。但为便于大家快速把握核心设计思路,并在此基础上根据自身交易风格灵活扩展,我选择保持其架构相对简洁。
Python 实现 按照惯例,首先创建 Python 文件并导入所需库。该文件命名为ib_execution.py,与其他事件驱动模块置于同一目录下。
我们将导入必要的日期时间处理库、IbPy 模块对象,以及IBExecutionHandler 所处理的具体事件类型。
# ib_execution.pyimport datetimeimport timefrom ib.ext.Contract import Contractfrom ib.ext.Order import Orderfrom ib.opt import ibConnection, messagefrom event import FillEvent, OrderEventfrom execution import ExecutionHandler
接下来我们来定义 IBExecutionHandler 类。构造函数__init__首先需要传入events事件队列参数,同时还需指定order_routing订单路由策略,默认设为 "SMART"。若有特定交易所需求,可在此处进行相应配置。默认currency币种设置为美元。
在方法内部,我们初始化了一个 fill_dict 字典,用于后续生成 FillEvent 实例。同时创建tws_conn 连接对象,用于管理与 Interactive Brokers API 的连接状态。此外还需要设定一个初始order_id,用以跟踪后续所有订单、避免重复。最后一步是注册各类消息处理器(具体实现将在后文详述)。
# ib_execution.pyclass IBExecutionHandler(ExecutionHandler):"""Handles order execution via the Interactive BrokersAPI, for use against accounts when trading livedirectly."""def __init__(self, events,order_routing="SMART",currency="USD"):"""Initialises the IBExecutionHandler instance."""self.events = eventsself.order_routing = order_routingself.currency = currencyself.fill_dict = {}self.tws_conn = self.create_tws_connection()self.order_id = self.create_initial_order_id()self.register_handlers()
IB API 基于消息驱动的事件系统,其工作方式与我们的事件驱动回测引擎类似,允许类对特定消息做出自定义响应。为保持代码简洁,本文未引入完善的错误处理机制,仅通过 _error_handler 方法将错误信息输出至终端。
当捕获到 "orderStatus" 消息且状态为订单已成交通知时,该方法会调用 create_fill来生成FillEvent 实例,同时将消息内容输出至终端,以便记录日志和调试。
# ib_execution.pydef _error_handler(self, msg):"""Handles the capturing of error messages"""# Currently no error handling.print "Server Error: %s" % msgdef _reply_handler(self, msg):"""Handles of server replies"""# Handle open order orderId processingif msg.typeName == "openOrder" and \msg.orderId == self.order_id and \not self.fill_dict.has_key(msg.orderId):self.create_fill_dict_entry(msg)# Handle Fillsif msg.typeName == "orderStatus" and \msg.status == "Filled" and \self.fill_dict[msg.orderId]["filled"] == False:self.create_fill(msg)print "Server Response: %s, %s\n" % (msg.typeName, msg)
以下定义的 create_tws_connection 方法用于建立与 IB API 的连接,内部调用 IbPy 提供的 ibConnection 对象实现。默认端口设为 7496,客户端 ID 默认为 10。连接对象创建完成后,即调用 connect 执行实际连接操作。
# ib_execution.pydef create_tws_connection(self):"""Connect to the Trader Workstation (TWS) running on theusual port of 7496, with a clientId of 10.The clientId is chosen by us and we will needseparate IDs for both the execution connection andmarket data connection, if the latter is used elsewhere."""tws_conn = ibConnection()tws_conn.connect()return tws_conn
create_initial_order_id 方法来维护订单标识。当前示例中默认设初始值为 "1",但在实际应用中,更严谨的做法是通过接口查询 IB 当前可用的最新订单 ID 并进行设置。如需手动重置当前 API 订单编号,可通过 Trader Workstation 界面操作:进入“全局配置” > “API 设置”面板进行调整。# ib_execution.pydef create_initial_order_id(self):"""Creates the initial order ID used for InteractiveBrokers to keep track of submitted orders."""# There is scope for more logic here, but we# will use "1" as the default for now.return 1
# ib_execution.pydef register_handlers(self):"""Register the error and server replymessage handling functions."""# Assign the error handling function defined above# to the TWS connectionself.tws_conn.register(self._error_handler, 'Error')# Assign all of the server reply messages to the# reply_handler function defined aboveself.tws_conn.registerAll(self._reply_handler)
Contract实例。# ib_execution.pydef create_contract(self, symbol, sec_type, exch, prim_exch, curr):"""Create a Contract object defining what willbe purchased, at which exchange and in which currency.symbol - The ticker symbol for the contractsec_type - The security type for the contract ('STK' is 'stock')exch - The exchange to carry out the contract onprim_exch - The primary exchange to carry out the contract oncurr - The currency in which to purchase the contract"""contract = Contract()contract.m_symbol = symbolcontract.m_secType = sec_typecontract.m_exchange = exchcontract.m_primaryExch = prim_exchcontract.m_currency = currreturn contract
Order 实例。该方法接收订单类型(如市价单或限价单)、交易数量以及交易方向(买入或卖出)作为参数,最终返回一个完整的 Order实例。# ib_execution.pydef create_order(self, order_type, quantity, action):"""Create an Order object (Market/Limit) to go long/short.order_type - 'MKT', 'LMT' for Market or Limit ordersquantity - Integral number of assets to orderaction - 'BUY' or 'SELL'"""order = Order()order.m_orderType = order_typeorder.m_totalQuantity = quantityorder.m_action = actionreturn order
# ib_execution.pydef create_fill_dict_entry(self, msg):"""Creates an entry in the Fill Dictionary that listsorderIds and provides security information. This isneeded for the event-driven behaviour of the IBserver message behaviour."""self.fill_dict[msg.orderId] = {"symbol": msg.contract.m_symbol,"exchange": msg.contract.m_exchange,"direction": msg.order.m_action,"filled": False}
# ib_execution.pydef create_fill(self, msg):"""Handles the creation of the FillEvent that will beplaced onto the events queue subsequent to an orderbeing filled."""fd = self.fill_dict[msg.orderId]# Prepare the fill datasymbol = fd["symbol"]exchange = fd["exchange"]filled = msg.filleddirection = fd["direction"]fill_cost = msg.avgFillPrice# Create a fill event objectfill = FillEvent(datetime.datetime.utcnow(), symbol,exchange, filled, direction, fill_cost)# Make sure that multiple messages don't create# additional fills.self.fill_dict[msg.orderId]["filled"] = True# Place the fill event onto the event queueself.events.put(fill_event)
前置方法均已实现后,接下来需要重写 ExecutionHandler 抽象基类中的 execute_order 。该方法的核心职责是通过 IB API 实际执行订单委托。
首先需验证接收的事件确为 OrderEvent类型,随后分别构造 Contract与 Order 对象并完成参数配置。两者准备就绪后,调用连接对象的placeOrder方法,同时传入对应的order_id 进行委托。
此处有一个极易被忽视但至关重要的细节:务必调用 time.sleep(1),以确保订单指令真正送达 IB。在我个人的测试环境中,若缺失此行代码,API 将表现出明显不稳定的行为。
最后,对当前订单 ID 进行自增处理,以防止产生重复订单。
# ib_execution.pydef execute_order(self, event):"""Creates the necessary InteractiveBrokers order objectand submits it to IB via their API.The results are then queried in order to generate acorresponding Fill object, which is placed back onthe event queue.Parameters:event - Contains an Event object with order information."""if event.type == 'ORDER':# Prepare the parameters for the asset orderasset = event.symbolasset_type = "STK"order_type = event.order_typequantity = event.quantitydirection = event.direction# Create the Interactive Brokers contract via the# passed Order eventib_contract = self.create_contract(asset, asset_type, self.order_routing,self.order_routing, self.currency)# Create the Interactive Brokers order via the# passed Order eventib_order = self.create_order(order_type, quantity, direction)# Use the connection to the send the order to IBself.tws_conn.placeOrder(self.order_id, ib_contract, ib_order)# NOTE: This following line is crucial.# It ensures the order goes through!time.sleep(1)# Increment the order ID for this sessionself.order_id += 1
该类构成了Interactive Brokers执行处理程序的基础,可用于替代仅适用于回测的模拟执行模块。不过,要启用IB执行处理器,还需进一步构建实时行情接口处理器,以取代回测系统中的历史数据模块——这部分内容将在后续文章中展开。
由此可见,我们正在最大限度地复用回测与实盘系统的代码,使得组件“替换面”尽可能小,从而确保两者在行为上高度一致,甚至几乎无异。