在入口文件中,咱們看到了除了窗體界面的產生,還有關於MainEngine
和EventEngin
部分。今天來學習下MainEngine
的代碼。python
首先在run代碼中,咱們看到如下的代碼app
main_engine.add_gateway(DeribitGateway) main_engine.add_app(OptionMasterApp)
從上述代碼能夠基本猜想全部的網管,設置,甚至策略引擎行情,都跟MainEngine有關係,MainEngine應該是一個主線,把全部的組件穿插起來了。想必MainEngine必定是一條重要的大魚。下面咱們進入MainEngin開始學習 位置:\vnpy\trader\engine.py
學習
class MainEngine: #初始化 def __init__(self, event_engine: EventEngine = None): pass #添加引擎 def add_engine(self, engine_class: Any): pass #添加網管 def add_gateway(self, gateway_class: Type[BaseGateway]): pass #添加app def add_app(self, app_class: Type[BaseApp]): pass #初始化引擎 def init_engines(self): pass #寫入日誌 def write_log(self, msg: str, source: str = ""): pass #得到引擎 def get_engine(self, engine_name: str): pass #得到默認設置 def get_default_setting(self, gateway_name: str): pass #得到全部引擎的名字 def get_all_gateway_names(self): pass #得到全部的APP def get_all_apps(self): pass #得到全部的交易所 def get_all_exchanges(self): pass #鏈接到行情 def connect(self, setting: dict, gateway_name: str): pass #合約訂閱 def subscribe(self, req: SubscribeRequest, gateway_name: str): pass #下單 def send_order(self, req: OrderRequest, gateway_name: str): pass #取消訂單 def cancel_order(self, req: CancelRequest, gateway_name: str): pass #批量下單 def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str): pass #批量取消訂單 def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str): pass #歷史查詢 def query_history(self, req: HistoryRequest, gateway_name: str): pass #關閉 def close(self): pass
我把全部的方法體都去掉,僅僅保留方法名稱和參數,從上面的結構基本上能看出來MainEngine
幾乎是一個把各個組件(GateWay, App, Engin)都鏈接在一塊兒,同時提供了跟交易相關的鏈接、訂閱、下單、撤單、歷史記錄等都相關的柔和在一塊兒的大雜燴。接下來咱們就一個一個方法的閱讀學習。日誌
__init__
def __init__(self, event_engine: EventEngine = None): """""" if event_engine: self.event_engine = event_engine else: self.event_engine = EventEngine() self.event_engine.start() self.gateways = {} self.engines = {} self.apps = {} self.exchanges = [] os.chdir(TRADER_DIR) # Change working directory self.init_engines() # Initialize function engines
初始化的代碼基本上就是實例化事件引擎(EventEngin),而後啓動,同時爲getways
,engines
,apps
,exchange
創建一個字典,同時調用init_engines()的方法。code
def add_engine(self, engine_class: Any): """ Add function engine. """ engine = engine_class(self, self.event_engine) self.engines[engine.engine_name] = engine return engine def add_gateway(self, gateway_class: Type[BaseGateway]): """ Add gateway. """ gateway = gateway_class(self.event_engine) self.gateways[gateway.gateway_name] = gateway # Add gateway supported exchanges into engine for exchange in gateway.exchanges: if exchange not in self.exchanges: self.exchanges.append(exchange) return gateway def add_app(self, app_class: Type[BaseApp]): """ Add app. """ app = app_class() self.apps[app.app_name] = app engine = self.add_engine(app.engine_class) return engine
上述3個Add方法比較簡單,基本上都是把添加各類APP、Engine、Gateway的話,都把它們的實例化和name造成鍵值對放入map中去。惟一咱們能夠獲得的線索是gateway 和Exchanges之間應該是有對應關係的話,話一句話說,gateway就是交易場所的API對接網管。因此每次添加一個gateway,就添加了一個交易所名稱。對象
def get_gateway(self, gateway_name: str): """ Return gateway object by name. """ gateway = self.gateways.get(gateway_name, None) if not gateway: self.write_log(f"找不到底層接口:{gateway_name}") return gateway def get_engine(self, engine_name: str): """ Return engine object by name. """ engine = self.engines.get(engine_name, None) if not engine: self.write_log(f"找不到引擎:{engine_name}") return engine def get_default_setting(self, gateway_name: str): """ Get default setting dict of a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: return gateway.get_default_setting() return None
基本上是從map中根據指定的名字得到對象的方法。一目瞭然。get_default_setting
方法應該屬於get_gateway的一個下屬方法,得到gateway的setting.接口
def get_all_apps(self): """ Get all app objects. """ return list(self.apps.values()) def get_all_exchanges(self): """ Get all exchanges. """ return self.exchanges
也是對add進入的對象進行全局獲取的方法。事件
def connect(self, setting: dict, gateway_name: str): """ Start connection of a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: gateway.connect(setting)
基本上能夠理解connect是對gateway的一種裝飾方法。經過add能夠插入gateway,而後調用connect方法的話,能夠經過gateway_name得到接口,而後再調用gateway的connect方法。不出意料下面的訂閱行情、下單、撤單、批量撤單都是相似的做用。ci
def subscribe(self, req: SubscribeRequest, gateway_name: str): """ Subscribe tick data update of a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: gateway.subscribe(req) def send_order(self, req: OrderRequest, gateway_name: str): """ Send new order request to a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: return gateway.send_order(req) else: return "" def cancel_order(self, req: CancelRequest, gateway_name: str): """ Send cancel order request to a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: gateway.cancel_order(req) def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str): """ """ gateway = self.get_gateway(gateway_name) if gateway: return gateway.send_orders(reqs) else: return ["" for req in reqs] def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str): """ """ gateway = self.get_gateway(gateway_name) if gateway: gateway.cancel_orders(reqs) def query_history(self, req: HistoryRequest, gateway_name: str): """ Send cancel order request to a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: return gateway.query_history(req) else: return None
def init_engines(self): """ Init all engines. """ self.add_engine(LogEngine) self.add_engine(OmsEngine) self.add_engine(EmailEngine)
把日誌, OMSEngin, EmialEngin裝配進來get
def close(self): """ Make sure every gateway and app is closed properly before programme exit. """ # Stop event engine first to prevent new timer event. self.event_engine.stop() for engine in self.engines.values(): engine.close() for gateway in self.gateways.values(): gateway.close()
基本上是程序退出之後的善後工做罷了。
經過對MainEngin代碼的梳理,咱們看到其實MainEngine自己沒有什麼深奧之處。就是一個適配器的模式,把全部的操做抽象成了APP,Gateway, Engine。而且提供了一個統一的操做入口,這樣能夠方便實現擴展。