經過純Python完成股票回測框架的搭建。git
不管是傳統股票交易仍是量化交易,沒法避免的一個問題是咱們須要檢驗本身的交易策略是否可行,而最簡單的方式就是利用歷史數據檢驗交易策略,而回測框架就是提供這樣的一個平臺讓交易策略在歷史數據中不斷交易,最終生成最終結果,經過查看結果的策略收益,年化收益,最大回測等用以評估交易策略的可行性。github
代碼地址在最後。json
本項目並非一個已完善的項目, 還在不斷的完善。數據結構
回測框架應該至少包含兩個部分, 回測類, 交易類.
回測類提供各類鉤子函數,用於放置本身的交易邏輯,交易類用於模擬市場的交易平臺,這個類提供買入,賣出的方法。架構
以本身的回測框架爲例。主要包含下面兩個文件app
backtest/ backtest.py broker.py
backtest.py主要提供BackTest這個類用於提供回測框架,暴露如下鉤子函數.框架
def initialize(self): """在回測開始前的初始化""" pass def before_on_tick(self, tick): pass def after_on_tick(self, tick): pass def before_trade(self, order): """在交易以前會調用此函數 能夠在此放置資金管理及風險管理的代碼 若是返回True就容許交易,不然放棄交易 """ return True def on_order_ok(self, order): """當訂單執行成功後調用""" pass def on_order_timeout(self, order): """當訂單超時後調用""" pass def finish(self): """在回測結束後調用""" pass @abstractmethod def on_tick(self, bar): """ 回測實例必須實現的方法,並編寫本身的交易邏輯 """ pass
玩過量化平臺的回測框架或者開源框架應該對這些鉤子函數不陌生,只是名字不同而已,大多數功能是一致的,除了on_tick.ide
之因此是on_tick而不是on_bar, 是由於我但願交易邏輯是一個一個時間點的參與交易,在這個時間點我能夠獲取全部當前時間的全部股票以及以前的股票數據,用於判斷是否交易,而不是一個時間點的一個一個股票參與交易邏輯。函數
而broker.py主要提供buy,sell兩個方法用於交易。post
def buy(self, code, price, shares, ttl=-1): """ 限價提交買入訂單 --------- Parameters: code:str 股票代碼 price:float or None 最高可買入的價格, 若是爲None則按市價買入 shares:int 買入股票數量 ttl:int 訂單容許存在的最大時間,默認爲-1,永不超時 --------- return: dict { "type": 訂單類型, "buy", "code": 股票代碼, "date": 提交日期, "ttl": 存活時間, 當ttl等於0時則超時,日後不會在執行 "shares": 目標股份數量, "price": 目標價格, "deal_lst": 交易成功的歷史數據,如 [{"price": 成交價格, "date": 成交時間, "commission": 交易手續費, "shares": 成交份額 }] "" } """ if price is None: stock_info = self.ctx.tick_data[code] price = stock_info[self.deal_price] order = { "type": "buy", "code": code, "date": self.ctx.now, "ttl": ttl, "shares": shares, "price": price, "deal_lst": [] } self.submit(order) return order def sell(self, code, price, shares, ttl=-1): """ 限價提交賣出訂單 --------- Parameters: code:str 股票代碼 price:float or None 最低可賣出的價格, 若是爲None則按市價賣出 shares:int 賣出股票數量 ttl:int 訂單容許存在的最大時間,默認爲-1,永不超時 --------- return: dict { "type": 訂單類型, "sell", "code": 股票代碼, "date": 提交日期, "ttl": 存活時間, 當ttl等於0時則超時,日後不會在執行 "shares": 目標股份數量, "price": 目標價格, "deal_lst": 交易成功的歷史數據,如 [{"open_price": 開倉價格, "close_price": 成交價格, "close_date": 成交時間, "open_date": 持倉時間, "commission": 交易手續費, "shares": 成交份額, "profit": 交易收益}] "" } """ if code not in self.position: return if price is None: stock_info = self.ctx.tick_data[code] price = stock_info[self.deal_price] order = { "type": "sell", "code": code, "date": self.ctx.now, "ttl": ttl, "shares": shares, "price": price, "deal_lst": [] } self.submit(order) return order
因爲我很討厭抽象出太多類,抽象出太多類及方法,我怕我本身都忘記了,因此對於對象的選擇都是儘量的使用經常使用的數據結構,如list, dict.
這裏用一個dict表明一個訂單。
上面的這些方法保證了一個回測框架的基本交易邏輯,而回測的運行還須要一個調度器不斷的驅動這些方法,這裏的調度器以下。
class Scheduler(object): """ 整個回測過程當中的調度中心, 經過一個個時間刻度(tick)來驅動回測邏輯 全部被調度的對象都會綁定一個叫作ctx的Context對象,因爲共享整個回測過程當中的全部關鍵數據, 可用變量包括: ctx.feed: {code1: pd.DataFrame, code2: pd.DataFrame}對象 ctx.now: 循環所處時間 ctx.tick_data: 循環所處時間的全部有報價的股票報價 ctx.trade_cal: 交易日曆 ctx.broker: Broker對象 ctx.bt/ctx.backtest: Backtest對象 可用方法: ctx.get_hist """ def __init__(self): """""" self.ctx = Context() self._pre_hook_lst = [] self._post_hook_lst = [] self._runner_lst = [] def run(self): # runner指存在可調用的initialize, finish, run(tick)的對象 runner_lst = list(chain(self._pre_hook_lst, self._runner_lst, self._post_hook_lst)) # 循環開始前爲broker, backtest, hook等實例綁定ctx對象及調用其initialize方法 for runner in runner_lst: runner.ctx = self.ctx runner.initialize() # 建立交易日曆 if "trade_cal" not in self.ctx: df = list(self.ctx.feed.values())[0] self.ctx["trade_cal"] = df.index # 經過遍歷交易日曆的時間依次調用runner # 首先調用全部pre-hook的run方法 # 而後調用broker,backtest的run方法 # 最後調用post-hook的run方法 for tick in self.ctx.trade_cal: self.ctx.set_currnet_time(tick) for runner in runner_lst: runner.run(tick) # 循環結束後調用全部runner對象的finish方法 for runner in runner_lst: runner.finish()
在Backtest類實例化的時候就會自動建立一個調度器對象,而後經過Backtest實例的start方法就能啓動調度器,而調度器會根據歷史數據的一個一個時間戳不斷驅動Backtest, Broker實例被調用。
爲了處理不一樣實例之間的數據訪問隔離,因此經過一個將一個Context對象綁定到Backtest, Broker實例上,經過self.ctx訪問共享的數據,共享的數據主要包括feed對象,即歷史數據,一個數據結構以下的字典對象。
{code1: pd.DataFrame, code2: pd.DataFrame}
而這個Context對象也綁定了Broker, Backtest的實例, 這就可使得數據訪問接口統一,可是可能致使數據訪問混亂,這就要看策略者的使用了,這樣的一個好處就是減小了一堆代理方法,經過添加方法去訪問其餘的對象的方法,真不嫌麻煩,那些人。
綁定及Context對象代碼以下:
class Context(UserDict): def __getattr__(self, key): # 讓調用這能夠經過索引或者屬性引用皆可 return self[key] def set_currnet_time(self, tick): self["now"] = tick tick_data = {} # 獲取當前全部有報價的股票報價 for code, hist in self["feed"].items(): df = hist[hist.index == tick] if len(df) == 1: tick_data[code] = df.iloc[-1] self["tick_data"] = tick_data def get_hist(self, code=None): """若是不指定code, 獲取截至到當前時間的全部股票的歷史數據""" if code is None: hist = {} for code, hist in self["feed"].items(): hist[code] = hist[hist.index <= self.now] elif code in self.feed: return {code: self.feed[code]} return hist class Scheduler(object): """ 整個回測過程當中的調度中心, 經過一個個時間刻度(tick)來驅動回測邏輯 全部被調度的對象都會綁定一個叫作ctx的Context對象,因爲共享整個回測過程當中的全部關鍵數據, 可用變量包括: ctx.feed: {code1: pd.DataFrame, code2: pd.DataFrame}對象 ctx.now: 循環所處時間 ctx.tick_data: 循環所處時間的全部有報價的股票報價 ctx.trade_cal: 交易日曆 ctx.broker: Broker對象 ctx.bt/ctx.backtest: Backtest對象 可用方法: ctx.get_hist """ def __init__(self): """""" self.ctx = Context() self._pre_hook_lst = [] self._post_hook_lst = [] self._runner_lst = [] def add_feed(self, feed): self.ctx["feed"] = feed def add_hook(self, hook, typ="post"): if typ == "post" and hook not in self._post_hook_lst: self._post_hook_lst.append(hook) elif typ == "pre" and hook not in self._pre_hook_lst: self._pre_hook_lst.append(hook) def add_broker(self, broker): self.ctx["broker"] = broker def add_backtest(self, backtest): self.ctx["backtest"] = backtest # 簡寫 self.ctx["bt"] = backtest def add_runner(self, runner): if runner in self._runner_lst: return self._runner_lst.append(runner)
爲了使得整個框架可擴展,回測框架中框架中抽象了一個Hook類,這個類能夠在在每次回測框架調用前或者調用後被調用,這樣就能夠加入一些處理邏輯,好比統計資產變化等。
這裏建立了一個Stat的Hook對象,用於統計資產變化。
class Stat(Base): def __init__(self): self._date_hist = [] self._cash_hist = [] self._stk_val_hist = [] self._ast_val_hist = [] self._returns_hist = [] def run(self, tick): self._date_hist.append(tick) self._cash_hist.append(self.ctx.broker.cash) self._stk_val_hist.append(self.ctx.broker.stock_value) self._ast_val_hist.append(self.ctx.broker.assets_value) @property def data(self): df = pd.DataFrame({"cash": self._cash_hist, "stock_value": self._stk_val_hist, "assets_value": self._ast_val_hist}, index=self._date_hist) df.index.name = "date" return df
而經過這些統計的數據就能夠計算最大回撤年化率等。
def get_dropdown(self): high_val = -1 low_val = None high_index = 0 low_index = 0 dropdown_lst = [] dropdown_index_lst = [] for idx, val in enumerate(self._ast_val_hist): if val >= high_val: if high_val == low_val or high_index >= low_index: high_val = low_val = val high_index = low_index = idx continue dropdown = (high_val - low_val) / high_val dropdown_lst.append(dropdown) dropdown_index_lst.append((high_index, low_index)) high_val = low_val = val high_index = low_index = idx if low_val is None: low_val = val low_index = idx if val < low_val: low_val = val low_index = idx if low_index > high_index: dropdown = (high_val - low_val) / high_val dropdown_lst.append(dropdown) dropdown_index_lst.append((high_index, low_index)) return dropdown_lst, dropdown_index_lst @property def max_dropdown(self): """最大回車率""" dropdown_lst, dropdown_index_lst = self.get_dropdown() if len(dropdown_lst) > 0: return max(dropdown_lst) else: return 0 @property def annual_return(self): """ 年化收益率 y = (v/c)^(D/T) - 1 v: 最終價值 c: 初始價值 D: 有效投資時間(365) 注: 雖然投資股票只有250天,可是持有股票後的非交易日也沒辦法投資到其餘地方,因此這裏我取365 參考: https://wiki.mbalib.com/zh-tw/%E5%B9%B4%E5%8C%96%E6%94%B6%E7%9B%8A%E7%8E%87 """ D = 365 c = self._ast_val_hist[0] v = self._ast_val_hist[-1] days = (self._date_hist[-1] - self._date_hist[0]).days ret = (v / c) ** (D / days) - 1 return ret
至此一個筆者須要的回測框架造成了。
在回測框架中我並無集成各類獲取數據的方法,由於這並非回測框架必須集成的部分,規定數據結構就能夠了,數據的獲取經過查看數據篇,
回測報告我也放在了回測框架以外,這裏寫了一個Plottter的對象用於繪製一些回測指標等。結果以下:
下面是一個回測示例。
import json from backtest import BackTest from reporter import Plotter class MyBackTest(BackTest): def initialize(self): self.info("initialize") def finish(self): self.info("finish") def on_tick(self, tick): tick_data = self.ctx["tick_data"] for code, hist in tick_data.items(): if hist["ma10"] > 1.05 * hist["ma20"]: self.ctx.broker.buy(code, hist.close, 500, ttl=5) if hist["ma10"] < hist["ma20"] and code in self.ctx.broker.position: self.ctx.broker.sell(code, hist.close, 200, ttl=1) if __name__ == '__main__': from utils import load_hist feed = {} for code, hist in load_hist("000002.SZ"): # hist = hist.iloc[:100] hist["ma10"] = hist.close.rolling(10).mean() hist["ma20"] = hist.close.rolling(20).mean() feed[code] = hist mytest = MyBackTest(feed) mytest.start() order_lst = mytest.ctx.broker.order_hist_lst with open("report/order_hist.json", "w") as wf: json.dump(order_lst, wf, indent=4, default=str) stats = mytest.stat stats.data.to_csv("report/stat.csv") print("策略收益: {:.3f}%".format(stats.total_returns * 100)) print("最大回徹率: {:.3f}% ".format(stats.max_dropdown * 100)) print("年化收益: {:.3f}% ".format(stats.annual_return * 100)) print("夏普比率: {:.3f} ".format(stats.sharpe)) plotter = Plotter(feed, stats, order_lst) plotter.report("report/report.png")