Python 進階 異步async/await

一,前言

  本文將會講述Python 3.5以後出現的async/await的使用方法,我從上看到一篇不錯的博客,本身對其進行了梳理。該文章原地址http://www.javashuo.com/article/p-fsblxikv-d.htmlhtml

二,Python常見的函數形式

  2.1 普通函數

def fun():
    return 1

if __name__ == '__main__':
    fun()
 

  普通函數,沒有什麼特別的,直接函數名加括號調用便可。python

  2.2 生成器函數

def generator_fun():
    yield 1

if __name__ == '__main__':
    print(generator_fun())  #<generator object generator_fun at 0x00000000005E6E08>
    print(generator_fun().send(None))  
    for i in generator_fun():
        print(i)

  與普通函數不一樣的是,生成器函數返回的是一個生成器對象。咱們能夠經過send()方法或者是for循環的方法來對其進行取值。express

  2.3 (異步)協程函數

async def async_fun():
    return 1

if __name__ == '__main__':
    print(async_fun())  # <coroutine object fun at 0x0000000002156E08>
    print(async_fun().send(None))  # StopIteration: 1
    try:
        async_fun().send(None)
    except StopIteration as e:
        print(e.value)

  異步函數的調用返回的是一個協程對象,若改對象經過send()進行調用,會報一個StopIteration錯,若要取值,就須要捕獲該異常,e.value的形式進行獲取。多線程

  在協程函數中,能夠經過await語法來掛起自身的協程,並等待另外一個協程完成直到返回結果:app

async def async_function():
    return 1

async def await_coroutine():
    result = await async_function()
    print(result)
    
run(await_coroutine())
# 1

  2.4 異步生成器

async def async_fun():
    async for i in generator_async_fun():
        print(i)


async def generator_async_fun():
    yield 1

if __name__ == '__main__':
    async_fun().send(None)

  異步生成器的調用比較特殊,它須要依賴別的異步函數進行調用。dom

三,await的使用

  await語法能夠掛起自聲協程,等待另外一個協程完成直到返回結果。可是有一些地方須要注意:ssh

  3.1 await注意事項

  await語法只能出如今經過async修飾的函數中,不然會報SyntaxError錯誤。異步

  並且await後面的對象須要是一個Awaitable,或者實現了相關的協議。async

  3.2 關於await的源代碼解析

  查看Awaitable抽象類的代碼,代表了只要一個類實現了__await__方法,那麼經過它構造出來的實例就是一個Awaitable:函數

class Awaitable(metaclass=ABCMeta):
    __slots__ = ()

    @abstractmethod
    def __await__(self):
        yield

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Awaitable:
            return _check_methods(C, "__await__")
        return NotImplemented

  並且能夠看到,Coroutine類也繼承了Awaitable,並且實現了send,throw和close方法。因此await一個調用異步函數返回的協程對象是合法的。

class Coroutine(Awaitable):
    __slots__ = ()

    @abstractmethod
    def send(self, value):
        ...

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        ...

    def close(self):
        ...
        
    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented

  3.3 關於異步生成器的實例

  案例:假如我要到一家超市去購買土豆,而超市貨架上的土豆數量是有限的:

class Potato:
    @classmethod
    def make(cls, num, *args, **kws):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls, *args, **kws))
        return potatos

all_potatos = Potato.make(5)

  如今我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:

def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            sleep(.1)
        else:
            potato = all_potatos.pop()
            yield potato
            count += 1
            if count == num:
                break

def buy_potatos():
    bucket = []
    for p in take_potatos(50):
        bucket.append(p)

  對應到代碼中,就是迭代一個生成器的模型,顯然,當貨架上的土豆不夠的時候,這時只可以死等,並且在上面例子中等多長時間都不會有結果(由於一切都是同步的),也許能夠用多進程和多線程解決,而在現實生活中,更應該像是這樣的:

async def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potato()
        potato = all_potatos.pop()
        yield potato
        count += 1
        if count == num:
            break

  當貨架上的土豆沒有了以後,我能夠詢問超市請求須要更多的土豆,這時候須要等待一段時間直到生產者完成生產的過程:

async def ask_for_potato():
    await asyncio.sleep(random.random())
    all_potatos.extend(Potato.make(random.randint(1, 10)))

  當生產者完成和返回以後,這是便能從await掛起的地方繼續往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程:

async def buy_potatos():
    bucket = []
    async for p in take_potatos(50):
        bucket.append(p)
        print(f'Got potato {id(p)}...')

  async for語法表示咱們要後面迭代的是一個異步生成器。

def main():
    import asyncio
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(buy_potatos())
    loop.close()

  用asyncio運行這段代碼,結果是這樣的:

Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...

  既然是異步的,在請求以後不必定要死等,而是能夠作其餘事情。好比除了土豆,我還想買番茄,這時只須要在事件循環中再添加一個過程:

def main():
    import asyncio
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
    loop.close()

  再來運行這段代碼:

Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...

  3.4 AsyncGenerator的定義

  它須要實現__aiter__和__anext__兩個核心方法,以及asend,athrow,aclose方法。

class AsyncGenerator(AsyncIterator):
    __slots__ = ()

    async def __anext__(self):
        ...

    @abstractmethod
    async def asend(self, value):
        ...

    @abstractmethod
    async def athrow(self, typ, val=None, tb=None):
        ...

    async def aclose(self):
        ...

    @classmethod
    def __subclasshook__(cls, C):
        if cls is AsyncGenerator:
            return _check_methods(C, '__aiter__', '__anext__',
                                  'asend', 'athrow', 'aclose')
        return NotImplemented

  異步生成器是在3.6以後纔有的特性,一樣的還有異步推導表達式,所以在上面的例子中,也能夠寫成這樣:

bucket = [p async for p in take_potatos(50)]

  相似的,還有await表達式:

result = [await fun() for fun in funcs if await condition()]

四,async的其餘用法

  4.1 async修飾類普通方法

  除了函數以外,類實例的普通方法也能用async語法修飾:

class ThreeTwoOne:
    async def begin(self):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    t = ThreeTwoOne()
    await t.begin()
    print('start')

  實例方法的調用一樣是返回一個coroutine:

function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())

  4.2 async 修飾類方法

class ThreeTwoOne:
    @classmethod
    async def begin(cls):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    await ThreeTwoOne.begin()
    print('start')

  4.3 async的上下文管理器應用

  根據PEP 492中,async也能夠應用到上下文管理器中,__aenter__和__aexit__須要返回一個Awaitable:

class GameContext:
    async def __aenter__(self):
        print('game loading...')
        await asyncio.sleep(1)

    async def __aexit__(self, exc_type, exc, tb):
        print('game exit...')
        await asyncio.sleep(1)

async def game():
    async with GameContext():
        print('game start...')
        await asyncio.sleep(2)

  在3.7版本,contextlib中會新增一個asynccontextmanager裝飾器來包裝一個實現異步協議的上下文管理器:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield
    finally:
        await release_db_connection(conn)

五,await和yield from

  Python3.3的yield from語法能夠把生成器的操做委託給另外一個生成器,生成器的調用方能夠直接與子生成器進行通訊:

def sub_gen():
    yield 1
    yield 2
    yield 3

def gen():
    return (yield from sub_gen())

def main():
    for val in gen():
        print(val)
# 1
# 2
# 3

  利用這一特性,使用yield from可以編寫出相似協程效果的函數調用,在3.5以前,asyncio正是使用@asyncio.coroutine和yield from語法來建立協程:

# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  然而,用yield from容易在表示協程和生成器中混淆,沒有良好的語義性,因此在Python 3.5推出了更新的async/await表達式來做爲協程的語法。所以相似如下的調用是等價的:

async with lock:
    ...
    
with (yield from lock):
    ...
######################
def main():
    return (yield from coro())

def main():
    return (await coro())

  那麼,怎麼把生成器包裝爲一個協程對象呢?這時候能夠用到types包中的coroutine裝飾器(若是使用asyncio作驅動的話,那麼也可使用asyncio的coroutine裝飾器),@types.coroutine裝飾器會將一個生成器函數包裝爲協程對象:

import asyncio
import types

@types.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  儘管兩個函數分別使用了新舊語法,但他們都是協程對象,也分別稱做native coroutine以及generator-based coroutine,所以不用擔憂語法問題。下面觀察一個asyncio中Future的例子:

import asyncio

future = asyncio.Future()

async def coro1():
    await asyncio.sleep(1)
    future.set_result('data')

async def coro2():
    print(await future)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    coro1(), 
    coro2()
]))
loop.close()

  兩個協程在在事件循環中,協程coro1在執行第一句後掛起自身切到asyncio.sleep,而協程coro2一直等待future的結果,讓出事件循環,計時器結束後coro1執行了第二句設置了future的值,被掛起的coro2恢復執行,打印出future的結果'data'。

  future能夠被await證實了future對象是一個Awaitable,進入Future類的源碼能夠看到有一段代碼顯示了future實現了__await__協議:

class Future:
    ...
    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

  當執行await future這行代碼時,future中的這段代碼就會被執行,首先future檢查它自身是否已經完成,若是沒有完成,掛起自身,告知當前的Task(任務)等待future完成。

  當future執行set_result方法時,會觸發如下的代碼,設置結果,標記future已經完成:

def set_result(self, result):
    ...
    if self._state != _PENDING:
        raise InvalidStateError('{}: {!r}'.format(self._state, self))
    self._result = result
    self._state = _FINISHED
    self._schedule_callbacks()

  最後future會調度自身的回調函數,觸發Task._step()告知Task驅動future從以前掛起的點恢復執行,不難看出,future會執行下面的代碼:

class Future:
    ...
    def __iter__(self):
        ...
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

  最終返回結果給調用方。

相關文章
相關標籤/搜索