#Coroutineshtml
做者:MetalBug 時間:2015-03-17 出處:http://my.oschina.net/u/247728/blog 聲明:版權全部,侵犯必究
tornado.gen
— Simplify asynchronous code tornado.concurrent
— Work with threads and futures安全
##1.Future Future
用於保存異步任務的結果。 在同步應用中,一般用Future
存儲來自其餘線程異步任務的結果。 可是在Tornado中,Future
並非線程安全的,這是處於效率的考慮,由於Tornado模型的使用的是 one loop per thread
。數據結構
###1.1內部實現-實現細節 在Future
中,使用了_TracebackLogger
用於記錄exception。 在設置了exception(即異步任務拋異常),添加_TracebackLogger
。 當調用了Funture.result()
,Future.exception()
時會將_TracebackLogger
清除。由於調用者已經知道了出現異常。 只有在沒有調用以上函數時,Future
被析構時_TracebackLogger
會將錯誤信息記錄。這樣防止異步任務中出現異常卻由於沒被調用致使異常不被意識到。app
Future.set_exec_info
中,添加_TracebackLogger
異步
def set_exc_info(self, exc_info): ## if not _GC_CYCLE_FINALIZERS: self._tb_logger = _TracebackLogger(exc_info)
在__del__
中,記錄exceptionasync
if _GC_CYCLE_FINALIZERS: def __del__(self): if not self._log_traceback: # set_exception() was not called, or result() or exception() # has consumed the exception return tb = traceback.format_exception(*self._exc_info) app_log.error('Future %r exception was never retrieved: %s', self, ''.join(tb).rstrip())
##2.coroutine coroutine
是一個修飾符,使用coroutine
,能夠將異步的操做以一個Generator
實現,而不用寫一系列回調函數。函數
示例:tornado
一般狀況下異步操做須要寫成回調函數:oop
class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch("http://example.com", callback=self.on_fetch) def on_fetch(self, response): do_something_with_response(response) self.render("template.html")
使用coroutine
,能夠寫成一個Generator
形式,而不用寫成多個回調。fetch
class GenAsyncHandler(RequestHandler): @gen.coroutine def get(self): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com") do_something_with_response(response) self.render("template.html")
在用coroutine
修飾的Generator
中,在遇到異步函數時,使用yield
,這裏須要注意的是在Tornado中,異步函數返回的大部分是Future
(還能夠是dict
,list
可用convert_yield
轉換),這樣yield
返回的的Future.result()
。
coroutine
內部經過_make_coroutine_wrapper
和Runner
實現。 流程圖以下:
###1._make_coroutine_wrapper
####內部實現-主要函數 _make_coroutine_wrapper
能夠認爲是一個Inline的Runner
。 由於不能保證每一個被修飾的func都是Generator
,都會yield,因此將獲取第一個yield
分離開來減小了初始化Runner
的開銷。
其主要作了如下工做:
調用func,獲得result
若是result是Generator
,調用Generator.next
獲得第一個yield
,調用Runner
處理以後工做
def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): future = TracebackFuture() ### try: result = func(*args, **kwargs) ### else: if isinstance(result, types.GeneratorType): try: ##用於檢查棧的一致性,由於yield可以在StackContext中返回 orig_stack_contexts = stack_context._state.contexts yielded = next(result) ### ##generator已經執行完畢,或者經過yield返回一個Return(Exception) except (StopIteration, Return) as e: future.set_result(getattr(e, 'value', None)) except Exception: future.set_exc_info(sys.exc_info()) ### else: ##調用Runner處理後續工做 Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper
####內部實現-實現細節
_make_coroutine_wrapper
在返回Future
時,使用了try...finally
。 這裏對內存進行了優化,若是在next(result)
時拋出異常,那麼Future.set_exc_info
會被調用,這時候_TracebackLogger
記錄當前棧內容(使用traceback),也包含了future
自己所在棧,這樣出現了循環引用,將函數內即本地的future
置爲None
,可以避免循環,從而提升GC回收效率。
# Subtle memory optimization: if next() raised an exception, # the future's exc_info contains a traceback which # includes this stack frame. This creates a cycle, # which will be collected at the next full GC but has # been shown to greatly increase memory usage of # benchmarks (relative to the refcount-based scheme # used in the absence of cycles). We can avoid the # cycle by clearing the local variable after we return it. try: return future finally: future = None
###2.Runner
Runner
處理Generator
,將執行結果以Future
返回。
其工做流程以下:
yield
返回的是Future
,得Runner
到Future
的result,調用Generator.send
將result返回到Generator
中Generator.next
獲得下一個yield
,回到1####內部實現-數據結構
self.gen
處理的Generator
self.result_future
用於存儲result的Future
self.future
當前yield的異步函數的Future
####內部實現-主要函數
在初始化中,能夠看到主要涉及到的是handle_yield
和run
函數。
def __init__(self, gen, result_future, first_yielded): ### if self.handle_yield(first_yielded): self.run()
handle_yield 處理當前yield的異步函數。 若是異步函數已經完成(即self.future.done()
),那麼返回Ture
。 不然利用IOLoop.add_future
將self.run
註冊到IOLoop
中,因此當self.future
完成時,調用self.run
。
if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
run 開始和重啓Generator
,持續執行到下一個yield
。當Generator
已經完成,返回設置好結果的Future
。
def run(self): ### try: self.running = True while True: future = self.future ### try: ### try: value = future.result() ### else: ###將self.future.result()發送到generator中,重啓generator yielded = self.gen.send(value) ### ###generator結束,設置self.result_future而後返回 except (StopIteration, Return) as e: self.finished = True self.future = _null_future if self.pending_callbacks and not self.had_exception: raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None self._deactivate_stack_context() return ### if not self.handle_yield(yielded): return finally: self.running = False