gevent是目前應用很是普遍的網絡庫,高效的輪詢IO庫libev加上greenlet實現的協程(coroutine),使得gevent的性能很是出色,尤爲是在web應用中。本文介紹gevent的調度流程,主要包括gevent對greenlet的封裝和使用,以及greenlet與libev的協做。閱讀本文須要對greenlet有必定的認識,能夠參考這篇文章,另外,本文分析的gevent版本爲1.2,能夠經過gevent.version_info查看版本號。html
gevent來源於eventlet,自稱比後者實現更簡單、API更方便且性能更好,許多開源的web服務器也使用了gevent,如gunicorn、paste,固然gevent本生也能夠做爲一個python web服務器使用。這篇文章對常見的wsgi server進行性能對比,gevent無論在http1.0仍是http1.1都表現很是出色。下圖是目前經常使用的http1.1標準下的表現:python
gevent高效的祕訣就是greenlet和libev啦,greenlet在以前的博文有介紹,gevent對greenlet的使用比較限制,只能在兩層協程之間切換,簡單也不容易出錯。libev使用輪訓非阻塞的方式進行事件處理,好比unix下的epoll。早期gevent使用libevent,後來替換成libev,由於libev「提供更少的核心功能以求更改的效率」,這裏有libev和libevent的性能對比:git
若是想了解gevent的調度流程,最重要的是對greenlet有基本的瞭解。下面總結一些我的認爲比較重要的點:github
在gevent中,有兩個類繼承了greenlet.greenlet,分別是gevent.hub.Hub和gevent.greenlet.Greenlet。後文中,若是是greenlet.greenlet這種寫法,那麼指的是原生的類庫greentlet,若是是greenlet(或者Greenlet)那麼指gevent封裝後的greenlet。web
首先,給出總結性的結論,後面再結合實例和源碼一步步分析。服務器
每一個gevent線程都有一個hub,前面提到hub是greenlet.greenlet的實例。hub實例在須要的時候創生(Lazy Created),那麼其parent是main greenlet。以後任何的Greenlet(注意是greenlet.greenlet的子類)實例的parent都設置成hub。hub調用libev提供的事件循環來處理Greenlet表明的任務,當Greenlet實例結束(正常或者異常)以後,執行邏輯又切換到hub。網絡
咱們看下面最簡單的代碼:app
>>> import gevent
>>> gevent.sleep(1)
>>>異步
上面的代碼很簡單,但事實上gevent的核心都包含在其中,接下來結合源碼進行分析。socket
首先看sleep函數(gevent.hub.sleep):
1 def sleep(seconds=0, ref=True): 2 hub = get_hub() 3 loop = hub.loop 4 if seconds <= 0: 5 waiter = Waiter() 6 loop.run_callback(waiter.switch) 7 waiter.get() 8 else: 9 hub.wait(loop.timer(seconds, ref=ref))
首先是獲取hub(第2行),而後在hub上wait這個定時器事件(第9行)。get_hub源碼以下(gevent.hub.get_hub):
1 def get_hub(*args, **kwargs): 2 """ 3 Return the hub for the current thread. 4 5 """ 6 hub = _threadlocal.hub 7 if hub is None: 8 hubtype = get_hub_class() 9 hub = _threadlocal.hub = hubtype(*args, **kwargs) 10 return hub
能夠看到,hub是線程內惟一的,以前也提到過greenlet是線程獨立的,每一個線程有各自的greenlet棧。hubtype默認就是gevent.hub.Hub,在hub的初始化函數(__init__)中,會建立loop屬性,默認也就是libev的python封裝。
回到sleep函數定義,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函數很是關鍵,對於任何阻塞性操做,好比timer、io都會調用這個函數,其做用一句話歸納:從當前協程切換到hub,直到watcher對應的事件就緒再從hub切換回來。wait函數源碼以下(gevent.hub.Hub.wait):
1 def wait(self, watcher): 2 """ 3 Wait until the *watcher* (which should not be started) is ready. 4 5 """ 6 waiter = Waiter() 7 unique = object() 8 watcher.start(waiter.switch, unique) 9 try: 10 result = waiter.get() 11 if result is not unique: 12 raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)) 13 finally: 14 watcher.stop()
形參watcher就是loop.timer實例,其cython描述在corecext.pyx,咱們簡單理解成是一個定時器事件就好了。上面的代碼中,建立了一個Waiter(gevent.hub.Waiter)對象,這個對象起什麼做用呢,這個類的doc寫得很是清楚
Waiter.__doc__
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
* any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
* if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
will store the value/exception. The following :meth:`get` will return the value/raise the exception
簡而言之,是對greenlet.greenlet類switch 和 throw函數的分裝,用來存儲返回值greenlet的返回值或者捕獲在greenlet中拋出的異常。咱們知道,在原生的greenlet中,若是一個greenlet拋出了異常,那麼該異常將會展開至其parent greenlet。
回到Hub.wait函數,第8行 watcher.start(waiter.switch, unique) 註冊了一個回調,在必定時間(1s)以後調用回調函數waiter.switch。注意,waiter.switch此時並無執行。而後第10行調用waiter.get。看看這個get函數(gevent.hub.Waiter.get):
1 def get(self): 2 """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" 3 if self._exception is not _NONE: 4 if self._exception is None: 5 return self.value 6 else: 7 getcurrent().throw(*self._exception) 8 else: 9 if self.greenlet is not None: 10 raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, )) 11 self.greenlet = getcurrent() # 存儲當前協程,以後從hub switch回來的時候使用 12 try: 13 return self.hub.switch() # switch到hub 14 finally: 15 self.greenlet = None
核心的邏輯在第11到15行,11行中,getcurrent獲取當前的greenlet(在這個測試代碼中,是main greenlet,即最原始的greenlet),將其複製給waiter.greenlet。而後13行switch到hub,在greenlet回顧章節的第二條提到,greenlet.greenlet的子類須要重寫run方法,當調用子類的switch時會調用到該run方法。Hub的run方法實現以下:
1 def run(self): 2 """ 3 Entry-point to running the loop. This method is called automatically 4 when the hub greenlet is scheduled; do not call it directly. 5 6 :raises LoopExit: If the loop finishes running. This means 7 that there are no other scheduled greenlets, and no active 8 watchers or servers. In some situations, this indicates a 9 programming error. 10 """ 11 assert self is getcurrent(), 'Do not call Hub.run() directly' 12 while True: 13 loop = self.loop 14 loop.error_handler = self 15 try: 16 loop.run() 17 finally: 18 loop.error_handler = None # break the refcount cycle 19 self.parent.throw(LoopExit('This operation would block forever', self))
loop天然是libev的事件循環。doc中提到,這個loop理論上會一直循環,若是結束,那麼代表沒有任何監聽的事件(包括IO 定時等)。以前在Hub.wait函數中註冊了定時器,那麼在這個run中,若是時間到了,那麼會調用定時器的callback,也就是以前的waiter.switch, 咱們再來看看這個函數(gevent.hub.Waiter.switch):
1 def switch(self, value=None): 2 """Switch to the greenlet if one's available. Otherwise store the value.""" 3 greenlet = self.greenlet 4 if greenlet is None: 5 self.value = value 6 self._exception = None 7 else: 8 assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" 9 switch = greenlet.switch 10 try: 11 switch(value) 12 except: 13 self.hub.handle_error(switch, *sys.exc_info())
核心代碼在第8到13行,第8行保證調用到該函數的時候必定在hub這個協程中,這是很天然的,由於這個函數必定是在Hub.run中被調用。第11行switch到waiter.greenlet這個協程,在講解waiter.get的時候就提到了waiter.greenlet是main greenlet。注意,這裏得switch會回到main greenlet被切出的地方(也就是main greenlet掛起的地方),那就是在waiter.get的第10行,整個邏輯也就恢復到main greenlet繼續執行。
總結:sleep的做用很簡單,觸發一個阻塞的操做,致使調用hub.wait,從當前greenlet.greenlet切換至Hub,超時以後再從hub切換到以前的greenlet繼續執行。經過這個例子能夠知道,gevent將任何阻塞性的操做封裝成一個Watcher,而後從調用阻塞操做的協程切換到Hub,等到阻塞操做完成以後,再從Hub切換到以前的協程。
上面這個例子,雖然可以理順gevent的調度流程,但事實上並無體現出gevent 協做的優點。接下來看看gevent tutorial的例子:
1 import gevent 2 3 def foo(): 4 print('Running in foo') 5 gevent.sleep(0) 6 print('Explicit context switch to foo again') 7 8 def bar(): 9 print('Explicit context to bar') 10 gevent.sleep(0) 11 print('Implicit context switch back to bar') 12 13 gevent.joinall([ 14 gevent.spawn(foo), 15 gevent.spawn(bar), 16 ]) 17 18 # output 19 Running in foo 20 Explicit context to bar 21 Explicit context switch to foo again 22 Implicit context switch back to bar
從輸出能夠看到, foo和bar依次輸出,顯然是在gevent.sleep的時候發生了執行流程切換,gevent.sleep再前面已經介紹了,那麼這裏主要關注spawn和joinall函數
gevent.spawn本質調用了gevent.greenlet.Greenlet的類方法spawn:
1 @classmethod 2 def spawn(cls, *args, **kwargs): 3 g = cls(*args, **kwargs) 4 g.start() 5 return g
這個類方法調用了Greenlet的兩個函數,__init__ 和 start. init函數中最爲關鍵的是這段代碼:
1 def __init__(self, run=None, *args, **kwargs): 2 greenlet.__init__(self, None, get_hub()) # 將新創生的greenlet實例的parent一概設置成hub 3 4 if run is not None: 5 self._run = run
start函數的定義也很簡單(gevent.greenlet.Greenlet.start):
1 def start(self): 2 """Schedule the greenlet to run in this loop iteration""" 3 if self._start_event is None: 4 self._start_event = self.parent.loop.run_callback(self.switch)
註冊回調事件self.switch到hub.loop,注意Greenlet.switch最終會調用到Greenlet._run, 也就是spawn函數傳入的callable對象(foo、bar)。這裏僅僅是註冊,但尚未開始事件輪詢,gevent.joinall就是用來啓動事件輪詢並等待運行結果的。
joinall函數會一路調用到gevent.hub.iwait函數:
1 def iwait(objects, timeout=None, count=None): 2 """ 3 Iteratively yield *objects* as they are ready, until all (or *count*) are ready 4 or *timeout* expired. 5 """ 6 # QQQ would be nice to support iterable here that can be generated slowly (why?) 7 if objects is None: 8 yield get_hub().join(timeout=timeout) 9 return 10 11 count = len(objects) if count is None else min(count, len(objects)) 12 waiter = _MultipleWaiter() # _MultipleWaiter是Waiter的子類 13 switch = waiter.switch 14 15 if timeout is not None: 16 timer = get_hub().loop.timer(timeout, priority=-1) 17 timer.start(switch, _NONE) 18 19 try: 20 for obj in objects: 21 obj.rawlink(switch) # 這裏往hub.loop註冊了回調 22 23 for idx in xrange(count): 24 print 'for in iwait', idx 25 item = waiter.get() # 這裏會切換到hub 26 print 'come here ', item, getcurrent() 27 waiter.clear() 28 if item is _NONE: 29 return 30 yield item 31 finally: 32 if timeout is not None: 33 timer.stop() 34 for obj in objects: 35 unlink = getattr(obj, 'unlink', None) 36 if unlink: 37 try: 38 unlink(switch) 39 except: 40 traceback.print_exc()
而後iwait函數第23行開始的循環,逐個調用waiter.get。這裏的waiter是_MultipleWaiter(Waiter)的實例,其get函數最終調用到Waiter.get。前面已經詳細介紹了Waiter.get,簡而言之,就是switch到hub。咱們利用greenlet的tracing功能能夠看到整個greenlet.greenlet的switch流程,修改後的代碼以下:
1 import gevent 2 import greenlet 3 def callback(event, args): 4 print event, args[0], '===:>>>>', args[1] 5 6 def foo(): 7 print('Running in foo') 8 gevent.sleep(0) 9 print('Explicit context switch to foo again') 10 11 def bar(): 12 print('Explicit context to bar') 13 gevent.sleep(0) 14 print('Implicit context switch back to bar') 15 16 print 'main greenlet info: ', greenlet.greenlet.getcurrent() 17 print 'hub info', gevent.get_hub() 18 oldtrace = greenlet.settrace(callback) 19 20 gevent.joinall([ 21 gevent.spawn(foo), 22 gevent.spawn(bar), 23 ]) 24 greenlet.settrace(oldtrace)
切換流程及緣由見下圖:
總結:gevent.spawn建立一個新的Greenlet,並註冊到hub的loop上,調用gevent.joinall或者Greenlet.join的時候開始切換到hub。
本文經過兩個簡單的例子並結合源碼分析了gevent的協程調度流程。gevent的使用很是方便,尤爲是在web server中,基本上應用App什麼都不用作就能享受gevent帶來的好處。筆者閱讀gevent源碼最重要的緣由在於想了解gevent對greenlet的封裝和使用,greenlet很強大,強大到容易出錯,而gevent保證在兩層協程之間切換,值得借鑑!
references:
http://www.cnblogs.com/xybaby/p/6337944.html
https://pypi.python.org/pypi/greenlet
http://software.schmorp.de/pkg/libev.html
http://nichol.as/benchmark-of-python-web-servers