python 協程庫gevent學習--源碼學習(一)

總算仍是要來梳理一下這幾天深刻研究以後學習到的東西了。segmentfault

這幾天一直在看之前跟jd對接的項目寫的那個gevent代碼。爲了查錯,基本上深刻淺出了一次gevent幾個重要部件的實現和其工做的原理。函數

 

這裏用一個簡單demo依次分析運行流程和介紹相關概念最後得出結論:oop

import gevent

def test_1():
    print '切換不出去'
    print '切換出去我不是循環'
    gevent.sleep(1)

def test_2():
    print '切換'
    print '切換出去我去'
    gevent.sleep(3)

gevent.spawn(test_1)
gevent.spawn(test_2)
gevent.sleep(1)

 

 

在具體介紹各部分具體怎麼運轉得時候我想要先提幾個必定會用到的gevent類:源碼分析

gevent hub:能夠在圖上明顯看到,hub.loop其實就是gevent的事件循環核心。這也是全部Greenlet實例的parents。想要實現回調,須要去hub上註冊一個你當前棧的回調,以讓hub在處理完其餘事情以後能使用greenlet.switch(注意大寫的Greenlet是gevent從新實現的類繼承了greenlet。區別文中的大小寫對於理解很重要)回到原來的棧中。學習

Waiter類:其實我以爲Waiter類要理解到他的功能以後,纔會以爲比較簡單。咱們能夠把Waiter實例化以後將他的switch方法註冊到hub中。這裏看一段代碼:this

result = Waiter()
timer = get_hub().loop.timer(5)
timer.start(result.switch, 'hello from Waiter')
print result.get()

也就是上面的第三行。這裏的第二行能夠實現向主循環中註冊一個5秒等待事件。註冊以後就開始計時了。最後調用get方法去切換到hub主循環。下面上get的代碼:spa

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
            self.greenlet = getcurrent()
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

這裏會執行self.greenlet = getcurrent(), 將當前棧的保存到self.greenlet中,以保證後面能夠經過開始向主循環中註冊的Waiter().switch切換回來。而後調用self.hub.switch()方法:.net

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return greenlet.switch(self)

這裏最後一句像主循環進行切換以後,運行hub的run方法:code

    def run(self):
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever'))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediatelly

執行loop.run方法,就能夠開始依次運行回調了。在第一次執行的時候到這裏啓用hub的loop循環而後執行了loop.run以後就是依次運行註冊的回調。對象

下次再有的回調運行的都是在loop循環裏執行不會再運行到hub調用run方法了這裏注意。

 

執行註冊過來的Waiter().switch回調切換到Waiter.switch中進行執行繼續看代碼:

    def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except:
                self.hub.handle_error(switch, *sys.exc_info())

將當時Waiter()裏面保存的greenlet拿出來,帶上value參數切換回去。這裏至關於咱們切換回main。回到當時切換到hub的地方:

            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

return self.hub.switch()帶回來的值而後執行finally清空Waiter()的greenlet的值爲None結束運行。下面我會用不一樣的例子來展現Waiter的用法。

greenlet: greenlet 提供了一種在不一樣的調用棧之間自由跳躍的功能。

libev: 這裏用到了loop watcher.timer,其實真正在處理io事件的時候這個纔是更重要的。 

 

下面開始講解最頂上貼出的例子:

貼出gevent.spawn的源碼:

    @classmethod
    def spawn(cls, *args, **kwargs):
        """Return a new :class:`Greenlet` object, scheduled to start.

        The arguments are passed to :meth:`Greenlet.__init__`.
        """
        g = cls(*args, **kwargs)
        g.start()
        return g

運行到gevent.spawn(test1)的時候會實例化一個Greenlet實例g。g調用了start方法,將g.switch註冊到hub中。如下是start方法的源碼:

    def start(self):
        """Schedule the greenlet to run in this loop iteration"""
        if self._start_event is None:
            self._start_event = self.parent.loop.run_callback(self.switch)

調用loop.run_callback註冊greenlet.switch方法到主循環hub中。

 

總的來講gevent.spawn()乾的事情就是生成一個Greenlet實例,而後將這個實例的self.switch方法註冊到主循環回調中。test2同理,直接看到gevent.sleep(1)

gevent.sleep()是很是重要也很是有用的實現,我以爲這是理解gevent顯式切換(explicit switch)的關鍵。我不想精簡代碼,因此貼上全部源碼慢慢分析:

def sleep(seconds=0, ref=True):
    """Put the current greenlet to sleep for at least *seconds*.

    *seconds* may be specified as an integer, or a float if fractional seconds
    are desired.

    If *ref* is false, the greenlet running sleep() will not prevent gevent.wait()
    from exiting.
    """
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

仍是先得到hub,而後將hub.loop保存給loop,以後判斷有沒有傳seconds參數咱們傳遞了seconds參數爲1s,因而調用hub的wait方法將watcher loop.timer()作參數傳遞進去。

這裏watcher的叫法來源於libev事件驅動庫,hub.loop中對底層的libev庫作了一一對應的封裝。這裏咱們使用的是一個timer的watcher,那麼當咱們處理io事件的時候,使用的事件驅動可能就會變成io的watcher了。繼續往下看hub.wait函數:

    def wait(self, watcher):
        waiter = Waiter()
        unique = object()
        watcher.start(waiter.switch, unique)
        try:
            result = waiter.get()
            assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
        finally:
            watcher.stop()

 實例化Waiter()類。而後向loop.timerwatcher註冊一個waiter.switch,帶了個參數unique。而後執行waiter.get()

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE: if self._exception is None: return self.value else: getcurrent().throw(*self._exception) else: assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, ) self.greenlet = getcurrent() try: return self.hub.switch() finally: self.greenlet = None

這裏會執行self.greenlet = getcurrent(), 將當前棧的保存到self.greenlet中,以保證後面能夠經過開始向主循環中註冊的Waiter().switch切換回來,而後調用self.hub.switch()

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return greenlet.switch(self)

而後到hub.switch()函數中切換攜程到Hub中執行run()。

    def run(self):
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None  # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever'))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediatelly

繼續執行loop.run。這裏loop.run就會開始執行剛纔註冊上來的回調,咱們第一個註冊上來的回調是_run=test1的Greenlet回調。這裏注意這裏執行的run方法其實就已是Greenlet的run方法了,回調回來的時候Greenlet繼承的greenlet底層實現了執行的時候會執行他的run方法,咱們也能夠經過重寫_run方法本身定義這裏會執行的邏輯。

    def run(self):
        try:
            if self._start_event is None:
                self._start_event = _dummy_event
            else:
                self._start_event.stop()
            try:
                result = self._run(*self.args, **self.kwargs)
            except:
                self._report_error(sys.exc_info())
                return
            self._report_result(result)
        finally:
            self.__dict__.pop('_run', None)
            self.__dict__.pop('args', None)
            self.__dict__.pop('kwargs', None)

當執行到這一句 result = self._run(*self.args, **self.kwargs)的時候,咱們會執行最開始保存在Greenlet中的_run函數也就是test1。而後就會去執行test1函數了。

 

執行test1函數以後咱們繼續使用gevent.sleep(1)向hub註冊回調。注意這個回調確定要排在main函數也就是最外層函數的後面,無論外面函數休眠多少秒這裏都會等待,由於很重要的一點是hub自己實際上是順序且阻塞的。

 

而後這個時候會繼續運行到test2所在的Greenlet對象執行以後繼續用gevent.sleep(1)註冊回調。而後下一次再運行到self.hub.switch時greenlet.switch(self)方法就會切換到第一個註冊的Waiter().switch()上了,也就是咱們在main上面使用gevent.sleep(1)註冊的那個回調。這個回調會讓咱們順利返回到main上的那個greenlet.至此就結束了整個過程。

 

其實切換比較難以理解的我以爲仍是思路和那可惡的處處都是的switch。並且各種的switch方法還都叫switch。。。一不當心就弄錯,絕對是難理解的關鍵。 我本人不用本子記的時候看得很是暈。這些東西我相信只有多看纔可以理解。 後面的文章我會繼續探索,隱式切換的方方面面 以及寫一些實例來控制切換。gevent 這傢伙給我埋坑太深,我已經下決心要徹底摸透了。

 

Reference:

http://blog.csdn.net/yueguanghaidao/article/details/24281751 gevent源碼分析

https://segmentfault.com/a/1190000000613814 gevent源碼分析

http://xlambda.com/gevent-tutorial/#_2 gevent指南

http://blog.csdn.net/yueguanghaidao/article/details/39122867 [gevent源碼分析] gevent兩架馬車-libev和greenlet

相關文章
相關標籤/搜索