asyncio之異步上下文管理器

異步上下文管理器

前面文章咱們提到了上下文管理器,可是這個上下文管理器只適用於同步代碼,不能用於異步代碼(async def形式),不過不用擔憂今天咱們就來討論在異步中如何使用上下文管理器。
特別提醒本教程所使用的Python版本爲Python3.7。html

async with

異步上下文管理器。相似於同步上下文管理器,咱們知道使用with能夠實現一個上下文管理的器,而對於異步上下文管理器其根本表現形式爲async with,下面的一段代碼告訴你async with是如何運做的。併發

import asyncio
class AContext:
    def __init__(self):
        print("in init")

    async def __aenter__(self):
        print("in aenter")
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("in aexit")

async def main():
    async with AContext() as ac:
        print("in with", ac)

if __name__ == '__main__':
    print("start")
    asyncio.run(main())

輸出內容app

start
in init
in aenter
in with None
in aexit

下面說下async with和with的不一樣地方
語法上,with實現了enter和exit兩個方法,async with實現了相似方法
aenter和aexit在同步的基礎上加個a,實際上就是表明asynchronous。
實現上,使用普通的函數就能夠實現with,可是async with須要經過異步函數的形式去實現,就像上面的例子同樣。異步

asynccontextmanager

從Python 3.7開始,有兩種方法能夠編寫異步上下文管理器。一種就是前面提到的魔法函數的實現,另一種就是contextlib的另一個模塊asynccontextmanager。經過裝飾器的方式實現一個異步上下文管理器async

import asyncio
from contextlib import asynccontextmanager
from concurrent.futures.thread import ThreadPoolExecutor


class AsyncFile(object):
    def __init__(self, file, loop=None, executor=None):
        if not loop:
            loop = asyncio.get_running_loop()  # 獲取當前運行事件循環
        if not executor:
            executor = ThreadPoolExecutor(10)  # 線程池數量10
        self.file = file
        self.loop = loop
        self.executor = executor
        self.pending = []
        self.result = []

    def write(self, string):
        """
        實現異步寫操做
        :param string: 要寫的內容
        :return:
        """
        self.pending.append(
            self.loop.run_in_executor(
                self.executor, self.file.write, string,
            )
        )

    def read(self, i):
        """
        實現異步讀操做
        :param i:
        :return:
        """
        self.pending.append(
            self.loop.run_in_executor(self.executor, self.file.read, i,)
        )

    def readlines(self):
        self.pending.append(
            self.loop.run_in_executor(self.executor, self.file.readlines, )
        )

@asynccontextmanager
async def async_open(path, mode="w"):
    with open(path, mode=mode) as f:
        loop = asyncio.get_running_loop()
        file = AsyncFile(f, loop=loop)
        try:
            yield file
        finally:
            file.result = await asyncio.gather(*file.pending, loop=loop)

上面的代碼經過使用asyncio中run_in_executor運行一個線程,來使得阻塞操做變爲非阻塞操做,達到異步非阻塞的目的。
AsyncFile類提供了一些方法,這些方法將用於將write、read和readlines的調用添加到pending列表中。這些任務經過finally塊中的事件循環在ThreadPoolExecutor進行調度。
yield 前半段用來表示_aenter_()
yield 後半段用來表示_aexit_()
使用finally之後能夠保證連接資源等使用完以後可以關閉。函數

運行異步上下文管理器

若是調用前面示例中的異步上下文管理器,則須要使用關鍵字async with來進行調用。另外帶有async with的語句只能在異步函數中使用。oop

from asynciodemo.asyncwith import async_open
import asyncio
import tempfile
import os


async def main():
    tempdir = tempfile.gettempdir()
    path = os.path.join(tempdir, "run.txt")
    print(f"臨時文件目錄:{path}")

    async with async_open(path, mode='w') as f:
        f.write("公衆號: ")
        f.write("Python")
        f.write("學習")
        f.write("開發")


if __name__ == '__main__':
    asyncio.run(main())

使用方法和with相似能夠經過使用as,而後使用其句柄,惟一須要注意的就是要在異步函數中使用。學習

同步任務

在以前的一些異步教程裏和你們說了關於協程中的幾個同步方法,asyncio.wait和asyncio.gather,這裏咱們能夠配合這些方法經過異步上下文管理器來實現同步任務,請看以下代碼線程

import asyncio


# 同步掛起協程

class Sync():
    def __init__(self):
        self.pending = []
        self.finished = None

    def schedule_coro(self, coro, shield=True):
       #若是去掉asyncio.shield,在取消fut函數的時候,就會致使coro協程也出錯。
        fut = asyncio.shield(coro) if shield else asyncio.ensure_future(coro)
        self.pending.append(fut)
        return fut

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 退出async with的時候,任務列表中的任務進行併發操做。
        self.finished = await asyncio.gather(*self.pending, return_exceptions=True)


async def workload1():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 41")
    return 41


async def workload2():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 42")
    return 42


async def workload3():
    await asyncio.sleep(2)
    print("These coroutines will be executed return 43")
    return 43


async def main():
    async with Sync() as sync:
        # 使用異步上下文能夠建立同步協程程序
        sync.schedule_coro(workload1())
        sync.schedule_coro(workload2())
        sync.schedule_coro(workload3())
    print("All scheduled corotines have retuned or throw:", sync.finished)


if __name__ == '__main__':
    asyncio.run(main())

輸出code

These coroutines will be executed return 41
These coroutines will be executed return 42
These coroutines will be executed return 43
All scheduled corotines have retuned or throw: [41, 42, 43]

1.程序是同步的形式併發輸出的。 2. schedule_coro的做用是將協程workload1,workload2,workload3添加到任務列表pending,退出async with的時候,任務列表中的任務進行併發操做。

相關文章
相關標籤/搜索