本文着重於回測相關得模塊。python
因爲上一篇文章實在是寫得太爛了, 這一篇文章從新開始寫。數據結構
以官方教程示例爲例app
python -c "from pyalgotrade.tools import yahoofinance; yahoofinance.download_daily_bars('orcl', 2000, 'orcl-2000.csv')"
from pyalgotrade import strategy from pyalgotrade.barfeed import yahoofeed from pyalgotrade.technical import ma class MyStrategy(strategy.BacktestingStrategy): def __init__(self, feed, instrument, smaPeriod): super(MyStrategy, self).__init__(feed, 1000) self.__position = None self.__instrument = instrument # We'll use adjusted close values instead of regular close values. self.setUseAdjustedValues(True) self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod) def onEnterOk(self, position): execInfo = position.getEntryOrder().getExecutionInfo() self.info("BUY at $%.2f" % (execInfo.getPrice())) def onEnterCanceled(self, position): self.__position = None def onExitOk(self, position): execInfo = position.getExitOrder().getExecutionInfo() self.info("SELL at $%.2f" % (execInfo.getPrice())) self.__position = None def onExitCanceled(self, position): # If the exit was canceled, re-submit it. self.__position.exitMarket() def onBars(self, bars): # Wait for enough bars to be available to calculate a SMA. if self.__sma[-1] is None: return bar = bars[self.__instrument] # If a position was not opened, check if we should enter a long position. if self.__position is None: if bar.getPrice() > self.__sma[-1]: # Enter a buy market order for 10 shares. The order is good till canceled. self.__position = self.enterLong(self.__instrument, 10, True) # Check if we have to exit the position. elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive(): self.__position.exitMarket() def run_strategy(smaPeriod): # Load the yahoo feed from the CSV file feed = yahoofeed.Feed() feed.addBarsFromCSV("orcl", "orcl-2000.csv") # Evaluate the strategy with the feed. myStrategy = MyStrategy(feed, "orcl", smaPeriod) myStrategy.run() print "Final portfolio value: $%.2f" % myStrategy.getBroker().getEquity() run_strategy(15)
用於承載回測的數據,提供接口訪問,驅動整個事件循環。ide
# 導入yahoofeed模塊 from pyalgotrade.barfeed import yahoofeed # 建立yahoofeed.Feed類建立其實例 feed = yahoofeed.Feed() # 經過addBarsFromCSV加載本地csv文件 # 傳入股票代碼名, 文件路徑 feed.addBarsFromCSV("orcl", "orcl-2000.csv")
注: 由IntelliJ Idea生成函數
由上圖可知, 分別繼承不一樣的BarFeed,最終業務邏輯基類pyalgotrade.observer.subject.post
主要方法調用順序以下:ui
yahooFeed.addBarsFromCSV lua
-> csvFeed.BarFeed.addBarsFromCSV spa
-> membf.BarFeed.addBarsFromSequence 3d
-> barfeed.registerInstrument
-> feed.registerDataSeries
-> barfeed.createDataSeries
在Feed中有兩個比較重要的數據對象
pyalgotrade/pyalgotrade/dataseries/bards.py class BarDataSeries(dataseries.SequenceDataSeries): def __init__(self, maxLen=None): super(BarDataSeries, self).__init__(maxLen) self.__openDS = dataseries.SequenceDataSeries(maxLen) self.__closeDS = dataseries.SequenceDataSeries(maxLen) self.__highDS = dataseries.SequenceDataSeries(maxLen) self.__lowDS = dataseries.SequenceDataSeries(maxLen) self.__volumeDS = dataseries.SequenceDataSeries(maxLen) self.__adjCloseDS = dataseries.SequenceDataSeries(maxLen) self.__extraDS = {} self.__useAdjustedValues = False
BarDataSeries提供一系列方法返回相應的數據序列,以getOpenDataSeries爲例
pyalgotrade/pyalgotrade/dataseries/bards.py:87 def getOpenDataSeries(self): """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the open prices.""" return self.__openDS
而dataseries.SequenceDataSeries對象是一個數據存儲在collections.ListDeque對象上,並集成事件監聽的類對象.
self._bars在membf.BarFeed.addBarsFromSequence方法中讀取csv文件生成.
self._ds在barfeed.createDataSeries方法中建立一個默認長度爲1024的BarDataSeries空數據對象.
bar是含有時間, 開盤價, 收盤價, 當日最高價, 當日最低價, 成交量,復權收盤價的數據對象.
self.__bars是key爲股票代碼, value是元素爲bars數據對象的列表的字典.
self.__ds是BarDataSeries對象
事件循環是PyalgoTrade的數據引擎,驅動着整個策略運轉.
下面是Pyalgotrade內部事件循環的一個簡單的實現。
# coding: utf8 import abc class Event(object): """事件類. 用於訂閱指定的操做,如函數 當事件執行emit方法的時候,遍歷訂閱了的操做,並執行該操做""" def __init__(self): # 內部handlers列表 self.__handlers = [] def subscribe(self, handler): if handler not in self.__handlers: self.__handlers.append(handler) def emit(self, *args, **kwargs): """執行全部訂閱了的操做""" for handler in self.__handlers: handler(*args, **kwargs) class Subject(object): """將元類指向abc.ABCMeta元類 1. 當抽象方法未被實現的時候,不能新建該類的實例 2. abstractmethod至關於子類要實現的接口,若是不實現,則不能新建該類的實例""" __metaclass__ = abc.ABCMeta @abc.abstractmethod def start(self): pass @abc.abstractmethod def stop(self): pass @abc.abstractmethod def dispatch(self): raise NotImplementedError() @abc.abstractmethod def eof(self): raise NotImplementedError() class Dispatcher(object): """調度類 1. 維護事件循環 2. 不斷的調度subject的disptch操做並判斷是否結束""" def __init__(self): self.__subjects = [] self.__stop = False def run(self): """運行整個事件循環並在調度以前,以後分別調用subject的start, stop方法""" try: for subject in self.__subjects: subject.start() while not self.__stop: eof, dispatched = self.dispatch() if eof: self.__stop = True finally: for subject in self.__subjects: subject.stop() def dispatch(self): ret = False eof = False for subject in self.__subjects: ret = subject.dispatch() is True eof = subject.eof() return eof, ret def addSubject(self, subject): self.__subjects.append(subject) class Broker(Subject): """Broker 類""" def dispatch(self): return None def eof(self): return None def start(self): pass def stop(self): pass class Feed(Subject): """Feed類 1. 承載數據源 2. 經過數據驅動事件循環""" def __init__(self, size): self.__data = range(size) self.__nextPos = 0 self.__event = Event() def start(self): pass def stop(self): pass def dispatch(self): value = self.__data[self.__nextPos] self.__event.emit(value) self.__nextPos += 1 return True def getNewValueEvent(self): return self.__event def eof(self): return self.__nextPos >= len(self.__data) class Strategy(object): def __init__(self, broker, feed): self.__dispatcher = Dispatcher() self.__feed = feed self.__broker = broker # 將策略的self.__onBars方法傳入Feed的self.__event裏面 # 當Feed調用dispatch方法的時候, 會指定self.__onBars函數 self.__feed.getNewValueEvent().subscribe(self.__onBars) # 注意順序,Feed對象必須在最後 self.__dispatcher.addSubject(self.__broker) self.__dispatcher.addSubject(self.__feed) def __onBars(self, value): print("dispatch before.") self.onBars(value) print("dispatch after") def onBars(self, value): print("on Bar: {}".format(value)) def run(self): self.__dispatcher.run() if __name__ == '__main__': feed = Feed(3) broker = Broker() myStrategy = Strategy(broker, feed) myStrategy.run() output: dispatch before. on Bar: 0 dispatch after dispatch before. on Bar: 1 dispatch after dispatch before. on Bar: 2 dispatch after
上面的代碼主要說明策略的onBars方法是怎麼被調用的。
關於Broker怎麼被驅動,在後面講解
- 策略中維護一個調度器dispatcher,當策略啓動的時候, 調度器dipatcher啓動, 並嘗試調用feed,broker start方法.
- 不斷調用feed, broker的dispatch方法, 判斷是否結束, 若是結束, 則作結束動做, 調用feed, broker的stop方法
- feed對象在調用dispatch方法的時候, feed對象會觸發自身維護的self._event. 而self._event在MyStrategy._init_方法中,經過self._feed.getNewValueEvent().subscribe(self._onBars)訂閱了MyStrategy._onBars方法, 因此Feed對象每次dispatch的時候,MyStrategy._onBars都會被調用.
至此, Feed對象怎麼驅動策略的邏輯已經清晰。
接下來,講解BaseStrategy, BacktestingStrategy初始化過程
策略的繼承鏈並不複雜, 全部策略的基類是BaseStartegy, BacktestingStrategy是提供給用戶使用的策略,至少實現onBars函數則能夠回測。
BaseStrategy, BacktestingStrategy的初始化源代碼以下
pyalgotrade/pyalgotrade/strategy/__init__.py class BaseStartegy(object): def __init__(self, barFeed, broker): # 綁定barFeed對象 self.__barFeed = barFeed # 綁定broker對象 self.__broker = broker # 交易相關的倉位 self.__activePositions = set() # 訂單處理順序 self.__orderToPosition = {} # bar被處理後的事件 self.__barsProcessedEvent = observer.Event() # analyzer列表 self.__analyzers = [] # 命名的analyzer列表 self.__namedAnalyzers = {} # 從新取樣的feed對象列表 self.__resampledBarFeeds = [] # 調度器對象 self.__dispatcher = dispatcher.Dispatcher() # broker的訂單被更新時的事件, 訂閱self.__onOrderEvent方法 self.__broker.getOrderUpdatedEvent().subscribe(self.__onOrderEvent) # barfeed值被更新的時候的事件(當barfeed被調度的時候),訂閱self.__onBars方法 self.__barFeed.getNewValuesEvent().subscribe(self.__onBars) # 調度器的開始事件,訂閱self.onStart方法 self.__dispatcher.getStartEvent().subscribe(self.onStart) # 調度器的空閒事件, 訂閱self.__onIdle方法 self.__dispatcher.getIdleEvent().subscribe(self.__onIdle) # 分別將繼承了Subject類的broker,barFeed對象加入到調度器的subject列表 self.__dispatcher.addSubject(self.__broker) self.__dispatcher.addSubject(self.__barFeed) # 日誌級別的初始化 self.__logger = logger.getLogger(BaseStrategy.LOGGER_NAME) class BacktestingStrategy(BaseStrategy): # 默認初始化一個持有100w現金的虛擬帳戶 def __init__(self, barFeed, cash_or_brk=1000000): # 若是沒有傳入cash_or_brk參數, 或者傳入數值類型的值 # 則傳入cash_or_brk,barFeed對象新建一個backtesting.Broker實例,並調用父類的__init__方法 # 若是傳入的cash_or_brk參數值是backtesting.Broker的實例, 則直接使用 if isinstance(cash_or_brk, pyalgotrade.broker.Broker): broker = cash_or_brk else: broker = backtesting.Broker(cash_or_brk, barFeed) BaseStrategy.__init__(self, barFeed, broker) # 默認self.__useAdjustedValue=False self.__useAdjustedValues = False # 配置日誌參數 self.setUseEventDateTimeInLogs(True) self.setDebugMode(True)
總的來講真正Strategy對象,barFeed對象,broker對象訂閱了更多的事件, 以及更多的判斷。但,內核都是調度器驅動着barFeed, broker對象不斷的被調度(調用dispatch方法), 而barFeed對象會不斷的從self._bars中取數據追加到self._ds對象中,並將取出來的數據提交的self._event中,而self._event訂閱了Strategy.__onBars方法, 因此不斷的驅動着Strategy的自定義策略(onBars裏面定義的交易邏輯).
在Strategy對象初始化時候, 會初始化一個虛擬的回測帳戶.
回測帳戶broker須要傳入barfeed對象, 並在barfeed的event對象裏面訂閱本身的onBars函數,源碼以下:
pyalgotrade/pyalgotrade/broker/__init__.py class Broker(broker.Broker): LOGGER_NAME = "broker.backtesting" def __init__(self, cash, barFeed, commission=None): super(Broker, self).__init__() assert(cash >= 0) self.__cash = cash if commission is None: self.__commission = NoCommission() else: self.__commission = commission self.__shares = {} self.__activeOrders = {} self.__useAdjustedValues = False # 持倉策略, 使用DefaultStrategy # 使用DefaultStrategy.volumeLimit = 0.25 # 當交易訂單的成交量大於當前bar的成交量的25%則不能成交 # 沒有滑點 # 沒有手續費 self.__fillStrategy = fillstrategy.DefaultStrategy() self.__logger = logger.getLogger(Broker.LOGGER_NAME) # 讓barfeed對象訂閱self.onBars方法 barFeed.getNewValuesEvent().subscribe(self.onBars) self.__barFeed = barFeed self.__allowNegativeCash = False self.__nextOrderId = 1
由上可知,當barFeed對象數據更新的時候,還會調用BackTestBroker.onBars方法.
當使用enterLong之類交易方法,則會返回一個Postion的對象,這個對象承載着當前各股的持倉比例,以及持有現金.
以enterLong方法說明持倉流程.
以exitMarket方法說明平倉流程.
源代碼調用鏈太長....因此文字歸納.
當咱們買入或者賣出的時候,實際上是提交一個訂單給交易帳戶(broker), 交易帳戶會根據交易訂單的類型,動做等相關信息執行相關的操做.
交易訂單的類型參考: https://www.thebalance.com/understanding-stock-orders-3141318
通常有買入(作多), 賣出(作空)兩種交易類型, 可是這兩種類型成交的方式分別由市價成交, 限價成交.
因此一共由如下四種類型,對應Strategy的四個方法:
以enter開頭是更加上層的方法, 建議使用.
goodTillCanceled爲了適配實盤接口, 實盤接口可能有前一天的訂單不會再執行的限制,因此設置goodTillCanceled=True保證次日或者更後的時間,訂單依然有效,直至手動取消.
除了提交交易訂單還能夠提交止損訂單, 分別對應Strategy的兩個方法.
每一個提交的訂單會到下一個事件循環纔會判斷條件是否符合,纔會執行.
經過藉助自定義指標或者自帶的指標,如SMA,EMA,MACD等能夠更全面的看待股票的走勢以及信號.
下面是技術指標基類的初始化過程.
pyalgotrade/pyalgotrade/technical/__init__.py class EventWindow(object): """數據實際承載類 數據保存在self__values裏面 """ def __init__(self, windowSize, dtype=float, skipNone=True): assert(windowSize > 0) assert(isinstance(windowSize, int)) self.__values = collections.NumPyDeque(windowSize, dtype) self.__windowSize = windowSize self.__skipNone = skipNone def onNewValue(self, dateTime, value): """提供onNewValue方法將新的值傳入""" if value is not None or not self.__skipNone: self.__values.append(value) def getValues(self): """獲取EventWindows的全部值""" return self.__values.data() def getWindowSize(self): """獲取EventWindow Size""" return self.__windowSize def windowFull(self): """eventWindow是否已經填滿""" return len(self.__values) == self.__windowSize def getValue(self): """子類須實現的類""" raise NotImplementedError() class EventBasedFilter(dataseries.SequenceDataSeries): def __init__(self, dataSeries, eventWindow, maxLen=None): super(EventBasedFilter, self).__init__(maxLen) self.__dataSeries = dataSeries # 當dataseries數據有新值的時候,調用self.__onNewValues方法 self.__dataSeries.getNewValueEvent().subscribe(self.__onNewValue) self.__eventWindow = eventWindow def __onNewValue(self, dataSeries, dateTime, value): # 讓EventWindow對象計算新值 self.__eventWindow.onNewValue(dateTime, value) # 獲取計算後的結果 newValue = self.__eventWindow.getValue() # 將值保存到自身實例裏面, 即self.__values # 由於繼承了dataseries.SequenceDataSeries類 # 而dataseries.SequenceDataSeries父類實現了__getitem__方法, 因此可使用索引取值. self.appendWithDateTime(dateTime, newValue) def getDataSeries(self): return self.__dataSeries def getEventWindow(self): return self.__eventWindow
在Feed對象初始過程當中,會初始化兩個比較重要的數據結構, 一個是self._bars, 一個是self._ds,在整個事件驅動中, 策略不停的從self_bars中取數據,而後使用appendWithDateTime方法將數據追加的self._ds裏面。
源碼以下:
pyalgotrade/pyalgotrade/dataseries/bards.py # 首先調用BarDataSeries的appendWithDateTime方法 class BarDataSeries(dataseries.SequenceDataSeries): def appendWithDateTime(self, dateTime, bar): assert(dateTime is not None) assert(bar is not None) bar.setUseAdjustedValue(self.__useAdjustedValues) super(BarDataSeries, self).appendWithDateTime(dateTime, bar) self.__openDS.appendWithDateTime(dateTime, bar.getOpen()) self.__closeDS.appendWithDateTime(dateTime, bar.getClose()) self.__highDS.appendWithDateTime(dateTime, bar.getHigh()) self.__lowDS.appendWithDateTime(dateTime, bar.getLow()) self.__volumeDS.appendWithDateTime(dateTime, bar.getVolume()) self.__adjCloseDS.appendWithDateTime(dateTime, bar.getAdjClose()) # Process extra columns. for name, value in bar.getExtraColumns().iteritems(): extraDS = self.__getOrCreateExtraDS(name) extraDS.appendWithDateTime(dateTime, value) pyalgotrade/dataseries/__init__.py # 而後調用SequenceDataSeries對象的appendWithDateTime # 在這個方法中提交數據更新的事件 class SequenceDataSeries(DataSeries): def appendWithDateTime(self, dateTime, value): """ Appends a value with an associated datetime. .. note:: If dateTime is not None, it must be greater than the last one. """ if dateTime is not None and len(self.__dateTimes) != 0 and self.__dateTimes[-1] >= dateTime: raise Exception("Invalid datetime. It must be bigger than that last one") assert(len(self.__values) == len(self.__dateTimes)) self.__dateTimes.append(dateTime) self.__values.append(value) self.getNewValueEvent().emit(self, dateTime, value)
使用技術指標須要傳入dataSeries對象, 能夠經過getPriceDataSeries, getOpenDataSeries等得到.
因爲上面已經有完整版本的代碼,這裏作必定的刪減, 並作註解.
# 集成strategy.BacktestingStrategy類 class MyStrategy(strategy.BacktestingStrategy): def __init__(self, feed, instrument, smaPeriod): # 調用父類__init__方法 super(MyStrategy, self).__init__(feed, 1000) # 初始狀況下,postion設置爲零, postion通常只持倉比例 self.__position = None # 股票代碼 self.__instrument = instrument # We'll use adjusted close values instead of regular close values. # 是否使用復權收盤價 self.setUseAdjustedValues(True) # 初始化策略指標 self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod) # 省略其餘鉤子函數 # 必須實現的onBars函數,用於買賣的主要邏輯 def onBars(self, bars): # 若是沒有簡單移動平均值則什麼都不作 if self.__sma[-1] is None: return # 取出指定股票代碼的bar對象 bar = bars[self.__instrument] # 若是postion is None,即持倉爲0 if self.__position is None: # 若是收盤價大於簡單移動平均值則買入 if bar.getPrice() > self.__sma[-1]: # 買入,enterLong=作多 self.__position = self.enterLong(self.__instrument, 10, True) # 反之賣出 elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive(): self.__position.exitMarket()
BarFeed像是PyalgoTrade中的燃料,不斷的供給給策略的Dispatcher調度器, 使整個策略不斷運行,直至沒有燃料(沒有新的數據.)
BarFeed使數據源的一個抽象,裏面保存着兩個重要的數據結構, self._bars, self._ds.
self.__bars是key爲股票代碼, value是元素爲bar數據對象的列表的字典.
self.__ds爲BarDataSeries對象.
Broker維護着虛擬帳戶裏面的現金以及相關股票的倉位.接收訂單並實時的處理訂單, 計算收益等.
Position爲股票倉位持有狀況的對象, 提供交易的相關接口.
EventBasedFilter爲技術指標, 能夠計算相關指標如MACD, SMA等, 也能夠自定義本身的技術指標.
Strategy爲自定義策略,只需實現onBars函數便可完成買賣邏輯, 將Broker,Position相關接口放在Strategy實例方法裏面, 同一調用接口.