先來看一下一個簡單的例子網絡
例1:app
async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): print('enter bar ...') print('exit bar ...') f = foo() try: f.send(None) except StopIteration as e: print(e.value)
例2:socket
async def foo(): print('enter foo ...') try: bar().send(None) except StopIteration as e: pass print('exit foo ...') async def bar(): print('enter bar ...') print('exit bar ...') f = foo() try: f.send(None) except StopIteration as e: print(e.value)
也就是說 await bar()
等價於這個async
try: bar().send(None) except StopIteration as e: pass
更進一步來說,await 協程的嵌套就跟函數調用同樣,沒什麼兩樣。函數
def foo(): print('enter foo ...') bar() print('exit foo ...') def bar(): print('enter bar ...') print('exit bar ...') foo()
理解了跟函數調用同樣就能夠看當作是這樣:oop
執行f.send(None)時其實就是執行debug
print('enter foo ...') print('enter bar ...') print('exit bar ...') print('exit foo ...')
例3:code
class Future: def __iter__(self): print('enter Future ...') yield self print('foo 恢復執行') print('exit Future ...') __await__ = __iter__ async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): future = Future() print('enter bar ...') await future print('exit bar ...') f = foo() try: f.send(None) print('foo 掛起在yield處 ') print('--'*10) f.send(None) except StopIteration as e: print(e.value)
執行結果:協程
enter foo ... enter bar ... enter Future ... foo 掛起在yield處 -------------------- foo 恢復執行 exit Future ... exit bar ... exit foo ... None
Future是一個Awaitable對象,實現了__await__方法,await future 其實是會進入到future.__await__方法中也就是future.__iter__方法中的邏輯,執行到 yield self 處foo協程才真正被掛起,返回future對象自己,f.send(None)才真正的執行完畢,對象
第一次調用f.send(None),執行:
print('enter foo ...') print('enter bar ...') print('enter Future ...')
被掛起
第二次調用f.send(None),執行:
print('exit Future ...') print('exit bar ...') print('exit foo ...')
也就是說這樣一個foo協程完整的調用過程就是以下過程:
- foo print('enter foo ...') - bar print('enter bar ...') - future print('enter Future ...') # 以上是第一次f.send(None)執行的邏輯,命名爲part1 - future yield self --------------------------------------------------------------- - future print('exit Future ...') # 如下是第二次f.send(None)執行的邏輯,命名爲part2 - bar print('exit bar ...') - foo print('exit foo ...')
加入咱們把這兩次f.send(None)調用的邏輯分別命名成part1和part2,那也就是說,經過future這個對象,準確的說是yield關鍵字,真正的把foo協程要執行的完整邏輯分紅了兩部分part1和patr2。而且foo的協程狀態會被掛起在yield處,這樣就要調用兩次f.send(None)才能,執行完foo協程,而不是在例2中,直接只調用一次f.send(None)就執行完了foo協程。這就是Future對象的做用。
重點:沒有 await future 的協程是沒有靈魂的協程,並不會被真正的掛起,只需一次 send(None) 調用便可執行完畢,只有有 await future
的協程纔是真正能夠被掛起的協程,須要執行兩次 send(None) 才能執行完該協程的完整邏輯。
這裏小結一下Future的做用
yield 起到了掛起協程的做用。
經過 yield 把 foo 協程的執行邏輯真正的分紅了 part1 和 part2 兩部分。
例4:
class Future: def __iter__(self): print('enter Future ...') print('foo 掛起在yield處 ') yield self print('foo 恢復執行') print('exit Future ...') return 'future' __await__ = __iter__ class Task: def __init__(self, cor): self.cor = cor def _step(self): cor = self.cor try: result = cor.send(None) except Exception as e: pass async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): future = Future() print('enter bar ...') await future print('exit bar ...') f = foo() task = Task(f) task._step() print('--' * 10) task._step()
執行結果:
enter foo ... enter bar ... enter Future ... foo 掛起在yield處 -------------------- foo 恢復執行 exit Future ... exit bar ... exit foo ...
這個例4與例3不一樣在於,如今有一個Task類,咱們把f.send(None)d的操做,封裝在了 Task 的 _step 方法中,調用 task._step() 等因而執行 part1 中的邏輯,再次調用
task._step() 等因而執行part2中的邏輯。如今不想手動的 兩次調用task._step() 方法,咱們寫一個簡單的Loop類來幫忙完成對task._step的屢次調用,請看下面這個例子。
例5:
class Future: def __iter__(self): print('enter Future ...') print('foo 掛起在yield處 ') yield self print('foo 恢復執行') print('exit Future ...') return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None): self.cor = cor self._loop = loop def _step(self): cor = self.cor try: result = cor.send(None) except StopIteration as e: self._loop.close() except Exception as e: pass class Loop: def __init__(self): self._stop = False def create_task(self, cor): task = Task(cor, loop = self) return task def run_until_complete(self, task): while not self._stop: task._step() def close(self): self._stop = True async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): future = Future() print('enter bar ...') await future print('exit bar ...') if __name__ == '__main__': f = foo() loop = Loop() task = loop.create_task(f) loop.run_until_complete(task)
執行結果:
enter foo ... enter bar ... enter Future ... foo 掛起在yield處 foo 恢復執行 exit Future ... exit bar ... exit foo ...
例5中咱們實現了一個簡單 Loop 類,在while循環中調用task._step方法。
例6:
class Future: def __init__(self, *, loop=None): self._result = None self._callbacks = [] def set_result(self, result): self._result = result callbacks = self._callbacks[:] self._callbacks = [] for callback in callbacks: loop._ready.append(callback) def add_callback(self, callback): self._callbacks.append(callback) def __iter__(self): print('enter Future ...') print('foo 掛起在yield處 ') yield self print('foo 恢復執行') print('exit Future ...') return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None): self.cor = cor self._loop = loop def _step(self): cor = self.cor try: result = cor.send(None) # 1. cor 協程執行完畢時,會拋出StopIteration,說明cor執行完畢了,這是關閉loop except StopIteration as e: self._loop.close() # 2. 有異常時 except Exception as e: """處理異常邏輯""" # 3. result爲Future對象時 else: if isinstance(result, Future): result.add_callback(self._wakeup) # 當即調用,讓下一loop輪循環中立馬執行self._wakeup result.set_result(None) def _wakeup(self): self._step() class Loop: def __init__(self): self._stop = False self._ready = [] def create_task(self, cor): task = Task(cor, loop = self) self._ready.append(task._step) return task def run_until_complete(self, task): assert isinstance(task, Task) while not self._stop: n = len(self._ready) for i in range(n): step = self._ready.pop() step() def close(self): self._stop = True async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): future = Future(loop=loop) print('enter bar ...') await future print('exit bar ...') if __name__ == '__main__': f = foo() loop = Loop() task = loop.create_task(f) loop.run_until_complete(task)
執行結果:
enter foo ... enter bar ... enter Future ... foo 掛起在yield處 foo 恢復執行 exit Future ... exit bar ... exit foo ...
在例6中,咱們構建了3個稍微複雜點類,Loop類,Task, Future類,這3個類在整個協程執行流程的調度過程當中有很強的相互做用關係。
Future
掛起協程的執行流程,把協程的邏輯分爲part1和part2兩部分。
Task
把協程的part1和part2邏輯封裝到task._step和task._wakeup方法中,在不一樣的時機分別把它們註冊到loop對象中,task._step是建立task實例的時候就註冊到了loop中,task._wakeup則是在task._setp執行完掛在yield future處,因爲有await future語句的存在,必然是返回一個future對象,判斷確實是一個future對象,就把task._wakeup註冊到future中,future.set_result()則會在合適的時機被調用,一旦它被調用,就會把future中註冊的task._wakeup註冊到loop中,而後就會在loop循環中調用task._wakeup,協程的part2的邏輯才得以執行,最後拋出StopIteration異常。
Loop
在一個死循環中執行註冊到loop中的task._step和task._wakeup方法,完成對協程完整邏輯的執行。
雖然咱們本身構建的這三個類的實現很簡單,可是這體現asyncio實現事件循環的核心原理,咱們實現loop中並無模擬耗時等待以及對真正IO事件的監聽,對應於asyncio來講,它也是構建了Future, Task, Loop這3個類,只是功能要比咱們本身構建的要複雜得多,loop對象的while中經過select(timeout)函數的調用實現模擬耗時操做和實現了對網絡IO事件的監聽,這樣咱們只要在寫了一個執行一個IO操做時,都會有一個future對象 await future,經過future來掛起當前的協程,好比想進行一個socket鏈接,協程的僞代碼以下:
future = Future # 非阻塞調用,須要try...except... socket.connect((host, port)) # 註冊一個回調函數到write_callbackselect中,只要socket發生可寫事件,就執行回調 add_writer(write_callback, future) await future ...
當咱們在調用socket.connect((host, port)),由於是非阻塞socket,會立馬返回,而後把這個write_callback, future註冊成select的可寫事件的回調函數,這個回調函數何時被執行呢,就是在loop循環的select(timeout)返回了可寫事件時纔會觸發,回調函數中會調用future.set_result(),也就是說future.set_result的觸發時機是在socket鏈接成功時,select(timeout)返回了可寫事件時,future.set_result的做用就是把協程的part2部分註冊到loop,而後在下一輪的循環中當即調用,使得協程的await future下面的語句得以繼續執行。
因爲我這裏沒有貼asyncio的loop,task,future對象的源碼,因此這個例子看起來會很抽象,在上一篇asyncio中貼了這幾個類的源碼,想詳細瞭解的能夠查看個人上一篇文章《asyncio系列之簡單協程的基本執行流程分析》。小夥伴們也能夠對照着asyncio的源碼來debug,這樣再來理解這裏說的這個例子就比較容易了。
下一篇將介紹asyncio.sleep()的實現機制。