在本系列的前两篇文章中,我们讨论了事件驱动型回测系统以及事件对象的类层次结构。在本文中,我们将探讨市场数据在历史回测与实盘交易中的具体应用机制。
构建事件驱动交易系统的核心目标之一是尽量减少回测组件与实盘执行组件之间的代码重复。理想情况下,我们希望将相同的信号生成方法和投资组合管理组件同时应用于历史测试和实盘交易。为了实现这一点,生成信号的策略对象Strategy和基于信号提供订单的投资组合对象Portfolio必须通过统一接口获取历史数据和实时市场数据。
这就引出了基于数据处理器DataHandler对象的类层次结构的概念,它为所有子类提供了一个向系统其它组件提供市场数据的统一接口。通过这种方式,任何子类数据处理器都可以被“替换”,而不会影响策略或投资组合的计算逻辑。
具体的子类示例可能包括 历史CSV数据处理器 HistoricCSVDataHandler、证券主数据处理器QuandlDataHandler、SecuritiesMasterDataHandler等。在本文中,我们仅考虑创建一个历史CSV数据处理器,它将为股票加载包含开盘价-最低价-最高价-收盘价-成交量-持仓量(Open-Low-High-Close-Volume-OpenInterest)的日内CSV数据。然后,这可以在系统的每一次“心跳”中,以逐根K线为基础“滴灌式”地将数据输入到策略Strategy和投资组合类Portfolio中,从而避免前视偏差。
第一个任务是导入必要的库,也就是pandas 库和抽象基类工具。由于数据处理器还需要生成市场事件,我们要导入上一篇教程中描述的event.py文件:
# data.pyimport datetimeimport os, os.pathimport pandas as pdfrom abc import ABCMeta, abstractmethodfrom event import MarketEvent
数据处理器DataHandler是一个抽象基类(ABC),这意味着无法直接实例化该类的对象,只能通过实例化其子类来使用。这样设计的原理在于:,抽象基类为所有后续的数据处理器DataHandler子类提供了必须遵循的统一接口,从而确保与其他交互类别的兼容性。
我们使用__metaclass__属性告知 Python 这是一个抽象基类。此外,我们还使用@abstractmethod装饰系统来标识那些需要在子类中具体实现的方法(这与C++中的纯虚函数概念一致)。
其中两个核心方法是get_latest_bars和update_bars。前者返回从当前“心跳”时间戳起对应的最近的若干根 K 线数据,这对策略类Strategy中所需要进行的滚动计算非常有用;后者则提供了一种“滴灌式”数据更新机制,将新的 K 线数据逐条送入一个新的数据结构中,严格防止前视偏差的发生。要注意的是,如果尝试直接实例化这个抽象基类,程序将抛出异常:
# data.pyclass DataHandler(object): """ DataHandler is an abstract base class providing an interface for all subsequent (inherited) data handlers (both live and historic). The goal of a (derived) DataHandler object is to output a generated set of bars (OLHCVI) for each symbol requested. This will replicate how a live strategy would function as current market data would be sent "down the pipe". Thus a historic and live system will be treated identically by the rest of the backtesting suite. """ __metaclass__ = ABCMeta @abstractmethod def get_latest_bars(self, symbol, N=1): """ Returns the last N bars from the latest_symbol list, or fewer if less bars are available. """ raise NotImplementedError("Should implement get_latest_bars()") @abstractmethod def update_bars(self): """ Pushes the latest bar to the latest symbol structure for all symbols in the symbol list. """ raise NotImplementedError("Should implement update_bars()")
在定义了数据处理器DataHandler 抽象基类(ABC)之后,下一步就是为历史 CSV 文件创建一个数据处理器。具体来说,HistoricCSVDataHandler将接收多个 CSV 文件(每个交易代码对应一个文件),并将它们转换为为包含pandas数据帧的字典结构。
该数据处理器需要几个参数,即一个用于推送市场事件 MarketEvent 信息的事件队列、CSV 文件的绝对路径以及一个代码列表。以下是该类的初始化代码:
# data.pyclass HistoricCSVDataHandler(DataHandler): """ HistoricCSVDataHandler is designed to read CSV files for each requested symbol from disk and provide an interface to obtain the "latest" bar in a manner identical to a live trading interface. """ def __init__(self, events, csv_dir, symbol_list): """ Initialises the historic data handler by requesting the location of the CSV files and a list of symbols. It will be assumed that all files are of the form 'symbol.csv', where symbol is a string in the list. Parameters: events - The Event Queue. csv_dir - Absolute directory path to the CSV files. symbol_list - A list of symbol strings. """ self.events = events self.csv_dir = csv_dir self.symbol_list = symbol_list self.symbol_data = {} self.latest_symbol_data = {} self.continue_backtest = True self._open_convert_csv_files()
该处理器会默认地尝试以“SYMBOL.csv”的格式打开文件,其中 symbol 是交易品种代码。文件的格式可以修改以处理其他数据格式。文件的打开由下面的_open_convert_csv_files方法处理。
在HistoricCSVDataHandler内部使用 pandas 作为数据存储的一个好处是,所有被跟踪代码符号的索引可以合并在一起。这使得缺失的数据点能够通过前向填充、后向填充或插值等方法进行补全,从而确保不同交易品种的数据能够在逐根K线的层面上进行比较。这对于均值回归策略很有必要。要注意在合并所有交易代码索引时对union和reindex方法的使用:
# data.py def _open_convert_csv_files(self): """ Opens the CSV files from the data directory, converting them into pandas DataFrames within a symbol dictionary. For this handler it will be assumed that the data is taken from Yahoo. Thus its format will be respected. """ comb_index = None for s in self.symbol_list: # Load the CSV file with no header information, indexed on date self.symbol_data[s] = pd.read_csv( os.path.join(self.csv_dir, '%s.csv' % s), header=0, index_col=0, parse_dates=True, names=[ 'datetime', 'open', 'high', 'low', 'close', 'adj_close', 'volume' ] ) self.symbol_data[s].sort_index(inplace=True) # Combine the index to pad forward values if comb_index is None: comb_index = self.symbol_data[s].index else: comb_index.union(self.symbol_data[s].index) # Set the latest symbol_data to None self.latest_symbol_data[s] = [] for s in self.symbol_list: self.symbol_data[s] = self.symbol_data[s].reindex( index=comb_index, method='pad' ) self.symbol_data[s]["returns"] = self.symbol_data[s]["adj_close"].pct_change().dropna() self.symbol_data[s] = self.symbol_data[s].iterrows() # Reindex the dataframes for s in self.symbol_list: self.symbol_data[s] = self.symbol_data[s].reindex(index=comb_index, method='pad').iterrows()
_get_new_bar 方法创建了一个生成器,用于提供格式化后的K线数据。这意味着对该方法的后续调用将逐根“生成”新的K线,直至该代码对应的数据全部输出完毕:# data.py def _get_new_bar(self, symbol): """ Returns the latest bar from the data feed as a tuple of (sybmbol, datetime, open, low, high, close, volume). """ for b in self.symbol_data[symbol]: yield tuple([symbol, datetime.datetime.strptime(b[0], '%Y-%m-%d %H:%M:%S'), b[1][0], b[1][1], b[1][2], b[1][3], b[1][4]])
第一个需要实现的数据处理器抽象方法是get_latest_bars。该方法仅从 latest_symbol_data 结构中提供最近N根 K 线的列表。当设置N=1时,即可获取当前数据柱(以列表形式封装):# data.py def get_latest_bars(self, symbol, N=1): """ Returns the last N bars from the latest_symbol list, or N-k if less available. """ try: bars_list = self.latest_symbol_data[symbol] except KeyError: print "That symbol is not available in the historical data set." else: return bars_list[-N:]
最后一个方法update_bars是数据处理器中需要实现的第二个抽象方法。该方法在将最新K线数据追加到latest_symbol_data的同时,会生成一个市场事件MarketEvent并将其加入队列:
# data.py def update_bars(self): """ Pushes the latest bar to the latest_symbol_data structure for all symbols in the symbol list. """ for s in self.symbol_list: try: bar = self._get_new_bar(s).next() except StopIteration: self.continue_backtest = False else: if bar is not None: self.latest_symbol_data[s].append(bar) self.events.put(MarketEvent())
因此,我们得到了一个由DataHandler派生的对象,系统其余组件利用它来跟踪市场数据。策略Strategy、投资组合Portfolio和执行处理ExecutionHandler对象都需要当前的市场数据,因此将其集中管理可以避免存储冗余。
在下一篇文章中,我们将探讨策略类Strategy的层次结构,并描述如何设计一个能够处理多个交易代码的策略,从而为投资组合对象生成多个 信号事件SignalEvents。