tornado 的協程原理分析
版本:4.3.0python
爲支持異步,tornado 實現了一個協程庫。git
tornado 實現的協程框架有下面幾個特色:github
因而可知,這是 python 協程的一個經典的實現。bash
本文將實現一個相似 tornado 實現的基礎協程框架,並闡述相應的原理。多線程
使用 time 來實現定時器回調的時間計算。
bisect 的 insort 方法維護一個時間有限的定時器隊列。
functools 的 partial 方法綁定函數部分參數。
使用 backports_abc 導入 Generator 來判斷函數是不是生成器。app
import time
import bisect
import functools
from backports_abc import Generator as GeneratorType
複製代碼
是一個穿梭於協程和調度器之間的信使。
提供了回調函數註冊(當異步事件完成後,調用註冊的回調)、中間結果保存、結束結果返回等功能框架
add_done_callback 註冊回調函數,當 Future 被解決時,改回調函數被調用。
set_result 設置最終的狀態,而且調用已註冊的回調函數異步
協程中的每個 yield 對應一個協程,相應的對應一個 Future 對象,譬如:函數
@coroutine
def routine_main():
yield routine_simple()
yield sleep(1)
複製代碼
這裏的 routine_simple() 和 sleep(1) 分別對應一個協程,同時有一個 Future 對應。tornado
class Future(object):
def __init__(self):
self._done = False
self._callbacks = []
self._result = None
def _set_done(self):
self._done = True
for cb in self._callbacks:
cb(self)
self._callbacks = None
def done(self):
return self._done
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
def set_result(self, result):
self._result = result
self._set_done()
def result(self):
return self._result
複製代碼
這裏的 IOLoop 去掉了 tornado 源代碼中 IO 相關部分,只保留了基本須要的功能,若是命名爲 CoroutineLoop 更貼切。
這裏的 IOLoop 提供基本的回調功能。它是一個線程循環,在循環中完成兩件事:
程序中註冊的回調事件,最終都會在此處執行。 能夠認爲,協程程序自己、協程的驅動程序 都會在此處執行。 協程自己使用 wrapper 包裝,並最後註冊到 IOLoop 的事件回調,因此它的從預激到結束的代碼所有在 IOLoop 回調中執行。 而協程預激後,會把 Runner.run() 函數註冊到 IOLoop 的事件回調,以驅動協程向前運行。
理解這一點對於理解協程的運行原理相當重要。
這就是單線程異步的基本原理。由於都在一個線程循環中執行,咱們能夠不用處理多線程須要面對的各類繁瑣的事情。
事件循環,回調事件和定時器事件在循環中調用。
執行一個協程。
將 run 註冊進全局回調,在 run 中調用 func()啓動協程。
註冊協程結束回調 stop, 退出 run_sync 的 start 循環,事件循環隨之結束。
class IOLoop(object):,
def __init__(self):
self._callbacks = []
self._timers = []
self._running = False
@classmethod
def instance(cls):
if not hasattr(cls, "_instance"):
cls._instance = cls()
return cls._instance
def add_future(self, future, callback):
future.add_done_callback(
lambda future: self.add_callback(functools.partial(callback, future)))
def add_timeout(self, when, callback):
bisect.insort(self._timers, (when, callback))
def call_later(self, delay, callback):
return self.add_timeout(time.time() + delay, callback)
def add_callback(self, call_back):
self._callbacks.append(call_back)
def start(self):
self._running = True
while self._running:
# 回調任務
callbacks = self._callbacks
self._callbacks = []
for call_back in callbacks:
call_back()
# 定時器任務
while self._timers and self._timers[0][0] < time.time():
task = self._timers[0][1]
del self._timers[0]
task()
def stop(self):
self._running = False
def run_sync(self, func):
future_cell = [None]
def run():
try:
future_cell[0] = func()
except Exception:
pass
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
self.start()
return future_cell[0].result()
複製代碼
協程裝飾器。
協程由 coroutine 裝飾,分爲兩類:
裝飾協程,並經過註冊回調驅動協程運行。 程序中經過 yield coroutine_func() 方式調用協程。
此時,wrapper 函數被調用:
協程返回 Future 對象,供外層的協程處理。外部經過操做該 Future 控制協程的運行。
每一個 yield 對應一個協程,每一個協程擁有一個 Future 對象。
外部協程獲取到內部協程的 Future 對象,若是內部協程還沒有結束,將 Runner.run() 方法註冊到 內部協程的 Future 的結束回調。
這樣,在內部協程結束時,會調用註冊的 run() 方法,從而驅動外部協程向前執行。
各個協程經過 Future 造成一個鏈式回調關係。
Runner 類在下面單獨小節描述。
def coroutine(func):
return _make_coroutine_wrapper(func)
# 每一個協程都有一個 future, 表明當前協程的運行狀態
def _make_coroutine_wrapper(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = Future()
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
return future
else:
if isinstance(result, GeneratorType):
try:
yielded = next(result)
except (StopIteration, Return) as e:
future.set_result(_value_from_stopiteration(e))
except Exception:
pass
else:
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
複製代碼
由於沒有使用 yield from,協程沒法直接返回值,因此使用拋出異常的方式返回。
python 2 沒法在生成器中使用 return 語句。可是生成器中拋出的異常能夠在外部 send() 語句中捕獲。
因此,使用拋出異常的方式,將返回值存儲在異常的 value 屬性中,拋出。外部使用諸如:
try:
yielded = gen.send(value)
except Return as e:
複製代碼
這樣的方式獲取協程的返回值。
class Return(Exception):
def __init__(self, value=None):
super(Return, self).__init__()
self.value = value
self.args = (value,)
複製代碼
Runner 是協程的驅動器類。
self.result_future 保存當前協程的狀態。
self.future 保存 yield 子協程傳遞回來的協程狀態。 從子協程的 future 獲取協程運行結果 send 給當前協程,以驅動協程向前執行。
注意,會判斷子協程返回的 future
若是 future 已經 set_result,表明子協程運行結束,回到 while Ture 循環,繼續往下執行下一個 send;
若是 future 未 set_result,表明子協程運行未結束,將 self.run 註冊到子協程結束的回調,這樣,子協程結束時會調用 self.run,從新驅動協程執行。
若是本協程 send() 執行過程當中,捕獲到 StopIteration 或者 Return 異常,說明本協程執行結束,設置 result_future 的協程返回值,此時,註冊的回調函數被執行。這裏的回調函數爲本協程的父協程所註冊的 run()。
至關於喚醒已經處於 yiled 狀態的父協程,經過 IOLoop 回調 run 函數,再執行 send()。
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.io_loop = IOLoop.instance()
self.running = False
self.future = None
if self.handle_yield(first_yielded):
self.run()
def run(self):
try:
self.running = True
while True:
try:
# 每個 yield 處看作一個協程,對應一個 Future
# 將該協程的結果 send 出去
# 這樣外層形如 ret = yiled coroutine_func() 可以獲取到協程的返回數據
value = self.future.result()
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
# 協程執行完成,再也不註冊回調
self.result_future.set_result(_value_from_stopiteration(e))
self.result_future = None
return
except Exception:
return
# 協程未執行結束,繼續使用 self.run() 進行驅動
if not self.handle_yield(yielded):
return
finally:
self.running = False
def handle_yield(self, yielded):
self.future = yielded
if not self.future.done():
# 給 future 增長執行結束回調函數,這樣,外部使用 future.set_result 時會調用該回調
# 而該回調是把 self.run() 註冊到 IOLoop 的事件循環
# 因此,future.set_result 會把 self.run() 註冊到 IOLoop 的事件循環,從而在下一個事件循環中調用
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
複製代碼
sleep 是一個延時協程,充分展現了協程的標準實現。
流程以下圖:
def sleep(duration):
f = Future()
IOLoop.instance().call_later(duration, lambda: f.set_result(None))
return f
複製代碼
@coroutine
def routine_ur(url, wait):
yield sleep(wait)
print('routine_ur {} took {}s to get!'.format(url, wait))
@coroutine
def routine_url_with_return(url, wait):
yield sleep(wait)
print('routine_url_with_return {} took {}s to get!'.format(url, wait))
raise Return((url, wait))
# 非生成器協程,不會爲之生成單獨的 Runner()
# coroutine 運行結束後,直接返回一個已經執行結束的 future
@coroutine
def routine_simple():
print("it is simple routine")
@coroutine
def routine_simple_return():
print("it is simple routine with return")
raise Return("value from routine_simple_return")
@coroutine
def routine_main():
yield routine_simple()
yield routine_ur("url0", 1)
ret = yield routine_simple_return()
print(ret)
ret = yield routine_url_with_return("url1", 1)
print(ret)
ret = yield routine_url_with_return("url2", 2)
print(ret)
if __name__ == '__main__':
IOLoop.instance().run_sync(routine_main)
複製代碼
運行輸出爲:
it is simple routine
routine_ur url0 took 1s to get!
it is simple routine with return
value from routine_simple_return
routine_url_with_return url1 took 1s to get!
('url1', 1)
routine_url_with_return url2 took 2s to get!
('url2', 2)
複製代碼
能夠觀察到協程 sleep 已經生效。
author:bigfish
copyright: 許可協議 知識共享署名-非商業性使用 4.0 國際許可協議