asyncio的核心概念與基本架構python
本文針對的是python3.4之後的版本的,由於從3.4開始才引入asyncio,後面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。好比在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,可是在3.5之後的版本中使用async、await兩個關鍵字代替,雖然語法上稍微有所差別,可是原理是同樣的。express
(1)result = yield from future,返回future的結果。編程
(2)result = yield from coroutine,等候另外一個協程函數返回結果或者是觸發異常 安全
(3)result= yield from task,返回一個task的結果架構
(4)return expression,做爲一個函數拋出返回值併發
(5)raise exception異步
如何理解事件循環:async
線程一直在各個協程方法之間永不停歇的遊走,遇到一個yield from 或者await就懸掛起來,而後又走到另一個方法,依次進行下去,知道事件循環全部的方法執行完畢。實際上loop是BaseEventLoop的一個實例,咱們能夠查看定義,它到底有哪些方法可調用異步編程
協程函數,不是像普通函數那樣直接調用運行的,必須添加到事件循環中,而後由事件循環去運行,單獨運行協程函數是不會有結果的。函數
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"開始時間爲: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"結束時間爲: {time.time()}") ''' 直接運行 ''' # >>> main() # <coroutine object main at 0x1053bb7c8> ''' 須要經過事件循環來調用''' loop=asyncio.get_event_loop() #建立事件循環對象 #loop=asyncio.new_event_loop() #與上面等價,建立新的事件循環 loop.run_until_complete(main()) #經過事件循環對象運行協程函數 loop.close()
(1)獲取事件循環對象的幾種方式:
loop=asyncio.get_running_loop()
,返回(獲取)在當前線程中正在運行的事件循環,若是沒有正在運行的事件循環,則會顯示錯誤
loop=asyncio.get_event_loop()
,得到一個事件循環,若是當前線程尚未事件循環,則建立一個新的事件循環loop
loop=asyncio.set_event_loop(loop)
, 設置一個事件循環爲當前線程的事件循環;
loop=asyncio.new_event_loop()
,建立一個新的事件循環
(2)經過事件循環運行協程函數的兩種方式:
建立事件循環對象loop,即 asyncio.get_event_loop()
,經過事件循環運行協程函數
直接經過 asyncio.run(function_name)
運行協程函數。
可是須要注意的是,首先run函數是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數老是會建立一個新的事件循環並在run結束以後關閉事件循環,因此,若是在同一個線程中已經有了一個事件循環,則不能再使用這個函數了,由於同一個線程不能有兩個事件循環,並且這個run函數不能同時運行兩次,由於他已經建立一個了。即同一個線程中是不容許有多個事件循環loop的。 asyncio.run()是python3.7 新添加的內容,也是後面推薦的運行任務的方式,由於它是高層API,後面會講到它與asyncio.run_until_complete()的差別性,run_until_complete()是相對較低層的API。
有三類對象是可等待的,即 coroutines
, Tasks
, and Futures
.
coroutine
:本質上就是一個函數,一前面的生成器yield和yield from爲基礎,再也不贅述;
Tasks
: 任務,顧名思義,就是要完成某件事情,其實就是對協程函數進一步的封裝;
Future
:它是一個「更底層」的概念,他表明一個異步操做的最終結果,由於異步操做通常用於耗時操做,結果不會當即獲得,會在「未來」獲得異步運行的結果,故而命名爲 Future。
三者的關係,coroutine
能夠自動封裝成 task
,而Task是 Future
的子類。
Task用來 併發調度的協程, 單純的協程函數僅僅是一個函數而已,將其包裝成任務,任務是能夠包含各類狀態的,異步編程最重要的就是對異步操做狀態的把控了。
(1)建立任務(兩種方法):
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
,也可使用loop.create_future()
,loop.create_task(coro)
也是能夠的。
(2)獲取某一個任務的方法:
方法一:task=asyncio.current_task(loop=None);返回在某一個指定的loop中,當前正在運行的任務,若是沒有任務正在運行,則返回None;若是loop爲None,則默認爲在當前的事件循環中獲取,
方法二:asyncio.all_tasks(loop=None);返回某一個loop中尚未結束的任務;
Future是一個較低層的可等待(awaitable)對象,他表示的是異步操做的最終結果,當一個Future對象被等待的時候,協程會一直等待,直到Future已經運算完畢。 Future是Task的父類,通常狀況下,已不用去管它們二者的詳細區別,也沒有必要去用Future,用Task就能夠了,返回 future 對象的低級函數的一個很好的例子是 loop.run_in_executor().
asyncio分爲高層API和低層API。咱們前面所講的Coroutine和Tasks屬於高層API,而Event Loop 和Future屬於低層API。所謂的高層API主要是指那些asyncio.xxx()的方法。
High-level APIs
●Coroutines and Tasks(本文要寫的) ●Streams ●Synchronization Primitives ●Subprocesses ●Queues ●Exceptions
Low-level APIs
●Event Loop(下一篇要寫的) ●Futures ●Transports and Protocols ●Policies ●Platform Support
1)運行異步協程 asyncio.run(coro, *, debug=False) #運行一個一步程序,參見上面 2)建立任務 task=asyncio.create_task(coro) #python3.7 ,參見上面 task = asyncio.ensure_future(coro()) 3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 這個函數表示的是:當前的那個任務(協程函數)睡眠多長時間,而容許其餘任務執行。這是它與time.sleep()的區別,time.sleep()是當前線程休息 4)併發運行多個任務 await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它自己也是awaitable的。當全部的任務都完成以後,返回的結果是一個列表的形式、 5)防止任務取消 await asyncio.shield(*arg, *, loop=None) 6)設置timeout await asyncio.wait_for(aw, timeout, *, loop=None) 當異步操做須要執行的時間超過waitfor設置的timeout,就會觸發異常,因此在編寫程序的時候,若是要給異步操做設置timeout,必定要選擇合適,
若是異步操做自己的耗時較長,而你設置的timeout過短,會涉及到她還沒作完,就拋出異常了。 7)多個協程函數時候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 與上面的區別是,第一個參數aws是一個集合,要寫成集合set的形式,好比: {func(),func(),func3()} 表示的是一系列的協程函數或者是任務,其中協程會自動包裝成任務。事實上,寫成列表的形式也是能夠的。 該函數的返回值是兩個Tasks/Futures的集合: (done, pending) 其中done是一個集合,表示已經完成的任務tasks;pending也是一個集合,表示尚未完成的任務。 常見的使用方法爲:done, pending = await asyncio.wait(aws)
(1)他是做爲一個python協程對象,和Future對象很像的這麼一個對象,但不是線程安全的;他繼承了Future全部的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高層API asyncio.create_task()建立任務,或者是使用低層API loop.create_task()或者是loop.ensure_future()建立任務對象;
(3)相比於協程函數,任務時有狀態的,可使用Task.cancel()進行取消,這會觸發CancelledError異常,使用cancelled()檢查是否取消。
cancel函數:
import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模擬一個耗時任務 except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #經過協程建立一個任務,須要注意的是,在建立任務的時候,就會跳入到異步開始執行 #由於是3.7版本,建立一個任務就至關因而運行了異步函數cancel_me task = asyncio.create_task(cancel_me()) #等待一秒鐘 await asyncio.sleep(1) print('main函數休息完了') #發出取消任務的請求 task.cancel() try: await task #由於任務被取消,觸發了異常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) '''運行結果爲: cancel_me(): before sleep main函數休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
兩種方法:第一種是直接經過Task.result()來獲取;第二種是綁定一個回調函數來獲取,即函數執行完畢後調用一個函數來獲取異步函數的返回值。
1,經過result函數
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:建立事件循環 task=asyncio.ensure_future(coroutine) #第二步:將多個協程函數包裝成任務列表 loop.run_until_complete(task) #第三步:經過事件循環運行 print('-------------------------------------') print(task.result()) loop.close() '''運行結果爲 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
2, 經過定義回調函數
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b def callback(future): #定義的回調函數 print(future.result()) loop = asyncio.get_event_loop() #第一步:建立事件循環 task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個協程函數包裝成任務 task.add_done_callback(callback) #並被任務綁定一個回調函數 loop.run_until_complete(task) #第三步:經過事件循環運行 loop.close() #第四步:關閉事件循環 '''運行結果爲: Hello world 01 begin Hello again 01 end 15 '''
所謂的回調函數,就是指協程函數coroutine執行結束時候會調用回調函數。並經過參數future獲取協程執行的結果。咱們建立的task和回調裏的future對象,其實是同一個對象,由於task是future的子類。
import asyncio import time from functools import partial async def get_url(): print('start get url') await asyncio.sleep(2) # await 後面跟的必須是一個 await 對象 print('end get url') return 'stack' def test(url,future): print(url,'hello, stack') if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # loop.run_until_complete(get_url()) # 只是提交了一個請求,時間2s tasks = [get_url() for i in range(10)] # get_future = asyncio.ensure_future(get_url()) # 得到返回值用法1,源碼上依然是先判斷loop,而後調用create_task # get_future = loop.create_task(get_url()) # 方法2,還能夠繼續添加函數,執行邏輯 # get_future.add_done_callback(partial(test, 'Stack')) # 函數自己在得到調用時須要一個任意形數,參數便是 get_future 自己,不然報錯 # 若是函數須要傳遞參數,須要經過 偏函數 partial 模塊來解決,以及函數的形參須要放在前面 loop.run_until_complete(asyncio.wait(tasks)) # 提交了10次,時間也是2s # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上 # gather 和 wait 的區別 # gather是更高一級的抽象,且使用更加靈活,可使用分組,以及取消任務 print(time.time() - start) # print(get_future.result()) # 接收返回值
針對3.7的版本
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模擬耗時任務3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模擬耗時任務2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模擬耗時任務4秒 print("Hello again 03 end") return a*b async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) '''運行結果爲: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
總結:
第一步:構建一個入口函數main 它也是一個異步協程函數,即經過async定義,而且要在main函數裏面await一個或者是多個協程,同前面同樣,我能夠經過gather或者是wait進行組合,對於有返回值的協程函數,通常就在main裏面進行結果的獲取。
第二步:啓動主函數main 這是python3.7新添加的函數,就一句話,即 asyncio.run(main())