tornado 源碼之 StackContext(一)

tornado 的異步上下文機制分析python

contents

咱們實現一個簡單的 MyIOLoop 類,模仿 tornado 的 IOLoop,實現異步回調
實現一個簡單的 MyStackContext 類,模仿 tornado 的 StackContext,實現上下文app

MyIOLoop

模擬 tornado IOLoop異步

class MyIOLoop:
    def __init__(self):
        self._callbacks = []

 @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_callback(self, call_back):
        self._callbacks.append(call_back)

    def start(self):
        callbacks = self._callbacks
        self._callbacks = []
        for call_back in callbacks:
            call_back()

複製代碼

異步回調異常的捕獲

由輸出能夠看到,回調函數 call_func 中拋出的異常,在 main 函數中沒法被捕獲
main 函數只能捕獲當時運行的 async_task 中拋出的異常,async_task 只是向 MyIOLoop 註冊了一個回調,並無當場調用回調
call_func 函數最終在 MyIOLoop.start 中調用,其異常沒有被捕獲async

my_io_loop = MyIOLoop.instance()
times = 0


def call_func():
    print 'run call_func'
    raise ValueError('except in call_func')


def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    my_io_loop.add_callback(call_back=call_func)


def main():
    try:
        async_task()
    except Exception as e:
        print 'main exception {}'.format(e)
        print 'end'


if __name__ == '__main__':
    main()
    my_io_loop.start()

# run async task 1
# Traceback (most recent call last):
# run call_func
# File "E:/learn/python/simple-python/stack_context_example.py", line 56, in <module>
# my_io_loop.start()
# File "E:/learn/python/simple-python/stack_context_example.py", line 26, in start
# call_back()
# File "E:/learn/python/simple-python/stack_context_example.py", line 36, in call_func
# raise ValueError('except in call_func')
# ValueError: except in call_func
複製代碼

使用 wrap

能夠使用 wrap 的方式,把函數調用和異常捕捉寫在一塊兒,回調實際調用的是帶異常捕捉的函數 wrapper函數

my_io_loop = MyIOLoop.instance()
times = 0


def call_func():
    print 'run call_func'
    raise ValueError('except in call_func')


def wrapper(func):
    try:
        func()
    except Exception as e:
        print 'wrapper exception {}'.format(e)


def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    my_io_loop.add_callback(call_back=functools.partial(wrapper, call_func))


def main():
    try:
        async_task()
    except Exception as e:
        print 'main exception {}'.format(e)
        print 'end'


if __name__ == '__main__':
    main()
    my_io_loop.start()

# run async task 1
# run call_func
# wrapper exception except in call_func

複製代碼

由此,能夠想到,構造一個上下文環境,使用全局變量保存這個執行環境,等回調函數執行的時候,構造出這個環境tornado

使用 contextlib

下面模仿了 tornado 異步上下文實現機制oop

  1. MyStackContext 使用 __enter__ __exit__ 支持上下文
  2. MyStackContext 構造函數參數爲一個上下文對象
  3. with MyStackContext(context)進行以下動做:
    在 MyStackContext(context) 構造時,把 context 註冊進全局工廠 MyStackContext.context_factory
    1. 進入 MyStackContext 的__enter
    2. 構造一個 context 對象
    3. 調用 context 對象的 __enter,進入真正 context 上下文
    4. 執行 context 上下文,my_context yield 語句前的部分
    5. 執行上下文包裹的語句,async_task
    6. async_task 中 add_callback,實際保存的 wrap, wrap 將此時的全局上下文環境 MyStackContext.context_factory 保存,以方便 call_back 調用
    7. 調用 context 對象的 __exit,退出 context 上下文
    8. 進入 MyStackContext 的__exit
  4. my_io_loop.start() 執行, 調用註冊的 _call_back
  5. 實際調用 wrapped 函數
    1. 獲取保存的 context 環境
    2. with context
    3. 調用真正的 callback

這樣,在 main 函數中執行源碼分析

with MyStackContext(my_context):
    async_task()
複製代碼

構造一個執行上下文 my_context,異步函數將在這個上下文中調用
效果上至關於在 my_context 這個上下文環境中調用 async_task
相似:spa

def my_context():
    print '---enter my_context--->>'
    try:
        async_task()
    except Exception as e:
        print 'handler except: {}'.format(e)
    finally:
        print '<<---exit my_context ---'

複製代碼
import contextlib
import functools


class MyIOLoop:
    def __init__(self):
        self._callbacks = []

 @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_callback(self, call_back):
        self._callbacks.append(wrap(call_back))

    def start(self):
        callbacks = self._callbacks
        self._callbacks = []
        for call_back in callbacks:
            self._call_back(call_back)

 @staticmethod
    def _call_back(func):
        func()


class MyStackContext(object):
    context_factory = []

    def __init__(self, context):
        if context:
            MyStackContext.context_factory.append(context)

    def __enter__(self):
        try:
            self.context = self.context_factory[0]()
            self.context.__enter__()
        except Exception:
            raise

    def __exit__(self, type, value, traceback):
        try:
            return self.context.__exit__(type, value, traceback)
        finally:
            pass


def wrap(fn):
    def wrapped(callback, contexts, *args, **kwargs):
        context = contexts[0]()
        with context:
            callback(*args, **kwargs)

    contexts = MyStackContext.context_factory
    result = functools.partial(wrapped, fn, contexts)
    return result


my_io_loop = MyIOLoop.instance()

times = 0


def call_func():
    print 'run call_func'
    raise ValueError('except in call_func')


def async_task():
    global times
    times += 1
    print 'run async task {}'.format(times)
    my_io_loop.add_callback(call_back=call_func)


@contextlib.contextmanager
def my_context():
    print '---enter my_context--->>'
    try:
        yield
    except Exception as e:
        print 'handler except: {}'.format(e)
    finally:
        print '<<---exit my_context ---'


def main():
    with MyStackContext(my_context):
        async_task()
    print 'end main'


if __name__ == '__main__':
    main()
    my_io_loop.start()

# ---enter my_context--->>
# run async task 1
# <<---exit my_context ---
# end main
# ---enter my_context--->>
# run call_func
# handler except: except in call_func
# <<---exit my_context ---
複製代碼

inspired by

Tornado 源碼分析(二)異步上下文管理(StackContext)code

copyright

author:bigfish
copyright: 許可協議 知識共享署名-非商業性使用 4.0 國際許可協議

相關文章
相關標籤/搜索