本文將會講述Python 3.5以後出現的async/await的使用方法,我從上看到一篇不錯的博客,本身對其進行了梳理。該文章原地址http://www.javashuo.com/article/p-fsblxikv-d.htmlhtml
def fun(): return 1 if __name__ == '__main__': fun()
普通函數,沒有什麼特別的,直接函數名加括號調用便可。python
def generator_fun(): yield 1 if __name__ == '__main__': print(generator_fun()) #<generator object generator_fun at 0x00000000005E6E08> print(generator_fun().send(None)) for i in generator_fun(): print(i)
與普通函數不一樣的是,生成器函數返回的是一個生成器對象。咱們能夠經過send()方法或者是for循環的方法來對其進行取值。express
async def async_fun(): return 1 if __name__ == '__main__': print(async_fun()) # <coroutine object fun at 0x0000000002156E08> print(async_fun().send(None)) # StopIteration: 1 try: async_fun().send(None) except StopIteration as e: print(e.value)
異步函數的調用返回的是一個協程對象,若改對象經過send()進行調用,會報一個StopIteration錯,若要取值,就須要捕獲該異常,e.value的形式進行獲取。多線程
在協程函數中,能夠經過await語法來掛起自身的協程,並等待另外一個協程完成直到返回結果:app
async def async_function(): return 1 async def await_coroutine(): result = await async_function() print(result) run(await_coroutine()) # 1
async def async_fun(): async for i in generator_async_fun(): print(i) async def generator_async_fun(): yield 1 if __name__ == '__main__': async_fun().send(None)
異步生成器的調用比較特殊,它須要依賴別的異步函數進行調用。dom
await語法能夠掛起自聲協程,等待另外一個協程完成直到返回結果。可是有一些地方須要注意:ssh
await語法只能出如今經過async修飾的函數中,不然會報SyntaxError錯誤。異步
並且await後面的對象須要是一個Awaitable,或者實現了相關的協議。async
查看Awaitable抽象類的代碼,代表了只要一個類實現了__await__方法,那麼經過它構造出來的實例就是一個Awaitable:函數
class Awaitable(metaclass=ABCMeta): __slots__ = () @abstractmethod def __await__(self): yield @classmethod def __subclasshook__(cls, C): if cls is Awaitable: return _check_methods(C, "__await__") return NotImplemented
並且能夠看到,Coroutine類也繼承了Awaitable,並且實現了send,throw和close方法。因此await一個調用異步函數返回的協程對象是合法的。
class Coroutine(Awaitable): __slots__ = () @abstractmethod def send(self, value): ... @abstractmethod def throw(self, typ, val=None, tb=None): ... def close(self): ... @classmethod def __subclasshook__(cls, C): if cls is Coroutine: return _check_methods(C, '__await__', 'send', 'throw', 'close') return NotImplemented
案例:假如我要到一家超市去購買土豆,而超市貨架上的土豆數量是有限的:
class Potato: @classmethod def make(cls, num, *args, **kws): potatos = [] for i in range(num): potatos.append(cls.__new__(cls, *args, **kws)) return potatos all_potatos = Potato.make(5)
如今我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:
def take_potatos(num): count = 0 while True: if len(all_potatos) == 0: sleep(.1) else: potato = all_potatos.pop() yield potato count += 1 if count == num: break def buy_potatos(): bucket = [] for p in take_potatos(50): bucket.append(p)
對應到代碼中,就是迭代一個生成器的模型,顯然,當貨架上的土豆不夠的時候,這時只可以死等,並且在上面例子中等多長時間都不會有結果(由於一切都是同步的),也許能夠用多進程和多線程解決,而在現實生活中,更應該像是這樣的:
async def take_potatos(num): count = 0 while True: if len(all_potatos) == 0: await ask_for_potato() potato = all_potatos.pop() yield potato count += 1 if count == num: break
當貨架上的土豆沒有了以後,我能夠詢問超市請求須要更多的土豆,這時候須要等待一段時間直到生產者完成生產的過程:
async def ask_for_potato(): await asyncio.sleep(random.random()) all_potatos.extend(Potato.make(random.randint(1, 10)))
當生產者完成和返回以後,這是便能從await掛起的地方繼續往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程:
async def buy_potatos(): bucket = [] async for p in take_potatos(50): bucket.append(p) print(f'Got potato {id(p)}...')
async for語法表示咱們要後面迭代的是一個異步生成器。
def main(): import asyncio loop = asyncio.get_event_loop() res = loop.run_until_complete(buy_potatos()) loop.close()
用asyncio運行這段代碼,結果是這樣的:
Got potato 4338641384... Got potato 4338641160... Got potato 4338614736... Got potato 4338614680... Got potato 4338614568... Got potato 4344861864...
既然是異步的,在請求以後不必定要死等,而是能夠作其餘事情。好比除了土豆,我還想買番茄,這時只須要在事件循環中再添加一個過程:
def main(): import asyncio loop = asyncio.get_event_loop() res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()])) loop.close()
再來運行這段代碼:
Got potato 4423119312... Got tomato 4423119368... Got potato 4429291024... Got potato 4421640768... Got tomato 4429331704... Got tomato 4429331760... Got tomato 4423119368...
它須要實現__aiter__和__anext__兩個核心方法,以及asend,athrow,aclose方法。
class AsyncGenerator(AsyncIterator): __slots__ = () async def __anext__(self): ... @abstractmethod async def asend(self, value): ... @abstractmethod async def athrow(self, typ, val=None, tb=None): ... async def aclose(self): ... @classmethod def __subclasshook__(cls, C): if cls is AsyncGenerator: return _check_methods(C, '__aiter__', '__anext__', 'asend', 'athrow', 'aclose') return NotImplemented
異步生成器是在3.6以後纔有的特性,一樣的還有異步推導表達式,所以在上面的例子中,也能夠寫成這樣:
bucket = [p async for p in take_potatos(50)]
相似的,還有await表達式:
result = [await fun() for fun in funcs if await condition()]
除了函數以外,類實例的普通方法也能用async語法修飾:
class ThreeTwoOne: async def begin(self): print(3) await asyncio.sleep(1) print(2) await asyncio.sleep(1) print(1) await asyncio.sleep(1) return async def game(): t = ThreeTwoOne() await t.begin() print('start')
實例方法的調用一樣是返回一個coroutine:
function = ThreeTwoOne.begin method = function.__get__(ThreeTwoOne, ThreeTwoOne()) import inspect assert inspect.isfunction(function) assert inspect.ismethod(method) assert inspect.iscoroutine(method())
class ThreeTwoOne: @classmethod async def begin(cls): print(3) await asyncio.sleep(1) print(2) await asyncio.sleep(1) print(1) await asyncio.sleep(1) return async def game(): await ThreeTwoOne.begin() print('start')
根據PEP 492中,async也能夠應用到上下文管理器中,__aenter__和__aexit__須要返回一個Awaitable:
class GameContext: async def __aenter__(self): print('game loading...') await asyncio.sleep(1) async def __aexit__(self, exc_type, exc, tb): print('game exit...') await asyncio.sleep(1) async def game(): async with GameContext(): print('game start...') await asyncio.sleep(2)
在3.7版本,contextlib中會新增一個asynccontextmanager裝飾器來包裝一個實現異步協議的上下文管理器:
from contextlib import asynccontextmanager @asynccontextmanager async def get_connection(): conn = await acquire_db_connection() try: yield finally: await release_db_connection(conn)
Python3.3的yield from語法能夠把生成器的操做委託給另外一個生成器,生成器的調用方能夠直接與子生成器進行通訊:
def sub_gen(): yield 1 yield 2 yield 3 def gen(): return (yield from sub_gen()) def main(): for val in gen(): print(val) # 1 # 2 # 3
利用這一特性,使用yield from可以編寫出相似協程效果的函數調用,在3.5以前,asyncio正是使用@asyncio.coroutine和yield from語法來建立協程:
# https://docs.python.org/3.4/library/asyncio-task.html import asyncio @asyncio.coroutine def compute(x, y): print("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0) return x + y @asyncio.coroutine def print_sum(x, y): result = yield from compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
然而,用yield from容易在表示協程和生成器中混淆,沒有良好的語義性,因此在Python 3.5推出了更新的async/await表達式來做爲協程的語法。所以相似如下的調用是等價的:
async with lock: ... with (yield from lock): ... ###################### def main(): return (yield from coro()) def main(): return (await coro())
那麼,怎麼把生成器包裝爲一個協程對象呢?這時候能夠用到types包中的coroutine裝飾器(若是使用asyncio作驅動的話,那麼也可使用asyncio的coroutine裝飾器),@types.coroutine裝飾器會將一個生成器函數包裝爲協程對象:
import asyncio import types @types.coroutine def compute(x, y): print("Compute %s + %s ..." % (x, y)) yield from asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
儘管兩個函數分別使用了新舊語法,但他們都是協程對象,也分別稱做native coroutine以及generator-based coroutine,所以不用擔憂語法問題。下面觀察一個asyncio中Future的例子:
import asyncio future = asyncio.Future() async def coro1(): await asyncio.sleep(1) future.set_result('data') async def coro2(): print(await future) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([ coro1(), coro2() ])) loop.close()
兩個協程在在事件循環中,協程coro1在執行第一句後掛起自身切到asyncio.sleep,而協程coro2一直等待future的結果,讓出事件循環,計時器結束後coro1執行了第二句設置了future的值,被掛起的coro2恢復執行,打印出future的結果'data'。
future能夠被await證實了future對象是一個Awaitable,進入Future類的源碼能夠看到有一段代碼顯示了future實現了__await__協議:
class Future: ... def __iter__(self): if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" return self.result() # May raise too. if compat.PY35: __await__ = __iter__ # make compatible with 'await' expression
當執行await future這行代碼時,future中的這段代碼就會被執行,首先future檢查它自身是否已經完成,若是沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
當future執行set_result方法時,會觸發如下的代碼,設置結果,標記future已經完成:
def set_result(self, result): ... if self._state != _PENDING: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED self._schedule_callbacks()
最後future會調度自身的回調函數,觸發Task._step()告知Task驅動future從以前掛起的點恢復執行,不難看出,future會執行下面的代碼:
class Future: ... def __iter__(self): ... assert self.done(), "yield from wasn't used with future" return self.result() # May raise too.
最終返回結果給調用方。