from tornado.concurrent import Future def async_call_method(fun, *args, **kwargs): future = Future() // 定義一個閉包 finish def finish(): try: result = fun(*args, **kwargs) if future._callbacks: IOLoop.current().add_callback(future.set_result, result) else: future.set_result(result) except: if future._callbacks: IOLoop.current().add_callback(future.set_exc_info, sys.exc_info()) else: future.set_exc_info(sys.exc_info()) child_gr = greenlet.greenlet(finish) child_gr.switch() return future
Future 是一種用於併發編程的模式,首次引入是在 python 3.2 的 concurrent.futures 模塊。python
Future 對象是一個對於異步返回結果的佔位符。mysql
一個 Future 對象包含了一次異步操做的結果。在同步編程中,Futures 被用於等待從一個線程池或進程池裏返回的結果;在 tornado 中,future 一般被用在 IOLoop.add_future 或者在一個 gen.coroutine 函數中 yielding 它們。ios
tornado.concurrent.Future 和 concurrent.futures.Future 類似,可是其不是線程安全的(所以,在單線程事件循環應用在速度更快)git
通過一番搜索,查詢到 async_call_method()
這個函數來自於 github.com/snower/TorMySQL.github
通過對該項目代碼的仔細閱讀,我發現了它是如何實現了 mysql 的異步操做。sql
... def connect(self): # 入口函數 # 設置 future 佔位符 future = Future() # 定義回調函數 def on_connected(connection_future): if connection_future._exc_info is None: future.set_result(self) else: future.set_exc_info(connection_future.exc_info()) self._connection = Connection(defer_connect = True, *self._args, **self._kwargs) self._connection.set_close_callback(self.connection_close_callback) # 用 greenlet 包裝 self._connection.connect 並返回 future # 要使 async_call_method 包裝後的函數有非阻塞的特性,必須達成如下要求 # 1. 函數能夠訪問 父greenlet # 2. 函數中全部 IO 操做均支持非阻塞(好比: 非阻塞由 socket 的 non-blocking 特性支持) # 3. 函數中執行 IO 操做後當即將運行權交還給主函數(父greenlet, 如:ioloop 時間循環)(greenlet.switch) # 4. 函數中全部 IO 操做均返回 Future # 5. Future.callback 運行後當即將運行權(greenlet.switch)返回給當前函數(greenlet.current),完成當前函數的剩餘部分 connection_future = async_call_method(self._connection.connect) # 當 connection_future 狀態爲 finished, 調用 on_connected() # finished => 調用 connection_future.set_result() IOLoop.current().add_future(connection_future, on_connected) return future ...
... # IOStream 基於 tornado.iostream.IOStream sock = IOStream(sock) sock.set_close_callback(self.stream_close_callback) # getcurrent() 返回包裝了當前函數的 greenlet child_gr = greenlet.getcurrent() # main 是指 父greenlet(主函數, 時間循環?) main = child_gr.parent assert main is not None, "Execut must be running in child greenlet" ... def connected(future): if self._loop_connect_timeout: self._loop.remove_timeout(self._loop_connect_timeout) self._loop_connect_timeout = None if future._exc_info is not None: child_gr.throw(future.exception()) else: self._sock = sock # 將運行權交還給當前 greenlet child_gr.switch() # IOStream.connect 是 no-blocking 的 socket 操做 future = sock.connect(address) # 給 sock.connect 操做添加回調函數 self._loop.add_future(future, connected) # 而後把運行權交還給 父greenlet # 直到鏈接成功,connected() 中會將運行權交還給 當前greenlet main.switch() ...
要使 async_call_method 包裝後的函數有非阻塞的特性,必須達成如下要求編程
函數能夠訪問 父greenlet安全
函數中全部 IO 操做均支持非阻塞(好比: 非阻塞由 socket 的 non-blocking 特性支持)閉包
函數中執行 IO 操做後當即將運行權交還給主函數(父greenlet, 如:ioloop 時間循環)(greenlet.switch)併發
函數中全部 IO 操做均返回 Future
Future.callback 運行後當即將運行權(greenlet.switch)返回給當前函數(greenlet.current),完成當前函數的剩餘部分
async_call_method 包裝後的函數要實現非阻塞,最終仍是依賴於 socket 的非阻塞
=> socket.setblocking(False)
。
github.com/snower/TorMySQL
中於 mysql
的交互所有經過 IOStream()
的如下方法實現:
* def _handle_events(self, fd, events): # ioloop 在事件發生時調用 _handle_events * def _handle_connect(self): * def _handle_read(self): # 當事件爲讀取事件時,讀取數據到 buffer, 而後 future.set_result(data) * def _handle_write(self): # 當事件爲寫事件時,讀取數據到 buffer, 而後 future.set_result(data) * def read(self, num_bytes): * def write(self, data):
經過對上述方法進行 設置 future 佔位符
,並基於 non-blocking socket 實現上述方法的非阻塞。