【前言】在搞定交易接口後,咱們開發交易系統的第一步就是要弄清楚系統的工做原理。本文是我看的公司的中頻平臺文檔的總結,公司是基於開源的vn.py修改而來,可是驅動引擎是同樣的。git
會參考vn.py官方文檔,公司是參考其修改的。但在正在研發的高頻交易平臺上是根據CTP接口接發數據的格式,合併成單線程,配合solarflare的網卡,速度不吃虧。本文以及下面的三篇文章主要是現有的中頻平臺的,固然高頻相似於次,增長了共享內存等。github
全部的計算機程序均可以大體分爲兩類:腳本型(單次運行)和連續運行型(直到用戶主動退出)。算法
腳本型的程序包括最先的批處理文件以及使用Python作交易策略回測等等,這類程序的特色是在用戶啓動後會按照編程時設計好的步驟一步步運行,全部步驟運行完後自動退出。數據庫
連續運行型的程序包含了操做系統和絕大部分咱們平常使用的軟件等等,這類程序啓動後會處於一個無限循環中連續運行,直到用戶主動退出時纔會結束。編程
咱們要開發的交易系統就是屬於連續運行型程序,而這種程序根據其計算邏輯的運行機制不一樣,又能夠粗略的分爲時間驅動和事件驅動兩種。併發
時間驅動的程序邏輯相對容易設計,簡單來講就是讓電腦每隔一段時間自動作一些事情。這個事情自己能夠很複雜、包括不少步驟,但這些步驟都是線性的,按照順序一步步執行下來。app
如下代碼展現了一個很是簡單的時間驅動的Python程序。框架
from time import sleep def demo(): print u'時間驅動的程序每隔1秒運行demo函數' while 1: demo() sleep(1.0)
時間驅動的程序本質上就是每隔一段時間固定運行一次腳本(上面代碼中的demo函數)。儘管腳本自身能夠很長、包含很是多的步驟,可是咱們能夠看出這種程序的運行機制相對比較簡單、容易理解。異步
舉一些量化交易相關的例子:函數
每隔5分鐘,經過新浪財經網頁的公開API讀取一次滬深300成分股的價格,根據當日漲幅進行排序後輸出到電腦屏幕上。
每隔1秒鐘,檢查一次最新收到的股指期貨TICK數據,更新K線和其餘技術指標,檢查是否知足趨勢策略的下單條件,若知足則執行下單。
對速度要求較高的量化交易方面(日內CTA策略、高頻策略等等),時間驅動的程序會存在一個很是大的缺點:對數據信息在反應操做上的處理延時。例子中,在每次邏輯腳本運行完等待的那1秒鐘裏,程序對於接收到的新數據信息(行情、成交推送等等)是不會作出任何反應的,只有在等待時間結束後腳本再次運行時纔會進行相關的計算處理。而處理延時在量化交易中的直接後果就是:市價單滑點、限價單錯過本可成交的價格。
時間驅動的程序在量化交易方面還存在一些其餘的缺點:如浪費CPU的計算資源、實現異步邏輯複雜度高等等。
與時間驅動對應的就是事件驅動的程序:當某個新的事件被推送到程序中時(如API推送新的行情、成交),程序當即調用和這個事件相對應的處理函數進行相關的操做。
上面例子的事件驅動版:交易程序對股指TICK數據進行監聽,當沒有新的行情過來時,程序保持監聽狀態不進行任何操做;當收到新的數據時,數據處理函數當即更新K線和其餘技術指標,並檢查是否知足趨勢策略的下單條件執行下單。
對於簡單的程序,咱們能夠採用上面測試代碼中的方案,直接在API的回調函數中寫入相應的邏輯。但隨着程序複雜度的增長,這種方案會變得愈來愈不可行。假設咱們有一個帶有圖形界面的量化交易系統,系統在某一時刻接收到了API推送的股指期貨行情數據,針對這個數據系統須要進行以下處理:
更新圖表上顯示的K線圖形(繪圖)
更新行情監控表中股指期貨的行情數據(表格更新)
策略1須要運行一次內部算法,檢查該數據是否會觸發策略進行下單(運算、下單)
策略2一樣須要運行一次內部算法,檢查該數據是否會觸發策略進行下單(運算、下單)
風控系統須要檢查最新行情價格是否會致使帳戶的總體風險超限,若超限須要進行報警(運算、報警)
此時將上面全部的操做都寫到一個回調函數中無疑變成了很是差的方案,代碼過長容易出錯不說,可擴展性也差,每添加一個策略或者功能則又須要修改以前的源代碼(有經驗的讀者會知道,常常修改生產代碼是一種很是危險的運營管理方法)。
小結:雖然咱們的交易平臺上沒有圖形界面,由於這只是一種輔助功能,不是生產的核心功能。可是,也有可能有其餘的信號或者事件須要咱們處理,那麼什麼時候處理?分配多少資源處理?因此應該下降耦合,爲了解決這種狀況,咱們須要用到事件驅動引擎來管理不一樣事件的事件監聽函數並執行全部和事件驅動相關的操做。
vn.py框架中的vn.event模塊包含了一個可擴展的事件驅動引擎。整個引擎的實現並不複雜,除去註釋、空行後大概也就100行左右的代碼:
# encoding: UTF-8 # 系統模塊 from Queue import Queue, Empty from threading import Thread # 第三方模塊 from PyQt4.QtCore import QTimer # 本身開發的模塊 from eventType import * ######################################################################## class EventEngine: """ 事件驅動引擎 事件驅動引擎中全部的變量都設置爲了私有,這是爲了防止不當心 從外部修改了這些變量的值或狀態,致使bug。 變量說明 __queue:私有變量,事件隊列 __active:私有變量,事件引擎開關 __thread:私有變量,事件處理線程 __timer:私有變量,計時器 __handlers:私有變量,事件處理函數字典 方法說明 __run: 私有方法,事件處理線程連續運行用 __process: 私有方法,處理事件,調用註冊在引擎中的監聽函數 __onTimer:私有方法,計時器固定事件間隔觸發後,向事件隊列中存入計時器事件 start: 公共方法,啓動引擎 stop:公共方法,中止引擎 register:公共方法,向引擎中註冊監聽函數 unregister:公共方法,向引擎中註銷監聽函數 put:公共方法,向事件隊列中存入新的事件 事件監聽函數必須定義爲輸入參數僅爲一個event對象,即: 函數 def func(event) ... 對象方法 def method(self, event) ... """ #---------------------------------------------------------------------- def __init__(self): """初始化事件引擎""" # 事件隊列 self.__queue = Queue() # 事件引擎開關 self.__active = False # 事件處理線程 self.__thread = Thread(target = self.__run) # 計時器,用於觸發計時器事件 self.__timer = QTimer() self.__timer.timeout.connect(self.__onTimer) # 這裏的__handlers是一個字典,用來保存對應的事件調用關係 # 其中每一個鍵對應的值是一個列表,列表中保存了對該事件進行監聽的函數功能 self.__handlers = {} #---------------------------------------------------------------------- def __run(self): """引擎運行""" while self.__active == True: try: event = self.__queue.get(block = True, timeout = 1) # 獲取事件的阻塞時間設爲1秒 self.__process(event) except Empty: pass #---------------------------------------------------------------------- def __process(self, event): """處理事件""" # 檢查是否存在對該事件進行監聽的處理函數 if event.type_ in self.__handlers: # 若存在,則按順序將事件傳遞給處理函數執行 [handler(event) for handler in self.__handlers[event.type_]] # 以上語句爲Python列表解析方式的寫法,對應的常規循環寫法爲: #for handler in self.__handlers[event.type_]: #handler(event) #---------------------------------------------------------------------- def __onTimer(self): """向事件隊列中存入計時器事件""" # 建立計時器事件 event = Event(type_=EVENT_TIMER) # 向隊列中存入計時器事件 self.put(event) #---------------------------------------------------------------------- def start(self): """引擎啓動""" # 將引擎設爲啓動 self.__active = True # 啓動事件處理線程 self.__thread.start() # 啓動計時器,計時器事件間隔默認設定爲1秒 self.__timer.start(1000) #---------------------------------------------------------------------- def stop(self): """中止引擎""" # 將引擎設爲中止 self.__active = False # 中止計時器 self.__timer.stop() # 等待事件處理線程退出 self.__thread.join() #---------------------------------------------------------------------- def register(self, type_, handler): """註冊事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則建立 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要註冊的處理器不在該事件的處理器列表中,則註冊該事件 if handler not in handlerList: handlerList.append(handler) #---------------------------------------------------------------------- def unregister(self, type_, handler): """註銷事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則忽略該次註銷請求 try: handlerList = self.handlers[type_] # 若是該函數存在於列表中,則移除 if handler in handlerList: handlerList.remove(handler) # 若是函數列表爲空,則從引擎中移除該事件類型 if not handlerList: del self.handlers[type_] except KeyError: pass #---------------------------------------------------------------------- def put(self, event): """向事件隊列中存入事件""" self.__queue.put(event)
當事件驅動引擎對象被建立時,初始化函數__init__會建立如下私有變量:
__queue:用來保存事件的隊列
__active:用來控制引擎啓動、中止的開關
__thread:負責處理事件、執行具體操做的線程
__timer:用來每隔一段時間觸發定時事件的計時器
__handlers:用來保存不一樣類型事件所對應的事件處理函數的字典
引擎提供了register方法,用來向引擎註冊事件處理函數的監聽,傳入參數爲
type_:表示事件類型的常量字符串,由用戶自行定義,注意不一樣事件類型間不能重複
handler:當該類型的事件被觸發時,用戶但願進行相應操做的事件處理函數,函數的定義方法參考代碼中的註釋
當用戶調用register方法註冊事件處理函數時,引擎會嘗試獲取__handlers字典中該事件類型所對應的處理函數列表(若無則建立一個空列表),並向這個列表中添加該事件處理函數。使用了Python的列表對象,用戶能夠很容易的控制同一個事件類型下多個事件處理函數的工做順序,所以對某些涉及多步操做的複雜算法能夠保證按照正確的順序執行,這點是相比於某些系統0消息機制(如Qt的Signal/Slot)最大的優點。
如當標的物行情發生變化時,期權高頻套利算法須要執行如下操做:
使用訂價引擎先計算新的期權理論價、希臘值
使用風控引擎對當前持倉的風險度彙總,並計算報價的中間價
使用套利引擎基於預先設定的價差、下單手數等參數,計算具體價格併發單
以上三步操做,只需在交易系統啓動時按順序註冊監聽到標的物行情事件上,就能夠保證操做順序的正確。
和register對應的是unregister方法,用於註銷事件處理函數的監聽,傳入參數相同,具體原理請參照源代碼。在實際應用中,用戶能夠動態的組合使用register和unregister方法,只在須要監聽某些事件的時候監聽,完成後取消監聽,從而節省CPU資源。
這裏讓筆者吐槽一下某些國內的C++平臺(固然不是指全部的),每一個策略對系統裏全部的訂單回報進行監聽,若是是自身相關的就處理,不相關的就PASS。這種寫法,光是判斷是否和自身相關就得多作多少無謂的判斷、浪費多少CPU資源,隨着策略數量的增長,浪費呈線性增長的趨勢,這種平臺還叫囂作高頻,唉......
用戶能夠經過引擎的put方法向事件隊列__queue中存入事件,等待事件處理線程來進行處理,事件類的實現以下:
######################################################################## class Event: """事件對象""" #---------------------------------------------------------------------- def __init__(self, type_=None): """Constructor""" self.type_ = type_ # 事件類型 self.dict_ = {} # 字典用於保存具體的事件數據
對象建立時用戶能夠選擇傳入事件類型字符串type_做爲參數。dict_字典用於保存具體事件相關的數據信息,以供事件處理函數進行操做。
事件引擎的事件處理線程__thread中執行連續運行工做的函數爲__run:當事件引擎的開關__active沒有被關閉時,引擎嘗試從事件隊列中讀取最新的事件,若讀取成功則當即調用__process函數處理該事件,若沒法讀取(隊列爲空)則進入阻塞狀態節省CPU資源,當阻塞時間(默認爲1秒)結束時再次進入以上循環。
__process函數工做時,首先檢查事件對象的事件類型在__handlers字典中是否存在,若存在(說明有事件處理函數在監聽該事件)則按照註冊順序調用監聽函數列表中的事件處理函數進行相關操做。
事件引擎中的__timer是一個PyQt中的QTimer對象,提供的功能很是簡單:每隔一段時間(由用戶設定)自動運行函數__onTimer。__onTimer函數會建立一個類型爲EVENT_TIMER(在eventType.py文件中定義)的事件對象,並調用引擎的put方法存入到事件隊列中。
敏感的讀者可能已經意識到了,這個計時器本質上是一個由時間驅動的功能。儘管咱們在前文中提到了事件驅動在量化交易平臺開發中的重要性,但不能否認某些交易功能的實現必須基於時間驅動,例如:下單後若2秒不成交則當即撤單、每隔5分鐘將當日的成交記錄保存到數據庫中等。這類功能在實現時就能夠選擇使用事件處理函數對EVENT_TIMER類型的計時器事件進行監聽(參考下一章節「事件驅動引擎使用」中的示例)。
用戶能夠經過start和stop兩個方法來啓動和中止事件驅動引擎,原理很簡單讀者能夠直接參考源代碼。
當啓動計時器時,事件間隔默認設定爲了1秒(1000毫秒),這個參數用戶能夠視乎本身的需求進行調整。假設用戶使用時間驅動的函數工做間隔爲分鐘級,則能夠選擇將參數設置爲60秒(600000毫秒),以此類推。
一樣在eventEngine.py中,包含了一段測試代碼test函數,用來展現事件驅動引擎的使用方法:
#---------------------------------------------------------------------- def test(): """測試函數""" import sys from datetime import datetime from PyQt4.QtCore import QCoreApplication def simpletest(event): print u'處理每秒觸發的計時器事件:%s' % str(datetime.now()) app = QCoreApplication(sys.argv) ee = EventEngine() ee.register(EVENT_TIMER, simpletest) ee.start() app.exec_() # 直接運行腳本能夠進行測試 if __name__ == '__main__': test()
test函數總體上包含了這幾步:
導入相關的包(sys、datetime、PyQt4),注意因爲EventEngine的實現中使用了PyQt4的QTimer類,所以整個程序的運行必須包含在Qt事件循環中,即便用QCoreApplication(或者PyQt4.QtGui中的QApplication)的exec_()方法在程序主線程中啓動事件循環。
定義一個簡單的函數simpletest,該函數包含一個輸入參數event對象,函數被調用後會打印一段字符以及當前的時間
建立QCoreApplication對象app
建立事件驅動引擎EventEngine對象ee
向引擎中註冊simpletest函數對定時器事件EVENT_TIMER的監聽
啓動事件驅動引擎
啓動Qt事件循環
總體上看,當用戶開發本身的程序時,須要修改的只是第2步和第5步:建立本身的事件處理函數並將這些函數註冊到相應的事件類型上進行監聽。
有了API接口和事件驅動引擎,接下來咱們能夠開始開發本身的平臺了,後面的幾篇文章將會一步步展現一個簡單的LTS交易平臺的開發過程。