asyncio系列之抽絲剝繭分析事件調度的核心原理

先來看一下一個簡單的例子網絡

例1:app

async def foo():
    print('enter foo ...')
    await bar()
    print('exit foo ...')

async def bar():
    print('enter bar ...')
    print('exit bar ...')

f = foo()
try:
    f.send(None)
except StopIteration as e:
    print(e.value)

例2:socket

async def foo():
    print('enter foo ...')
    try:
        bar().send(None)
    except StopIteration as e:
        pass
    print('exit foo ...')

async def bar():
    print('enter bar ...')
    print('exit bar ...')

f = foo()
try:
    f.send(None)
except StopIteration as e:
    print(e.value)

也就是說 await bar() 等價於這個async

try:
    bar().send(None)
except StopIteration as e:
    pass

更進一步來說,await 協程的嵌套就跟函數調用同樣,沒什麼兩樣。函數

def foo():
    print('enter foo ...')
    bar()
    print('exit foo ...')

def bar():
    print('enter bar ...')
    print('exit bar ...')

foo()

理解了跟函數調用同樣就能夠看當作是這樣:oop

執行f.send(None)時其實就是執行debug

print('enter foo ...')      

    print('enter bar ...')
    print('exit bar ...')

print('exit foo ...')

例3:code

class Future:

    def __iter__(self):
        print('enter Future ...')
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        
    __await__ = __iter__

async def foo():
    print('enter foo ...')
    await bar()
    print('exit foo ...')

async def bar():
    future = Future()
    print('enter bar ...')
    await future
    print('exit bar ...')

f = foo()
try:
    f.send(None)
    print('foo 掛起在yield處 ')
    print('--'*10)
    f.send(None)
except StopIteration as e:
    print(e.value)

執行結果:協程

enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處 
--------------------
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...
None

Future是一個Awaitable對象,實現了__await__方法,await future 其實是會進入到future.__await__方法中也就是future.__iter__方法中的邏輯,執行到 yield self 處foo協程才真正被掛起,返回future對象自己,f.send(None)才真正的執行完畢,對象

  • 第一次調用f.send(None),執行:

    print('enter foo ...')
      print('enter bar ...')
      print('enter Future ...')

被掛起

  • 第二次調用f.send(None),執行:

    print('exit Future ...')
      print('exit bar ...')
      print('exit foo ...')

也就是說這樣一個foo協程完整的調用過程就是以下過程:

- foo print('enter foo ...')
    - bar print('enter bar ...')
        - future print('enter Future ...')   # 以上是第一次f.send(None)執行的邏輯,命名爲part1
        - future yield  self  ---------------------------------------------------------------
        - future print('exit Future ...')       # 如下是第二次f.send(None)執行的邏輯,命名爲part2
    - bar print('exit bar ...')
- foo print('exit foo ...')

加入咱們把這兩次f.send(None)調用的邏輯分別命名成part1和part2,那也就是說,經過future這個對象,準確的說是yield關鍵字,真正的把foo協程要執行的完整邏輯分紅了兩部分part1和patr2。而且foo的協程狀態會被掛起在yield處,這樣就要調用兩次f.send(None)才能,執行完foo協程,而不是在例2中,直接只調用一次f.send(None)就執行完了foo協程。這就是Future對象的做用。

重點:沒有 await future 的協程是沒有靈魂的協程,並不會被真正的掛起,只需一次 send(None) 調用便可執行完畢,只有有 await future
的協程纔是真正能夠被掛起的協程,須要執行兩次 send(None) 才能執行完該協程的完整邏輯。

這裏小結一下Future的做用

  1. yield 起到了掛起協程的做用。

  2. 經過 yield 把 foo 協程的執行邏輯真正的分紅了 part1 和 part2 兩部分。

例4:

class Future:

    def __iter__(self):
        print('enter Future ...')
        print('foo 掛起在yield處 ')
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        return 'future'

    __await__ = __iter__

class Task:

    def __init__(self, cor):
        self.cor = cor

    def _step(self):
        cor = self.cor
        try:
            result = cor.send(None)
        except Exception as e:
            pass

async def foo():
    print('enter foo ...')
    await bar()
    print('exit foo ...')

async def bar():
    future = Future()
    print('enter bar ...')
    await future
    print('exit bar ...')

f = foo()

task = Task(f)
task._step()
print('--' * 10)
task._step()

執行結果:

enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處 
--------------------
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...

這個例4與例3不一樣在於,如今有一個Task類,咱們把f.send(None)d的操做,封裝在了 Task 的 _step 方法中,調用 task._step() 等因而執行 part1 中的邏輯,再次調用
task._step() 等因而執行part2中的邏輯。如今不想手動的 兩次調用task._step() 方法,咱們寫一個簡單的Loop類來幫忙完成對task._step的屢次調用,請看下面這個例子。

例5:

class Future:

    def __iter__(self):
        print('enter Future ...')
        print('foo 掛起在yield處 ')
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        return 'future'

    __await__ = __iter__

class Task:

    def __init__(self, cor, *, loop=None):
        self.cor = cor
        self._loop = loop

    def _step(self):
        cor = self.cor
        try:
            result = cor.send(None)
        except StopIteration as e:
            self._loop.close()
        except Exception as e:
            pass

class Loop:

    def __init__(self):
        self._stop = False

    def create_task(self, cor):
        task = Task(cor, loop = self)
        return task

    def run_until_complete(self, task):
        while not self._stop:
            task._step()

    def close(self):
        self._stop = True

async def foo():
    print('enter foo ...')
    await bar()
    print('exit foo ...')

async def bar():
    future = Future()
    print('enter bar ...')
    await future
    print('exit bar ...')

if __name__ == '__main__':
    
    f = foo()
    loop = Loop()
    task = loop.create_task(f)
    loop.run_until_complete(task)

執行結果:

enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處 
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...

例5中咱們實現了一個簡單 Loop 類,在while循環中調用task._step方法。

例6:

class Future:

    def __init__(self, *, loop=None):
        self._result = None
        self._callbacks = []

    def set_result(self, result):
        self._result = result
        callbacks = self._callbacks[:]
        self._callbacks = []
        for callback in callbacks:
            loop._ready.append(callback)

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def __iter__(self):
        print('enter Future ...')
        print('foo 掛起在yield處 ')
        yield self
        print('foo 恢復執行')
        print('exit Future ...')
        return 'future'

    __await__ = __iter__

class Task:

    def __init__(self, cor, *, loop=None):
        self.cor = cor
        self._loop = loop

    def _step(self):
        cor = self.cor
        try:
            result = cor.send(None)
        # 1. cor 協程執行完畢時,會拋出StopIteration,說明cor執行完畢了,這是關閉loop
        except StopIteration as e:
            self._loop.close()
        # 2. 有異常時
        except Exception as e:
            """處理異常邏輯"""
        # 3. result爲Future對象時
        else:
            if isinstance(result, Future):
                result.add_callback(self._wakeup)
                # 當即調用,讓下一loop輪循環中立馬執行self._wakeup
                result.set_result(None)

    def _wakeup(self):
        self._step()


class Loop:

    def __init__(self):
        self._stop = False
        self._ready = []
    def create_task(self, cor):
        task = Task(cor, loop = self)
        self._ready.append(task._step)
        return task

    def run_until_complete(self, task):

        assert isinstance(task, Task)

        while not self._stop:
            n = len(self._ready)
            for i in range(n):
                step = self._ready.pop()
                step()
    def close(self):
        self._stop = True

async def foo():
    print('enter foo ...')
    await bar()
    print('exit foo ...')

async def bar():
    future = Future(loop=loop)
    print('enter bar ...')
    await future
    print('exit bar ...')

if __name__ == '__main__':

    f = foo()
    loop = Loop()
    task = loop.create_task(f)
    loop.run_until_complete(task)

執行結果:

enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處 
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...

在例6中,咱們構建了3個稍微複雜點類,Loop類,Task, Future類,這3個類在整個協程執行流程的調度過程當中有很強的相互做用關係。

  • Future

    掛起協程的執行流程,把協程的邏輯分爲part1和part2兩部分。

  • Task

    把協程的part1和part2邏輯封裝到task._step和task._wakeup方法中,在不一樣的時機分別把它們註冊到loop對象中,task._step是建立task實例的時候就註冊到了loop中,task._wakeup則是在task._setp執行完掛在yield future處,因爲有await future語句的存在,必然是返回一個future對象,判斷確實是一個future對象,就把task._wakeup註冊到future中,future.set_result()則會在合適的時機被調用,一旦它被調用,就會把future中註冊的task._wakeup註冊到loop中,而後就會在loop循環中調用task._wakeup,協程的part2的邏輯才得以執行,最後拋出StopIteration異常。

  • Loop

    在一個死循環中執行註冊到loop中的task._step和task._wakeup方法,完成對協程完整邏輯的執行。

雖然咱們本身構建的這三個類的實現很簡單,可是這體現asyncio實現事件循環的核心原理,咱們實現loop中並無模擬耗時等待以及對真正IO事件的監聽,對應於asyncio來講,它也是構建了Future, Task, Loop這3個類,只是功能要比咱們本身構建的要複雜得多,loop對象的while中經過select(timeout)函數的調用實現模擬耗時操做和實現了對網絡IO事件的監聽,這樣咱們只要在寫了一個執行一個IO操做時,都會有一個future對象 await future,經過future來掛起當前的協程,好比想進行一個socket鏈接,協程的僞代碼以下:

future = Future
# 非阻塞調用,須要try...except...
socket.connect((host, port))    
# 註冊一個回調函數到write_callbackselect中,只要socket發生可寫事件,就執行回調
add_writer(write_callback, future)
await future
...

當咱們在調用socket.connect((host, port)),由於是非阻塞socket,會立馬返回,而後把這個write_callback, future註冊成select的可寫事件的回調函數,這個回調函數何時被執行呢,就是在loop循環的select(timeout)返回了可寫事件時纔會觸發,回調函數中會調用future.set_result(),也就是說future.set_result的觸發時機是在socket鏈接成功時,select(timeout)返回了可寫事件時,future.set_result的做用就是把協程的part2部分註冊到loop,而後在下一輪的循環中當即調用,使得協程的await future下面的語句得以繼續執行。

因爲我這裏沒有貼asyncio的loop,task,future對象的源碼,因此這個例子看起來會很抽象,在上一篇asyncio中貼了這幾個類的源碼,想詳細瞭解的能夠查看個人上一篇文章《asyncio系列之簡單協程的基本執行流程分析》。小夥伴們也能夠對照着asyncio的源碼來debug,這樣再來理解這裏說的這個例子就比較容易了。

下一篇將介紹asyncio.sleep()的實現機制。

相關文章
相關標籤/搜索