Tornado4.2源碼分析-Coroutine(協程)

#Coroutineshtml

做者:MetalBug
時間:2015-03-17
出處:http://my.oschina.net/u/247728/blog
聲明:版權全部,侵犯必究

tornado.gen — Simplify asynchronous code tornado.concurrent — Work with threads and futures安全

##1.Future Future用於保存異步任務的結果。 在同步應用中,一般用Future存儲來自其餘線程異步任務的結果。 可是在Tornado中,Future並非線程安全的,這是處於效率的考慮,由於Tornado模型的使用的是 one loop per thread數據結構

###1.1內部實現-實現細節 在Future中,使用了_TracebackLogger 用於記錄exception。 在設置了exception(即異步任務拋異常),添加_TracebackLogger。 當調用了Funture.result(),Future.exception() 時會將_TracebackLogger清除。由於調用者已經知道了出現異常。 只有在沒有調用以上函數時,Future被析構時_TracebackLogger會將錯誤信息記錄。這樣防止異步任務中出現異常卻由於沒被調用致使異常不被意識到。app

Future.set_exec_info中,添加_TracebackLogger異步

def set_exc_info(self, exc_info):
    ##
    if not _GC_CYCLE_FINALIZERS:
        self._tb_logger = _TracebackLogger(exc_info)

__del__中,記錄exceptionasync

if _GC_CYCLE_FINALIZERS:
    def __del__(self):
        if not self._log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return

        tb = traceback.format_exception(*self._exc_info)

        app_log.error('Future %r exception was never retrieved: %s',
                      self, ''.join(tb).rstrip())

##2.coroutine coroutine是一個修飾符,使用coroutine,能夠將異步的操做以一個Generator實現,而不用寫一系列回調函數。函數

示例:tornado

一般狀況下異步操做須要寫成回調函數:oop

class AsyncHandler(RequestHandler):
    @asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://example.com",
                          callback=self.on_fetch)

    def on_fetch(self, response):
        do_something_with_response(response)
        self.render("template.html")

使用coroutine,能夠寫成一個Generator形式,而不用寫成多個回調。fetch

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

在用coroutine修飾的Generator中,在遇到異步函數時,使用yield,這裏須要注意的是在Tornado中,異步函數返回的大部分是Future(還能夠是dictlist 可用convert_yield轉換),這樣yield返回的的Future.result()

coroutine內部經過_make_coroutine_wrapperRunner實現。 流程圖以下: Tornado4.2-coroutine

###1._make_coroutine_wrapper

####內部實現-主要函數 _make_coroutine_wrapper能夠認爲是一個Inline的Runner。 由於不能保證每一個被修飾的func都是Generator,都會yield,因此將獲取第一個yield分離開來減小了初始化Runner的開銷。

其主要作了如下工做:

  1. 調用func,獲得result

  2. 若是result是Generator,調用Generator.next獲得第一個yield,調用Runner處理以後工做

    def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() ### try: result = func(*args, **kwargs) ### else: if isinstance(result, types.GeneratorType): try: ##用於檢查棧的一致性,由於yield可以在StackContext中返回 orig_stack_contexts = stack_context._state.contexts yielded = next(result) ### ##generator已經執行完畢,或者經過yield返回一個Return(Exception) except (StopIteration, Return) as e: future.set_result(getattr(e, 'value', None)) except Exception: future.set_exc_info(sys.exc_info()) ### else: ##調用Runner處理後續工做 Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper

####內部實現-實現細節

_make_coroutine_wrapper在返回Future時,使用了try...finally。 這裏對內存進行了優化,若是在next(result)時拋出異常,那麼Future.set_exc_info會被調用,這時候_TracebackLogger記錄當前棧內容(使用traceback),也包含了future自己所在棧,這樣出現了循環引用,將函數內即本地的future置爲None,可以避免循環,從而提升GC回收效率。

# Subtle memory optimization: if next() raised an exception,
# the future's exc_info contains a traceback which
# includes this stack frame.  This creates a cycle,
# which will be collected at the next full GC but has
# been shown to greatly increase memory usage of
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles).  We can avoid the
# cycle by clearing the local variable after we return it.
try:
    return future
finally:
    future = None

###2.Runner

Runner處理Generator,將執行結果以Future返回。

其工做流程以下:

  1. yield返回的是Future,得RunnerFuture的result,調用Generator.send將result返回到Generator
  2. 繼續調用Generator.next獲得下一個yield,回到1

####內部實現-數據結構

self.gen 處理的Generator self.result_future 用於存儲result的Future self.future 當前yield的異步函數的Future

####內部實現-主要函數

在初始化中,能夠看到主要涉及到的是handle_yieldrun函數。

def __init__(self, gen, result_future, first_yielded):
    ###
    if self.handle_yield(first_yielded):
        self.run()

handle_yield 處理當前yield的異步函數。 若是異步函數已經完成(即self.future.done()),那麼返回Ture。 不然利用IOLoop.add_futureself.run註冊到IOLoop中,因此當self.future完成時,調用self.run

if not self.future.done() or self.future is moment:
        self.io_loop.add_future(
            self.future, lambda f: self.run())
        return False
return True

run 開始和重啓Generator,持續執行到下一個yield。當Generator已經完成,返回設置好結果的Future

def run(self):
    ###
    try:
        self.running = True
        while True:
            future = self.future
            ###
            try:
            ###
                try:
                    value = future.result()
                ###
                else:
                ###將self.future.result()發送到generator中,重啓generator
                    yielded = self.gen.send(value)
                ###
            ###generator結束,設置self.result_future而後返回
            except (StopIteration, Return) as e:
                self.finished = True
                self.future = _null_future
                if self.pending_callbacks and not self.had_exception:
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                self.result_future.set_result(getattr(e, 'value', None))
                self.result_future = None
                self._deactivate_stack_context()
                return
            ###
            if not self.handle_yield(yielded):
                return
    finally:
        self.running = False
相關文章
相關標籤/搜索