前面文章咱們提到了上下文管理器,可是這個上下文管理器只適用於同步代碼,不能用於異步代碼(async def形式),不過不用擔憂今天咱們就來討論在異步中如何使用上下文管理器。
特別提醒本教程所使用的Python版本爲Python3.7。html
異步上下文管理器。相似於同步上下文管理器,咱們知道使用with能夠實現一個上下文管理的器,而對於異步上下文管理器其根本表現形式爲async with,下面的一段代碼告訴你async with是如何運做的。併發
import asyncio class AContext: def __init__(self): print("in init") async def __aenter__(self): print("in aenter") async def __aexit__(self, exc_type, exc_val, exc_tb): print("in aexit") async def main(): async with AContext() as ac: print("in with", ac) if __name__ == '__main__': print("start") asyncio.run(main())
輸出內容app
start in init in aenter in with None in aexit
下面說下async with和with的不一樣地方
語法上,with實現了enter和exit兩個方法,async with實現了相似方法
aenter和aexit在同步的基礎上加個a,實際上就是表明asynchronous。
實現上,使用普通的函數就能夠實現with,可是async with須要經過異步函數的形式去實現,就像上面的例子同樣。異步
從Python 3.7開始,有兩種方法能夠編寫異步上下文管理器。一種就是前面提到的魔法函數的實現,另一種就是contextlib的另一個模塊asynccontextmanager。經過裝飾器的方式實現一個異步上下文管理器async
import asyncio from contextlib import asynccontextmanager from concurrent.futures.thread import ThreadPoolExecutor class AsyncFile(object): def __init__(self, file, loop=None, executor=None): if not loop: loop = asyncio.get_running_loop() # 獲取當前運行事件循環 if not executor: executor = ThreadPoolExecutor(10) # 線程池數量10 self.file = file self.loop = loop self.executor = executor self.pending = [] self.result = [] def write(self, string): """ 實現異步寫操做 :param string: 要寫的內容 :return: """ self.pending.append( self.loop.run_in_executor( self.executor, self.file.write, string, ) ) def read(self, i): """ 實現異步讀操做 :param i: :return: """ self.pending.append( self.loop.run_in_executor(self.executor, self.file.read, i,) ) def readlines(self): self.pending.append( self.loop.run_in_executor(self.executor, self.file.readlines, ) ) @asynccontextmanager async def async_open(path, mode="w"): with open(path, mode=mode) as f: loop = asyncio.get_running_loop() file = AsyncFile(f, loop=loop) try: yield file finally: file.result = await asyncio.gather(*file.pending, loop=loop)
上面的代碼經過使用asyncio中run_in_executor運行一個線程,來使得阻塞操做變爲非阻塞操做,達到異步非阻塞的目的。
AsyncFile類提供了一些方法,這些方法將用於將write、read和readlines的調用添加到pending列表中。這些任務經過finally塊中的事件循環在ThreadPoolExecutor進行調度。
yield 前半段用來表示_aenter_()
yield 後半段用來表示_aexit_()
使用finally之後能夠保證連接資源等使用完以後可以關閉。函數
若是調用前面示例中的異步上下文管理器,則須要使用關鍵字async with來進行調用。另外帶有async with的語句只能在異步函數中使用。oop
from asynciodemo.asyncwith import async_open import asyncio import tempfile import os async def main(): tempdir = tempfile.gettempdir() path = os.path.join(tempdir, "run.txt") print(f"臨時文件目錄:{path}") async with async_open(path, mode='w') as f: f.write("公衆號: ") f.write("Python") f.write("學習") f.write("開發") if __name__ == '__main__': asyncio.run(main())
使用方法和with相似能夠經過使用as,而後使用其句柄,惟一須要注意的就是要在異步函數中使用。學習
在以前的一些異步教程裏和你們說了關於協程中的幾個同步方法,asyncio.wait和asyncio.gather,這裏咱們能夠配合這些方法經過異步上下文管理器來實現同步任務,請看以下代碼線程
import asyncio # 同步掛起協程 class Sync(): def __init__(self): self.pending = [] self.finished = None def schedule_coro(self, coro, shield=True): #若是去掉asyncio.shield,在取消fut函數的時候,就會致使coro協程也出錯。 fut = asyncio.shield(coro) if shield else asyncio.ensure_future(coro) self.pending.append(fut) return fut async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): # 退出async with的時候,任務列表中的任務進行併發操做。 self.finished = await asyncio.gather(*self.pending, return_exceptions=True) async def workload1(): await asyncio.sleep(2) print("These coroutines will be executed return 41") return 41 async def workload2(): await asyncio.sleep(2) print("These coroutines will be executed return 42") return 42 async def workload3(): await asyncio.sleep(2) print("These coroutines will be executed return 43") return 43 async def main(): async with Sync() as sync: # 使用異步上下文能夠建立同步協程程序 sync.schedule_coro(workload1()) sync.schedule_coro(workload2()) sync.schedule_coro(workload3()) print("All scheduled corotines have retuned or throw:", sync.finished) if __name__ == '__main__': asyncio.run(main())
輸出code
These coroutines will be executed return 41 These coroutines will be executed return 42 These coroutines will be executed return 43 All scheduled corotines have retuned or throw: [41, 42, 43]
1.程序是同步的形式併發輸出的。 2. schedule_coro的做用是將協程workload1,workload2,workload3添加到任務列表pending,退出async with的時候,任務列表中的任務進行併發操做。