以前之因此看greenlet的代碼實現,主要就是想要看看gevent庫的實現代碼。java
。。python
而後知道了gevent的協程是基於greenlet來實現的。。。因此就又先去看了看greenlet的實現。。。多線程
這裏就不說greenlet的詳細實現了。關鍵就是棧數據的複製拷貝,棧指針的位移。app
。。less
因爲gevent帶有本身的I/O以及定時循環,因此它對greenlet又加了一層的擴展。。jvm
。ide
這裏咱們用例如如下的代碼來舉樣例,而後再來詳細的分析gevent是怎樣擴展greenlet的吧:函數
import gevent def hello(fname): print "hello : ", fname gevent.sleep(0) print "12321 : ", fname task1 = gevent.spawn(hello, "fjs1") task2 = gevent.spawn(hello, "fjs2") task1.join()
這段代碼的輸出例如如下:工具
嗯,那麼閒來看看spawn方法是怎樣建立協程的吧:oop
#類方法,這個說白了gevent提供的一層構造 @classmethod def spawn(cls, *args, **kwargs): """Return a new :class:`Greenlet` object, scheduled to start. The arguments are passed to :meth:`Greenlet.__init__`. """ g = cls(*args, **kwargs) #先構造greenlet對象 g.start() #調用start方法,至關於在hub對象的loop上註冊回調,這個回調的做用就是調用當前greenlet的switch切換到這個greenlet的運行 return g
這種方法是一個類方法,用於建立一個Greenlet,只是這個要注意。當前這個greenlet已經不是前面提到的greenlet庫中定義的那樣了,其作了一層簡單的擴展。
。
。
來看看構造函數:
#繼承了greenlet,至關因而過擴展了一些功能 class Greenlet(greenlet): """A light-weight cooperatively-scheduled execution unit.""" def __init__(self, run=None, *args, **kwargs): hub = get_hub() greenlet.__init__(self, parent=hub) #這裏將所有建立的greenlet對象的parent都指向了這個惟一的hub對象 if run is not None: self._run = run #記錄run信息 self.args = args self.kwargs = kwargs self._links = deque() self.value = None self._exception = _NONE self._notifier = None self._start_event = None
這裏直接繼承了greenlet庫中greenlet的定義。而後在構造函數中有比較重要的地方,可以看到,所有構造出來的協程的parent都將會指向一個名字叫hub的主協程。。
。
這個很是關鍵。它就是整個gevent的主循環協程,所有建立的業務協程的執行都要依賴於它的調度和管理。。
。
好了。在上面spawn過程當中。最後還調用了start方法啓動協程。那麼咱們來看看這種方法的定義吧:
#事實上這個主要是在hub對象的loop上面掛起一個要運行的回調,而這個回調的功能就是切換到這個greenlet的運行 def start(self): """Schedule the greenlet to run in this loop iteration""" if self._start_event is None: #事實上這個僅僅是在hub的loop上面掛起一個回調,而後在hub的loop裏面會運行這個回調 self._start_event = self.parent.loop.run_callback(self.switch) #在hub對象的loop裏面調用當前greenlet的switch回調,開始run方法的運行
代碼仍是很是easy。事實上無非就是在parent也就是hub的loop上面註冊了一個回調,而這個回調就是當前這個協程的switch方法,。。那麼等到這個回調被運行的時候,那麼也就是開始這個協程的運行的時候。
。。
這裏咱們先不去看hub以及它的loop的實現。
。。就先將其理解爲主循環,管理所有的回調。定時,以及I/O事件。
。。
嗯,接下來來看看join方法的實現吧。
。
熟悉多線程的都知道。在多線程環境下,join方法就是堵塞當前線程。直到join的目的線程返回了爲止。
。
。固然這裏就不是線程了。變成了協程。
。。。
來看看這個join方法的代碼吧:
#將當前運行環境掛起,知道join的greenlet運行完了 def join(self, timeout=None): """Wait until the greenlet finishes or *timeout* expires. Return ``None`` regardless. """ if self.ready(): #假設都已經跑完了,那麼直接返回吧 return else: switch = getcurrent().switch #獲取當前greenlet的switch self.rawlink(switch) #註冊當前環境greenlet的回調,那麼之後在這個要等待的greenlet運行完後。將會回調這個 try: t = Timeout.start_new(timeout) #建立一個timer對象 try: result = self.parent.switch() #中止當前環境greenlet的運行,調度hub運行 assert result is self, 'Invalid switch into Greenlet.join(): %r' % (result, ) finally: t.cancel() #取消timeout except Timeout: self.unlink(switch) #在掛起的回調中去除 if sys.exc_info()[1] is not t: raise except: self.unlink(switch) raise
首先調用getCurrent方法來獲取當前環境的協程,而後獲取它的switch方法,將其放置到要join的協程的回調隊列裏面,當這個要join的協程執行完了以後,將會調用這些回調。這樣,就可以恢復當前協程的執行了。。。
在如下咱們可以看到。調用了parent也就是hub的switch方法。切換到hub的運行,這個裏面將會開始要join的協程的運行,這裏並不是直接切換到join的協程的運行。。這點需要注意。。
。
另外,gevent本身的greenlet的定義增長了run方法,也就是每次運行都將會從這裏開始。
。。代碼例如如下:
#當前greenlet的運行部分,事實上就是調用傳進來的函數。而後運行完了以後再調用那些掛起的回調 def run(self): try: if self._start_event is None: self._start_event = _dummy_event else: self._start_event.stop() try: result = self._run(*self.args, **self.kwargs) #運行傳進來的函數 except: self._report_error(sys.exc_info()) return self._report_result(result) #這個主要是用於運行掛起的回調 finally: self.__dict__.pop('_run', None) self.__dict__.pop('args', None) self.__dict__.pop('kwargs', None)
#這個主要是爲了在hub的loop中掛起回調,用於運行當前這個greenlet所有掛起的回調 #這裏也不是立刻運行這些在這個greenlet上面掛起的回調,而是運行繼續掛到loop的回調上面去。這樣可以讓當前協程儘快返回 #而且假設就在當前協程運行這些回調會出問題,因爲假設回調帶有別的協程的switch方法,那麼switch以後。就再也回不到這個協程繼續運行別的回調了 #而在loop上面運行這些回調,也就是hub上,運行這些回調,即便切換到別的協程,之後也會早晚回到hub上繼續運行,因此能保證回調能所有運行完。。 def _report_result(self, result): self._exception = None self.value = result if self._links and not self._notifier: self._notifier = self.parent.loop.run_callback(self._notify_links)
至於爲何這麼大費周章。上面的凝視應該說的很是清楚了吧。。。
好了,接下來再來分析一下sleep的實現,代碼例如如下:
#事實上sleep的主要目的就是將當前的運行切換出去,回到hub的主循環 def sleep(seconds=0, ref=True): """Put the current greenlet to sleep for at least *seconds*. *seconds* may be specified as an integer, or a float if fractional seconds are desired. If *ref* is false, the greenlet running sleep() will not prevent gevent.wait() from exiting. """ hub = get_hub() #獲取hub對象 loop = hub.loop #獲取hub的loop對象 if seconds <= 0: #假設這裏並無時間 waiter = Waiter() #建立waiter對象。主要是爲了維護當前greenlet與hub之間的切換 loop.run_callback(waiter.switch) #在loop上面掛起一個回調,事實上就是在loop中再恢復當前sleep的greenlet的運行 waiter.get() #在這個裏面最基本的功能就是記錄當前的greenlet對象。而後將棧切換到hub上面運行 else: hub.wait(loop.timer(seconds, ref=ref)) #帶定時的wait
事實上這裏分爲了兩種種類,就是在sleep的時候傳入的超時時間。小於等於0的以及大於0的。。。
對於sleep操做,假設是在多線程的環境裏,好比java的sleep,事實上就是堵塞當前的線程。這樣子jvm會調度別的線程的執行,而對於gevent,事實上不少其它的是可以理解爲當前協程主動的放棄CPU資源,等到之後再執行。
。
。
首先來看看對於超時小於等於零的。事實上原理很是easy,就是進行switch,切換到hub協程的運行,並且在hub的loop上面註冊一個回調。用於切換回到當前協程的運行。。。
這裏有一點需要的注意的就是。並無直接在代碼中體現switch的操做,而是多了一個waiter對象。。。而後在loop上面註冊的回調是waiter的switch方法,而後調用了waiter對象的get方法。。。
這裏看gevent的凝視才知道。waiter對象可以理解爲gevent封裝的協程之間的協做工具,詳細的協程之間的切換都由waiter來作。避免讓用戶本身的代碼涉及到switch操做。因爲這樣子很是easy出錯。。
。咱們來看看waiter的定義吧:
#事實上這個對象僅僅是爲了維護用戶greenlet與hub之間的切換關係 #將會在hub裏面註冊當前waiter對象的switch方法做爲回調,而後在hub的loop裏面將會運行這個回調 class Waiter(object): """A low level communication utility for greenlets. Wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer: * switching will occur only if the waiting greenlet is executing :meth:`get` method currently; * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw` * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter` will store the value/exception. The following :meth:`get` will return the value/raise the exception. The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet. The :meth:`get` method must be called from a greenlet other than :class:`Hub`. >>> result = Waiter() >>> timer = get_hub().loop.timer(0.1) >>> timer.start(result.switch, 'hello from Waiter') >>> result.get() # blocks for 0.1 seconds 'hello from Waiter' If switch is called before the greenlet gets a chance to call :meth:`get` then :class:`Waiter` stores the value. >>> result = Waiter() >>> timer = get_hub().loop.timer(0.1) >>> timer.start(result.switch, 'hi from Waiter') >>> sleep(0.2) >>> result.get() # returns immediatelly without blocking 'hi from Waiter' .. warning:: This a limited and dangerous way to communicate between greenlets. It can easily leave a greenlet unscheduled forever if used incorrectly. Consider using safer :class:`Event`/:class:`AsyncResult`/:class:`Queue` classes. """ __slots__ = ['hub', 'greenlet', 'value', '_exception'] def __init__(self, hub=None): if hub is None: self.hub = get_hub() #獲取頂層hub對象 else: self.hub = hub self.greenlet = None self.value = None self._exception = _NONE def clear(self): self.greenlet = None self.value = None self._exception = _NONE def __str__(self): if self._exception is _NONE: return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet) elif self._exception is None: return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value) else: return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info) def ready(self): """Return true if and only if it holds a value or an exception""" return self._exception is not _NONE def successful(self): """Return true if and only if it is ready and holds a value""" return self._exception is None @property def exc_info(self): "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``." if self._exception is not _NONE: return self._exception #調度greenlet的運行,這種方法僅僅能在hub的loop裏面運行 def switch(self, value=None): """Switch to the greenlet if one's available. Otherwise store the value.""" greenlet = self.greenlet if greenlet is None: self.value = value self._exception = None else: #僅僅能在hub裏面調用waiter的switch方法 assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" switch = greenlet.switch try: switch(value) #恢復記錄的greenlet的運行 except: self.hub.handle_error(switch, *sys.exc_info()) def switch_args(self, *args): return self.switch(args) def throw(self, *throw_args): """Switch to the greenlet with the exception. If there's no greenlet, store the exception.""" greenlet = self.greenlet if greenlet is None: self._exception = throw_args else: assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet" throw = greenlet.throw try: throw(*throw_args) except: self.hub.handle_error(throw, *sys.exc_info()) #這個的最基本的做用就是記錄要等待的greenlet def get(self): """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called.""" if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) self.greenlet = getcurrent() #記錄當前的greenlet對象。在hub的loop裏面將會調用當前waiter的switch回調,將會恢復這個greenlet的運行 try: return self.hub.switch() #切換到hub上面去運行,那麼原來那個greenlet的運行到這裏就臨時中斷了,待會switch會這裏繼續運行 finally: self.greenlet = None def __call__(self, source): if source.exception is None: self.switch(source.value) else: self.throw(source.exception) # can also have a debugging version, that wraps the value in a tuple (self, value) in switch() # and unwraps it in wait() thus checking that switch() was indeed called
這個代碼應很是好理解,而且凝視都說的很是清楚。
。。
比較重要的就是get方法,這種方法將會保存當前運行的協程,而後切換到hub的運行,對於switch方法,將會切換回剛開始的協程的運行。。
好了,上面介紹了sleep不帶超時的實現。
。
接下來來看看帶超時的實現:
hub.wait(loop.timer(seconds, ref=ref)) #帶定時的wait
這裏首先建立了一個timer對象。這個可以理解爲在loop上面註冊了一個超時,接着看代碼:
#用於在loop上面註冊watcher並等待 def wait(self, watcher): waiter = Waiter() #首先建立一個waiter對象 unique = object() watcher.start(waiter.switch, unique) #當watcher超時的時候將會調用waiter的switch方法 try: result = waiter.get() #調用waiter的get方法,主要是讓將當前調用sleep的greenlet切換出去。而後切換到hub的執行 assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique) finally: watcher.stop()
依舊是建立waiter對象,以及它的get方法,只是這裏要注意的是,將waiter的switch回調是註冊到剛剛建立的timer對象上的,而不是直接註冊到loop上面。這樣待會timer超時的時候將會調用回調。恢復sleep的協程的運行。。
好了。這裏gevent的大致上協程,以及切換關係都幾乎相同了。
。
。。