tornado 源碼之 coroutine 分析

tornado 源碼之 coroutine 分析

tornado 的協程原理分析
版本:4.3.0

爲支持異步,tornado 實現了一個協程庫。python

tornado 實現的協程框架有下面幾個特色:git

  1. 支持 python 2.7,沒有使用 yield from
    特性,純粹使用 yield 實現
  2. 使用拋出異常的方式從協程返回值
  3. 採用 Future 類代理協程(保存協程的執行結果,當攜程執行結束時,調用註冊的回調函數)
  4. 使用 IOLoop 事件循環,當事件發生時在循環中調用註冊的回調,驅動協程向前執行

因而可知,這是 python 協程的一個經典的實現。github

本文將實現一個相似 tornado 實現的基礎協程框架,並闡述相應的原理。多線程

外部庫

使用 time 來實現定時器回調的時間計算。
bisect 的 insort 方法維護一個時間有限的定時器隊列。
functools 的 partial 方法綁定函數部分參數。
使用 backports_abc 導入 Generator 來判斷函數是不是生成器。app

import time
import bisect
import functools
from backports_abc import Generator as GeneratorType

Future

是一個穿梭於協程和調度器之間的信使。
提供了回調函數註冊(當異步事件完成後,調用註冊的回調)、中間結果保存、結束結果返回等功能

add_done_callback 註冊回調函數,當 Future 被解決時,改回調函數被調用。
set_result 設置最終的狀態,而且調用已註冊的回調函數框架

協程中的每個 yield 對應一個協程,相應的對應一個 Future 對象,譬如:異步

@coroutine
def routine_main():
    yield routine_simple()

    yield sleep(1)

這裏的 routine_simple() 和 sleep(1) 分別對應一個協程,同時有一個 Future 對應。函數

class Future(object):
    def __init__(self):
        self._done = False
        self._callbacks = []
        self._result = None

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)
        self._callbacks = None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def result(self):
        return self._result

IOLoop

這裏的 IOLoop 去掉了 tornado 源代碼中 IO 相關部分,只保留了基本須要的功能,若是命名爲 CoroutineLoop 更貼切。tornado

這裏的 IOLoop 提供基本的回調功能。它是一個線程循環,在循環中完成兩件事:oop

  1. 檢測有沒有註冊的回調並執行
  2. 檢測有沒有到期的定時器回調並執行

程序中註冊的回調事件,最終都會在此處執行。
能夠認爲,協程程序自己、協程的驅動程序 都會在此處執行。
協程自己使用 wrapper 包裝,並最後註冊到 IOLoop 的事件回調,因此它的從預激到結束的代碼所有在 IOLoop 回調中執行。
而協程預激後,會把 Runner.run() 函數註冊到 IOLoop 的事件回調,以驅動協程向前運行。

理解這一點對於理解協程的運行原理相當重要。

這就是單線程異步的基本原理。由於都在一個線程循環中執行,咱們能夠不用處理多線程須要面對的各類繁瑣的事情。

IOLoop.start

事件循環,回調事件和定時器事件在循環中調用。

IOLoop.run_sync

執行一個協程。

將 run 註冊進全局回調,在 run 中調用 func()啓動協程。
註冊協程結束回調 stop, 退出 run_sync 的 start 循環,事件循環隨之結束。

class IOLoop(object):,
    def __init__(self):
        self._callbacks = []
        self._timers = []
        self._running = False

    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_future(self, future, callback):
        future.add_done_callback(
            lambda future: self.add_callback(functools.partial(callback, future)))

    def add_timeout(self, when, callback):
        bisect.insort(self._timers, (when, callback))

    def call_later(self, delay, callback):
        return self.add_timeout(time.time() + delay, callback)

    def add_callback(self, call_back):
        self._callbacks.append(call_back)

    def start(self):
        self._running = True
        while self._running:

            # 回調任務
            callbacks = self._callbacks
            self._callbacks = []
            for call_back in callbacks:
                call_back()

            # 定時器任務
            while self._timers and self._timers[0][0] < time.time():
                task = self._timers[0][1]
                del self._timers[0]
                task()

    def stop(self):
        self._running = False

    def run_sync(self, func):
        future_cell = [None]

        def run():
            try:
                future_cell[0] = func()
            except Exception:
                pass

            self.add_future(future_cell[0], lambda future: self.stop())

        self.add_callback(run)

        self.start()
        return future_cell[0].result()

coroutine

協程裝飾器。
協程由 coroutine 裝飾,分爲兩類:

  1. 含 yield 的生成器函數
  2. 無 yield 語句的普通函數

裝飾協程,並經過註冊回調驅動協程運行。
程序中經過 yield coroutine_func() 方式調用協程。
此時,wrapper 函數被調用:

  1. 獲取協程生成器
  2. 若是是生成器,則

    1. 調用 next() 預激協程
    2. 實例化 Runner(),驅動協程
  3. 若是是普通函數,則

    1. 調用 set_result() 結束協程

協程返回 Future 對象,供外層的協程處理。外部經過操做該 Future 控制協程的運行。
每一個 yield 對應一個協程,每一個協程擁有一個 Future 對象。

外部協程獲取到內部協程的 Future 對象,若是內部協程還沒有結束,將 Runner.run() 方法註冊到 內部協程的 Future 的結束回調。
這樣,在內部協程結束時,會調用註冊的 run() 方法,從而驅動外部協程向前執行。

各個協程經過 Future 造成一個鏈式回調關係。

Runner 類在下面單獨小節描述。

def coroutine(func):
    return _make_coroutine_wrapper(func)

# 每一個協程都有一個 future, 表明當前協程的運行狀態
def _make_coroutine_wrapper(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            return future
        else:
            if isinstance(result, GeneratorType):
                try:
                    yielded = next(result)
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    pass
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

協程返回值

由於沒有使用 yield from,協程沒法直接返回值,因此使用拋出異常的方式返回。

python 2 沒法在生成器中使用 return 語句。可是生成器中拋出的異常能夠在外部 send() 語句中捕獲。
因此,使用拋出異常的方式,將返回值存儲在異常的 value 屬性中,拋出。外部使用諸如:

try:
    yielded = gen.send(value)
except Return as e:

這樣的方式獲取協程的返回值。

class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        self.args = (value,)

Runner

Runner 是協程的驅動器類。

self.result_future 保存當前協程的狀態。
self.future 保存 yield 子協程傳遞回來的協程狀態。
從子協程的 future 獲取協程運行結果 send 給當前協程,以驅動協程向前執行。

注意,會判斷子協程返回的 future
若是 future 已經 set_result,表明子協程運行結束,回到 while Ture 循環,繼續往下執行下一個 send;
若是 future 未 set_result,表明子協程運行未結束,將 self.run 註冊到子協程結束的回調,這樣,子協程結束時會調用 self.run,從新驅動協程執行。

若是本協程 send() 執行過程當中,捕獲到 StopIteration 或者 Return 異常,說明本協程執行結束,設置 result_future 的協程返回值,此時,註冊的回調函數被執行。這裏的回調函數爲本協程的父協程所註冊的 run()。
至關於喚醒已經處於 yiled 狀態的父協程,經過 IOLoop 回調 run 函數,再執行 send()。

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.io_loop = IOLoop.instance()
        self.running = False
        self.future = None

        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        try:
            self.running = True
            while True:

                try:
                    # 每個 yield 處看作一個協程,對應一個 Future
                    # 將該協程的結果 send 出去
                    # 這樣外層形如  ret = yiled coroutine_func() 可以獲取到協程的返回數據
                    value = self.future.result()
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    # 協程執行完成,再也不註冊回調
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    return
                except Exception:
                    return
                # 協程未執行結束,繼續使用 self.run() 進行驅動
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():
            # 給 future 增長執行結束回調函數,這樣,外部使用 future.set_result 時會調用該回調
            # 而該回調是把 self.run() 註冊到 IOLoop 的事件循環
            # 因此,future.set_result 會把 self.run() 註冊到 IOLoop 的事件循環,從而在下一個事件循環中調用
            self.io_loop.add_future(
                self.future, lambda f: self.run())
            return False
        return True

sleep

sleep 是一個延時協程,充分展現了協程的標準實現。

  • 建立一個 Future,並返回給外部協程;
  • 外部協程發現是一個未完的狀態,將 run()註冊到 Future 的完成回調,同時外部協程被掛起;
  • 在設置的延時後,IOLoop 會回調 set_result 結束協程;
  • IOLoop 調用 run() 函數;
  • IOLoop 調用 send(),喚醒掛起的外部協程。

流程以下圖:

def sleep(duration):
    f = Future()
    IOLoop.instance().call_later(duration, lambda: f.set_result(None))
    return f

運行

@coroutine
def routine_ur(url, wait):
    yield sleep(wait)
    print('routine_ur {} took {}s to get!'.format(url, wait))


@coroutine
def routine_url_with_return(url, wait):
    yield sleep(wait)
    print('routine_url_with_return {} took {}s to get!'.format(url, wait))
    raise Return((url, wait))

# 非生成器協程,不會爲之生成單獨的 Runner()
# coroutine 運行結束後,直接返回一個已經執行結束的 future
@coroutine
def routine_simple():
    print("it is simple routine")

@coroutine
def routine_simple_return():
    print("it is simple routine with return")
    raise Return("value from routine_simple_return")

@coroutine
def routine_main():
    yield routine_simple()

    yield routine_ur("url0", 1)

    ret = yield routine_simple_return()
    print(ret)

    ret = yield routine_url_with_return("url1", 1)
    print(ret)

    ret = yield routine_url_with_return("url2", 2)
    print(ret)


if __name__ == '__main__':
    IOLoop.instance().run_sync(routine_main)

運行輸出爲:

it is simple routine
routine_ur url0 took 1s to get!
it is simple routine with return
value from routine_simple_return
routine_url_with_return url1 took 1s to get!
('url1', 1)
routine_url_with_return url2 took 2s to get!
('url2', 2)

能夠觀察到協程 sleep 已經生效。

源碼

simple_coroutine.py

copyright

author:bigfish
copyright: 許可協議 知識共享署名-非商業性使用 4.0 國際許可協議

相關文章
相關標籤/搜索