Tornado源碼分析系列之一: 化異步爲'同步'的Future和gen.coroutine

轉自:http://blog.nathon.wang/2015/06/24/tornado-source-insight-01-gen/html

用Tornado也有一段時間,Tornado的文檔仍是比較匱乏的,可是幸虧其代碼短小精悍,頗有可讀性,遇到問題時老是習慣深刻到其源碼中。
這對於提高本身的Python水平和對於網絡及HTTP的協議的理解也頗有幫助。本文是Tornado源碼系列的第一篇文章,網上關於Tornado源碼分
析的文章也很多,大可能是從Event loop入手,分析Event loop的工做原理,以及在其上如何構建TCPServer和HTTPServer。因此我就不想拾前
人的牙慧再去寫一遍,固然這些內容我後續會涉及到,可是作爲開篇第一章,我想從更加獨特的角度來分析Tornado,這裏就說說Tornado的gen
和concurrent兩個模塊, 這個話題網上彷佛還很少,呵呵。node

設計從需求出發,要考證一段的代碼爲何寫成這樣而不是那樣, 首先要看代碼解決了什麼需求。 看下代碼中的例子先:網絡

1
2
3
4
5
6
7
8
9
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch('http://example.com', callback=self.on_fetch)

def on_fetch(self, response):
do_something_with_response(response)
self.render('template.html')

通過gen.coroutine修飾以後上面的這段代碼能夠改成app

1
2
3
4
5
6
7
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch('http://example.com')
do_something_with_response(response)
self.render('template.html')

初識這段代碼以爲好神奇,其實gen.coroutine只不過是將一個基於callback的典型的異步調用適配成基於yield的僞同步,說是僞同步是由於代碼流程上類
似同步,可是實際倒是異步的。這樣作有幾個好處:
1。控制流跟同步相似,咱們知道callback裏去作控制流仍是比較噁心的,就算nodejs裏的async這樣的模塊,可是分支多起來也很是很差寫。(爽)
2。能夠共享變量,沒有了callback,全部的本地變量在同一個做用域中。 (爽爽)
3。能夠並行執行,yield能夠拋出list或dict,並行執行其中的異步流程。(爽爽爽。。。此處省略一萬個爽)異步

神奇的gen.coroutine裝飾器是怎麼作到這一切的?讓我首先買個關子,不是進入到gen裏面分析coroutine和Runner這兩核心的方法(類),而是首先分析一些這
些方法(類)中用到的一些技術, 而後再回到coroutine裝飾器和Runner類中。async

首先要理解的是generator是如何經過yield與外界進行通訊的。函數

1
2
3
4
5
6
7
8
9
def test():
print ('step 1.......')
res = yield 10
print ('step 2.......', res) (3)

gen = test()
gen.send(None) #next(gen) (1)
data = gen.send(20) (2)
print ('yield out .....', data)

步驟1啓動了generator,步驟2向generator內部發送數據,並經過yield向generator外部拋出結果10, 最後的執行結果是tornado

1
2
3
step 1.......
step 2....... 20
yield out ..... 10

而後讓我再說說Future,Future是對異步調用結果的封裝。一個callback型的異步調用的執行結果不只包括調用的返回,還包括調用得到返回以後須要執行的回調,因此才須要將
異步調用的結果封裝一下,做爲一個異步調用執行結果的佔位符。Future類基本能夠這麼寫oop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future(object):
def __init__(self):
self._callback = []
self._result = None
self._done = False

def set_callback(self, cb):
self._callback.append(cb)

def _run_callback(self):
for cb in self._callback:
cb()

def set_result(self, result)
self._done = True
self._result = result
self._run_callback()

def is_ready(self):
return self._done is True

固然這只是個簡約版的,詳細能夠參看concurrent.Future。fetch

最後再來講說另外一個重要的函數Task, 這個函數的主要做用是將一個callback型的異步調用適配成一個返回Future的異步調用,而這個做爲異步調用結果的Future會在原來的那個callback被時解析出來

1
2
3
4
5
6
7
8
9
def Task(func, *args, **kwargs):
future = Future
def set_result(result):
if future.done():
return
future.set_result(result)

func(*args, callback=_argument_adapter(set_result), **kwargs)
return future

這裏忽略了一些與本文無關的部分。能夠看到Task裏面構造了一個callback,_argument_adapter是將callback的參數進行適配,將不定參數適配成一個參數也就是result, 最後經過
future.set_result(result)將result賦值給future,這樣future就被解析出來。 那麼問題來了,AsyncHTTPClient並無通過Task的適配,而是直接返回一個Future。這個Future是在
何時解析的呢?進httpclient.py來看下AsyncHTTPClient是如何解析Future的,這是httpclient.py中的fetch函數,也就是咱們實際發起http請求的那個函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def fetch(self, request, callback=None, raise_error=True, **kwargs):
.....
future = TracebackFuture()
if callback is not None:
callback = stack_context.wrap(callback)

def handle_future(future):
exc = future.exception()
if isinstance(exc, HTTPError) and exc.response is not None:
response = exc.response
elif exc is not None:
response = HTTPResponse(
request, 599, error=exc,
request_time=time.time()-request.start_time
)
else:
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)

def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
self.fetch_impl(request, handle_response)
return future

fetch中定義一個表明fetch異步調用執行結果的future,若是調用時傳入了callback,並非直接將callback傳給fetch_impl,而是首先給future設置一個名爲handle_future解析完成後的回調,這個handle_future
中經過add_callback把實際傳進來的callback加入到IOLoop中讓IOLoop規劃其調用。而傳入到fetch_impl中的callback 則換成被了handle_response這個函數,
fetch_impl最後會在當收到response的時候調用handle_response回調(這個有興趣能夠看下,若是之後有寫httpserver相關的分析可能會再分析), handle_response會解析出表明執行結果的future。對沒有設置callback的調用,future解析結束整個流程也就結束了。而對於設置了callback的調用,future完成以後會調用handle_future 。
畫個簡圖來描述一下調用過程
fetch->fetch_impl->HTTP請求直到有response或出錯,若是有response回調handle_response->future.set_result(response)(future有值了)->若是fetch帶了callback則handle_future->ioloop中調用callback
至此能夠看到AsyncHTTPClient是如何把一個callback型的異步調用轉換成一個返回future的異步調用,而這個future會在handle_response調用時被解析獲得返回的response。

好了,差很少該深刻gen.coroutine這個裝飾器以及其最終實現Runner類。其實看完上面的內容gen.coroutine和Runner的做用也呼之欲出,其主要功能就是拿到yield出的異步調用返回的future,看這個
future是否已經完成,若是完成就把結果再send到generator中,若是沒有完成就要爲future設置一個完成時回調,這個回調的主要做用就是啓動Runner(也就是調用run方法)。至於future啥時候完成,這個
gen.coroutine和Runner可無論,你必須設計一個AsyncHTTPClient中fetch那樣的返回Future的異步調用或者用Task封裝一下你的帶有callback的異步調用。下面是節選gen.coroutine裝飾器中主要方法
_make_coroutine_wrapper的代碼的主要部分

1
2
3
4
5
6
7
8
9

try:
yielded = next(result)
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)

result就是被裝飾的函數返回的generator,next啓動這個generator, 若是generator拋出StopIteration和Return兩個異常,表示generator已經解析出結果,將這個結果設置給最後coroutine返回的
future。若是有其餘異常表示generator執行過程當中發生了異常,將異常設置到future中。排除這兩種狀況,表示generator尚未執行完畢,調用Runner執行generator。Runner的參數result就是還沒
有運行完畢generator, future是表明coroutine執行結果的那個future, 而yielded是func返回的future(或者YieldPoint,我們只考慮future的狀況)。再深刻到Runner中,主要有兩個函數handle_yield
和run,handle_yield主要是肯定generator返回的yielded是不是一個執行完成的yielded(對於yielded是future的狀況來講就是future.is_ready() == True),若是沒有執行完成則須要設置future完成時
執行run方法,也就是future.add_done_callback(future, lambda f:self.run())並返回False也就是不執行立刻run, 不然返回True並當即執行run方法,由於這時候已經有異步調用的結果了。
run方法拿到yielded的執行結果,並傳入到generator中。這樣generator內部就能經過yield拿到異步調用的執行結果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
 def handle_yield(self, yielded):
#處理YieldPoint忽略掉,可是原理跟Future是同樣的
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())

if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True

def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done(): #執行run時generator返回的那個future必須已經有結果,不然就不必傳回到generator中了
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None

try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()

if exc_info is not None:
yielded = self.gen.throw(*exc_info)
exc_info = None
else:
yielded = self.gen.send(value)

if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
#generator執行完畢併成功的處理
except Exception:
#generator執行過程當中異常的處理
if not self.handle_yield(yielded):
#這裏generator尚未執行完畢,yielded是generator迭代過一次以後返回的新yielded。若是yieled尚未被解析出結果就經過handle_yield給yieled設置完成時的重啓run的回調,
#不然yielded已經有結果,就再次運行run,因此run中才會有一個循環
return
finally:
self.running = False

分析完畢,沒看懂的同窗能夠在讀兩遍代碼,主要仍是要抓住coroutine裝飾器只不過是將callback型調用轉換成generator型僞同步調用的一個適配器這個關鍵點,閱讀起代碼來就明白多了。期待下篇吧,準備寫stack_context異步調用中的異常捕獲問題

相關文章
相關標籤/搜索