事件循環和協程

前言

第一次接觸異步編程這個概念是在 Python 裏面,去年的時候就由於不清楚 Python 中異步編程的實現原理找了不少資料研究,但最後也沒有搞得很清楚。python

後來又由於重心逐漸轉移到了 Java 上,就暫時放棄了對 Python 中異步原理的探究,直到前段時間,被 JavaScript 的異步坑了一次後, 讓我想起了 Python 裏面的異步……ajax

而後我就在網上看到了一篇文章,看了半天后在文章的評論中發現了本身兩年前的評論……編程

當時我那個心情啊,太曲折了,因而乎決定找個時間在研究一下 Python 中的異步,這即是這篇博客的由來。多線程

  • 注 1:雖然這篇博客源自探究 Python 異步編程實現原理的過程,可是,並不會包含太多 Python 中異步編程的實現原理,由於我尚未搞明白 QAQ
  • 注 2:博客中最後的實現代碼存在問題,但一時間找不到解決辦法,只好做爲一種思路發出來,等待後續研究解決

Python 中異步編程模型的特殊性

我前後接觸過了 Python、JavaScript1 和 Java 中的異步編程,其中,Python 的是最爲特殊的一個,由於它是 單線程異步阻塞 的。app

一般來講,異步編程模型下最小的執行單位是一個個任務(每每就是一個函數),而爲了可以在一個任務執行完成後執行另外的操做,又會引入回調任務(函數)。 不一樣的任務和回調任務的執行,每每又是經過事件循環和任務隊列來完成。異步

假如將這些東西拆分開來,像 JavaScript 那樣,放在不一樣的線程下面處理,是很容易理解的,由於總體結構足夠清晰,可是 Python 不行,由於 GIL 的緣由,若是 Python 仍是使用多線程的方式來實現異步編程的話, 並不能帶來多少性能上的提高,所以,Python 異步中的事件循環、任務隊列和任務的執行都在一個線程裏面。async

並且,Python 中異步的實現是基於協程的,這不只使得 Python 中的異步是阻塞的,並且,最小執行單位再也不是單個任務,而是單個任務的一部分。異步編程

這就讓 Python 中異步的實現變得複雜起來,原本 Python 的源碼就很差讀,好傢伙,如今更很差讀了,同時又由於異步編程每每都和異步 I/O 掛鉤,刷的一下,源碼中一堆和異步 I/O 相關的代碼……函數

這讓我明白了,想快速搞明白 Python 異步是咋回事是不可能的,畢竟,我要作的是經過閱讀源碼倒推做者的思路,這很難!!!oop

所以,我換了一個思路,我先本身用協程實現一個簡單的事件循環,在慢慢去讀 Python 的源碼,總能夠了吧!

這就是爲啥這篇博客說是在探究 Python 異步編程的實現原理,可是標題連 Python 這個單詞都沒有的緣由。

協程的基本認識

Python 中的協程是經過生成器來實現的,可是,基本上全部博客將協程的時候都會說的一句話,協程不等於生成器,它們只是長得像:

def grep(pattern):
    """ >>> g = grep("python") >>> g.send(None) Looking for python >>> g.send("Yeah, but no, but yeah, but no") >>> g.send("A series of tubes") >>> g.send("python generators rock!") python generators rock! """
    print("Looking for %s" % pattern)
    while True:
        line = yield
        if pattern in line:
            print(line)
複製代碼

上面這個協程不斷接收來自 send 方法的輸入,在通過判斷後進行輸出,假如把它當作生成器使用的話,那麼你只能獲得無數的 None 值。

本質上,在低版本的 Python 中生成器和協程沒有區別,就看你怎麼用,關鍵就在於協程 消費 值,而生成器 生成 值,固然了,高版本的 Python 對協程提供了更多的支持, 使得它們再也不同樣,可是,這篇博客裏面,全部的協程都經過生成器實現。

所以,咱們須要關注後面會用到的幾個特性:

  1. yield 的左值會接收來自 send 方法的輸入,可是協程在第一次運行時還沒到達 yield 語句處,所以沒法傳遞參數,只能經過 None 值來調用協程:

    def coroutine():
        while True:
            val = yield
            print(val)
    複製代碼

    執行輸出:

    In [9]: coro = coroutine()
    
    In [10]: coro.send(None)
    
    In [11]: coro.send(1)
    1
    
    In [12]: coro = coroutine()
    
    In [13]: coro.send(1)
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-13-e272bd1527da> in <module>()
    ----> 1 coro.send(1)
    
    TypeError: can't send non-None value to a just-started generator
    複製代碼
  2. 能夠經過 yield from 語句遞歸調用協程,效果以下:

    def coroutine():
        for i in range(3):
            val = yield
            print('coroutine %s' % val)
    
    def invoker():
        yield from coroutine()
    複製代碼

    執行輸出(就是會報異常):

    In [29]: coro = invoker()
    
    In [30]: coro.send(None)
    
    In [31]: coro.send(1)
    coroutine 1
    
    In [32]: coro.send(2)
    coroutine 2
    
    In [33]: coro.send(3)
    coroutine 3
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-33-8e657389bc11> in <module>()
    ----> 1 coro.send(3)
    
    StopIteration:
    複製代碼
  3. 協程能夠有返回值,保存在 StopIteration 異常中,做爲 yield from 的左值時能夠直接接收:

    def coroutine():
        val = yield
        return 'coroutine %s' % val
    複製代碼

    執行輸出:

    In [42]: coro = coroutine()
    
    In [43]: coro.send(None)
    
    In [44]: coro.send(1)
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-44-e272bd1527da> in <module>()
    ----> 1 coro.send(1)
    
    StopIteration: coroutine 1
    複製代碼

簡單事件循環的實現

原本想將 Future & TaskEventLoop 分紅兩節的,結果 TaskEventLoop 耦合在了一塊兒,只好合在一塊兒了,下面是代碼:

class EventLoop:
    def __init__(self):
        self._ready = []

    def call_soon(self, task):
        self._ready.append(task)

    def run_forever(self, coro):
        root = Task(coro, self)
        while self._ready:
            task = self._ready.pop(0)
            task.step(Future())
        return root.result

class Future:
    def __init__(self):
        # 經過 result 來保存協程的返回值
        self.result = None
        # 經過 _callbacks 來保存回調函數
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        # try suppression bug
        self.result = self.result or result
        # 執行完成後將自身做爲參數傳遞給回調函數
        for callback in self._callbacks:
            callback(self)

class Task(Future):
    # 協程類型
    coroutine = type((i for i in range(0)))

    def __init__(self, coro, loop):
        super().__init__()
        self.coro = coro
        self.loop = loop
        # 將自身加入任務隊列
        self.loop.call_soon(self)

    def step(self, future):
        try:
            result = self.coro.send(future.result)
        except StopIteration as exc:
            # 觸發 StopIteration 異常時說明協程已經執行結束
            self.set_result(exc.value)
        else:
            # 協程返回協程,將其轉換爲 Task 後將 self.step 註冊爲期回調函數等待喚醒
            if type(result) == self.coroutine:
                result = Task(result, self.loop)
                result.add_done_callback(self.step)
            # 協程返回任務,將 self.step 註冊爲回調函數等待喚醒
            elif isinstance(result, Task):
                # there is a bug
                result.add_done_callback(self.step)
                self.loop.call_soon(result)
            # 協程返回其餘東西,不受理,直接將 self 再次放入任務隊列
            else:
                self.loop.call_soon(self)
複製代碼

一開始實現的時候是想用一個外部的事件循環來操做,不須要 Task 持有事件循環,可是實現過程當中發現那樣存在一點問題,便學着 Python 中的方式將事件循環傳遞給 Task 操做, 但這裏的實現是依然存在問題。

在只存在協程和同序返回 Task 的狀況下測試沒有問題,可是當存在異序返回 Task 的狀況下問題就出現了,下面的測試代碼即是異序返回,我經過在 set_result 中判斷 result 的方式暫時抑制了該異常, 可是,這是治標不治本的方式。若是有大佬知道方案,請務必告訴我 QAQ

測試代碼:

_loop = EventLoop()

def main():
    ta = Task(say_hello(), _loop)
    tb = Task(say_world(), _loop)

    b = yield tb
    a = yield ta

    return a + b

def say_world():
    print('world')
    yield
    return 'world'

def say_hello():
    print('hello')
    yield from say_other()
    return 'hello '


def say_other():
    print('other')
    yield

print(_loop.run_forever(main()))
複製代碼

輸出:

hello
other
world
hello world
複製代碼

結語

折騰了一圈後結果仍是隻能獲得一份存在問題的代碼,和去年的時候差很少,但比去年好的是,多少多了一點思路。

可是,仍是差得好遠啊……

參考連接

Footnotes

1 ES6 中 async/await 的原理尚未怎麼了解過,所以這裏的異步只包括 ajax 這類異步操做

相關文章
相關標籤/搜索