tornado 的協程原理分析
版本:4.3.0
爲支持異步,tornado 實現了一個協程庫。python
tornado 實現的協程框架有下面幾個特色:git
因而可知,這是 python 協程的一個經典的實現。github
本文將實現一個相似 tornado 實現的基礎協程框架,並闡述相應的原理。多線程
使用 time 來實現定時器回調的時間計算。
bisect 的 insort 方法維護一個時間有限的定時器隊列。
functools 的 partial 方法綁定函數部分參數。
使用 backports_abc 導入 Generator 來判斷函數是不是生成器。app
import time import bisect import functools from backports_abc import Generator as GeneratorType
是一個穿梭於協程和調度器之間的信使。
提供了回調函數註冊(當異步事件完成後,調用註冊的回調)、中間結果保存、結束結果返回等功能
add_done_callback 註冊回調函數,當 Future 被解決時,改回調函數被調用。
set_result 設置最終的狀態,而且調用已註冊的回調函數框架
協程中的每個 yield 對應一個協程,相應的對應一個 Future 對象,譬如:異步
@coroutine def routine_main(): yield routine_simple() yield sleep(1)
這裏的 routine_simple() 和 sleep(1) 分別對應一個協程,同時有一個 Future 對應。函數
class Future(object): def __init__(self): self._done = False self._callbacks = [] self._result = None def _set_done(self): self._done = True for cb in self._callbacks: cb(self) self._callbacks = None def done(self): return self._done def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def result(self): return self._result
這裏的 IOLoop 去掉了 tornado 源代碼中 IO 相關部分,只保留了基本須要的功能,若是命名爲 CoroutineLoop 更貼切。tornado
這裏的 IOLoop 提供基本的回調功能。它是一個線程循環,在循環中完成兩件事:oop
程序中註冊的回調事件,最終都會在此處執行。
能夠認爲,協程程序自己、協程的驅動程序 都會在此處執行。
協程自己使用 wrapper 包裝,並最後註冊到 IOLoop 的事件回調,因此它的從預激到結束的代碼所有在 IOLoop 回調中執行。
而協程預激後,會把 Runner.run() 函數註冊到 IOLoop 的事件回調,以驅動協程向前運行。
理解這一點對於理解協程的運行原理相當重要。
這就是單線程異步的基本原理。由於都在一個線程循環中執行,咱們能夠不用處理多線程須要面對的各類繁瑣的事情。
事件循環,回調事件和定時器事件在循環中調用。
執行一個協程。
將 run 註冊進全局回調,在 run 中調用 func()啓動協程。
註冊協程結束回調 stop, 退出 run_sync 的 start 循環,事件循環隨之結束。
class IOLoop(object):, def __init__(self): self._callbacks = [] self._timers = [] self._running = False @classmethod def instance(cls): if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance def add_future(self, future, callback): future.add_done_callback( lambda future: self.add_callback(functools.partial(callback, future))) def add_timeout(self, when, callback): bisect.insort(self._timers, (when, callback)) def call_later(self, delay, callback): return self.add_timeout(time.time() + delay, callback) def add_callback(self, call_back): self._callbacks.append(call_back) def start(self): self._running = True while self._running: # 回調任務 callbacks = self._callbacks self._callbacks = [] for call_back in callbacks: call_back() # 定時器任務 while self._timers and self._timers[0][0] < time.time(): task = self._timers[0][1] del self._timers[0] task() def stop(self): self._running = False def run_sync(self, func): future_cell = [None] def run(): try: future_cell[0] = func() except Exception: pass self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) self.start() return future_cell[0].result()
協程裝飾器。
協程由 coroutine 裝飾,分爲兩類:
裝飾協程,並經過註冊回調驅動協程運行。
程序中經過 yield coroutine_func() 方式調用協程。
此時,wrapper 函數被調用:
若是是生成器,則
若是是普通函數,則
協程返回 Future 對象,供外層的協程處理。外部經過操做該 Future 控制協程的運行。
每一個 yield 對應一個協程,每一個協程擁有一個 Future 對象。
外部協程獲取到內部協程的 Future 對象,若是內部協程還沒有結束,將 Runner.run() 方法註冊到 內部協程的 Future 的結束回調。
這樣,在內部協程結束時,會調用註冊的 run() 方法,從而驅動外部協程向前執行。
各個協程經過 Future 造成一個鏈式回調關係。
Runner 類在下面單獨小節描述。
def coroutine(func): return _make_coroutine_wrapper(func) # 每一個協程都有一個 future, 表明當前協程的運行狀態 def _make_coroutine_wrapper(func): @functools.wraps(func) def wrapper(*args, **kwargs): future = Future() try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: return future else: if isinstance(result, GeneratorType): try: yielded = next(result) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: pass else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
由於沒有使用 yield from,協程沒法直接返回值,因此使用拋出異常的方式返回。
python 2 沒法在生成器中使用 return 語句。可是生成器中拋出的異常能夠在外部 send() 語句中捕獲。
因此,使用拋出異常的方式,將返回值存儲在異常的 value 屬性中,拋出。外部使用諸如:
try: yielded = gen.send(value) except Return as e:
這樣的方式獲取協程的返回值。
class Return(Exception): def __init__(self, value=None): super(Return, self).__init__() self.value = value self.args = (value,)
Runner 是協程的驅動器類。
self.result_future 保存當前協程的狀態。
self.future 保存 yield 子協程傳遞回來的協程狀態。
從子協程的 future 獲取協程運行結果 send 給當前協程,以驅動協程向前執行。
注意,會判斷子協程返回的 future
若是 future 已經 set_result,表明子協程運行結束,回到 while Ture 循環,繼續往下執行下一個 send;
若是 future 未 set_result,表明子協程運行未結束,將 self.run 註冊到子協程結束的回調,這樣,子協程結束時會調用 self.run,從新驅動協程執行。
若是本協程 send() 執行過程當中,捕獲到 StopIteration 或者 Return 異常,說明本協程執行結束,設置 result_future 的協程返回值,此時,註冊的回調函數被執行。這裏的回調函數爲本協程的父協程所註冊的 run()。
至關於喚醒已經處於 yiled 狀態的父協程,經過 IOLoop 回調 run 函數,再執行 send()。
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.io_loop = IOLoop.instance() self.running = False self.future = None if self.handle_yield(first_yielded): self.run() def run(self): try: self.running = True while True: try: # 每個 yield 處看作一個協程,對應一個 Future # 將該協程的結果 send 出去 # 這樣外層形如 ret = yiled coroutine_func() 可以獲取到協程的返回數據 value = self.future.result() yielded = self.gen.send(value) except (StopIteration, Return) as e: # 協程執行完成,再也不註冊回調 self.result_future.set_result(_value_from_stopiteration(e)) self.result_future = None return except Exception: return # 協程未執行結束,繼續使用 self.run() 進行驅動 if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): self.future = yielded if not self.future.done(): # 給 future 增長執行結束回調函數,這樣,外部使用 future.set_result 時會調用該回調 # 而該回調是把 self.run() 註冊到 IOLoop 的事件循環 # 因此,future.set_result 會把 self.run() 註冊到 IOLoop 的事件循環,從而在下一個事件循環中調用 self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
sleep 是一個延時協程,充分展現了協程的標準實現。
流程以下圖:
def sleep(duration): f = Future() IOLoop.instance().call_later(duration, lambda: f.set_result(None)) return f
@coroutine def routine_ur(url, wait): yield sleep(wait) print('routine_ur {} took {}s to get!'.format(url, wait)) @coroutine def routine_url_with_return(url, wait): yield sleep(wait) print('routine_url_with_return {} took {}s to get!'.format(url, wait)) raise Return((url, wait)) # 非生成器協程,不會爲之生成單獨的 Runner() # coroutine 運行結束後,直接返回一個已經執行結束的 future @coroutine def routine_simple(): print("it is simple routine") @coroutine def routine_simple_return(): print("it is simple routine with return") raise Return("value from routine_simple_return") @coroutine def routine_main(): yield routine_simple() yield routine_ur("url0", 1) ret = yield routine_simple_return() print(ret) ret = yield routine_url_with_return("url1", 1) print(ret) ret = yield routine_url_with_return("url2", 2) print(ret) if __name__ == '__main__': IOLoop.instance().run_sync(routine_main)
運行輸出爲:
it is simple routine routine_ur url0 took 1s to get! it is simple routine with return value from routine_simple_return routine_url_with_return url1 took 1s to get! ('url1', 1) routine_url_with_return url2 took 2s to get! ('url2', 2)
能夠觀察到協程 sleep 已經生效。
author:bigfish
copyright: 許可協議 知識共享署名-非商業性使用 4.0 國際許可協議