python協程系列

 

聲明:本文針對的是python3.4之後的版本的,由於從3.4開始才引入asyncio,後面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。好比在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,可是在3.5之後的版本中使用async、await兩個關鍵字代替,雖然語法上稍微有所差別,可是原理是同樣的。本文用最通俗的語言解釋了pythonasyncio背後的一些核心概念,簡要解析了asyncio的設計架構,並給出了使用python進行asyncio異步編程的通常模板。python

1、一些最重要的概念
一、協程(coroutine)——本質就是一個函數express

所謂的「協程」就是一個函數,這個函數須要有兩個基本的組成要素,第一,須要使用@asyncio.coroutine進行裝飾;第二,函數體內必定要有yield from 返回的的generator,或者是說使用yield from 返回另外一個協程對象。編程

固然,這兩個條件並非硬性規定的,若是沒有這兩個條件,依然是函數,只不過是普通函數而已。安全

怎麼判斷一個函數是否是協程?經過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判斷,返回true,則是。網絡

那麼協程函數有什麼做用呢?多線程

(1)result = yield from future架構

做用一:返回future的結果。什麼是future?後面會講到。當協程函數執行到這一句,協程會被懸掛起來,知道future的結果被返回。若是是future被中途取消,則會觸發CancelledError異常。因爲task是future的子類,後面也會介紹,關於future的全部應用,都一樣適用於task併發

(2)result = yield from coroutineapp

等候另外一個協程函數返回結果或者是觸發異常 異步

(3)result= yield from task

返回一個task的結果

(4)return expression

做爲一個函數,他自己也是能夠返回某一個結果的

(5)raise exception 

二、事件循環——event_loop

協程函數,不是像普通函數那樣直接調用運行的,必須添加到事件循環中,而後由事件循環去運行,單獨運行協程函數是不會有結果的,看一個簡單的例子:

import time
import asyncio
async def say_after_time(delay,what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"開始時間爲: {time.time()}")
await say_after_time(1,"hello")
await say_after_time(2,"world")
print(f"結束時間爲: {time.time()}")

loop=asyncio.get_event_loop() #建立事件循環對象
#loop=asyncio.new_event_loop() #與上面等價,建立新的事件循環
loop.run_until_complete(main()) #經過事件循環對象運行協程函數
loop.close()
在python3.6版本中,若是咱們單獨像執行普通函數那樣執行一個協程函數,只會返回一個coroutine對象(python3.7)以下所示:

>>> main()
<coroutine object main at 0x1053bb7c8>
(1)獲取事件循環對象的幾種方式:

下面幾種方式能夠用來獲取、設置、建立事件循環對象loop

loop=asyncio.get_running_loop() 返回(獲取)在當前線程中正在運行的事件循環,若是沒有正在運行的事件循環,則會顯示錯誤;它是python3.7中新添加的

loop=asyncio.get_event_loop() 得到一個事件循環,若是當前線程尚未事件循環,則建立一個新的事件循環loop;

loop=asyncio.set_event_loop(loop) 設置一個事件循環爲當前線程的事件循環;

loop=asyncio.new_event_loop() 建立一個新的事件循環

(2)經過事件循環運行協程函數的兩種方式:

(1)方式一:建立事件循環對象loop,即asyncio.get_event_loop(),經過事件循環運行協程函數

(2)方式二:直接經過asyncio.run(function_name)運行協程函數。可是須要注意的是,首先run函數是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數老是會建立一個新的事件循環並在run結束以後關閉事件循環,因此,若是在同一個線程中已經有了一個事件循環,則不能再使用這個函數了,由於同一個線程不能有兩個事件循環,並且這個run函數不能同時運行兩次,由於他已經建立一個了。即同一個線程中是不容許有多個事件循環loop的。

asyncio.run()是python3.7 新添加的內容,也是後面推薦的運行任務的方式,由於它是高層API,後面會講到它與asyncio.run_until_complete()的差別性,run_until_complete()是相對較低層的API。

注意:到底什麼是事件循環?如何理解?

能夠這樣理解:線程一直在各個協程方法之間永不停歇的遊走,遇到一個yield from 或者await就懸掛起來,而後又走到另一個方法,依次進行下去,知道事件循環全部的方法執行完畢。實際上loop是BaseEventLoop的一個實例,咱們能夠查看定義,它到底有哪些方法可調用。

三、什麼是awaitable對象——便可暫停等待的對象

有三類對象是可等待的,即 coroutines, Tasks, and Futures.

coroutine:本質上就是一個函數,一前面的生成器yield和yield from爲基礎,再也不贅述;

Tasks: 任務,顧名思義,就是要完成某件事情,其實就是對協程函數進一步的封裝;

Future:它是一個「更底層」的概念,他表明一個一步操做的最終結果,由於一步操做通常用於耗時操做,結果不會當即獲得,會在「未來」獲得異步運行的結果,故而命名爲Future。

三者的關係,coroutine能夠自動封裝成task,而Task是Future的子類。

四、什麼是task任務

如前所述,Task用來 併發調度的協程,即對協程函數的進一步包裝?那爲何還須要包裝呢?由於單純的協程函數僅僅是一個函數而已,將其包裝成任務,任務是能夠包含各類狀態的,異步編程最重要的就是對異步操做狀態的把控了。

(1)建立任務(兩種方法):

方法一:task = asyncio.create_task(coro())   # 這是3.7版本新添加的

方法二:task = asyncio.ensure_future(coro())

也可使用

loop.create_future()

loop.create_task(coro)

也是能夠的。

備註:關於任務的詳解,會在後面的系列文章繼續講解,本文只是歸納性的說明。

(2)獲取某一個任務的方法:

方法一:task=asyncio.current_task(loop=None)

返回在某一個指定的loop中,當前正在運行的任務,若是沒有任務正在運行,則返回None;

若是loop爲None,則默認爲在當前的事件循環中獲取,

方法二:asyncio.all_tasks(loop=None)

返回某一個loop中尚未結束的任務

五、什麼是future?

Future是一個較低層的可等待(awaitable)對象,他表示的是異步操做的最終結果,當一個Future對象被等待的時候,協程會一直等待,直到Future已經運算完畢。

Future是Task的父類,通常狀況下,已不用去管它們二者的詳細區別,也沒有必要去用Future,用Task就能夠了,

返回 future 對象的低級函數的一個很好的例子是 loop.run_in_executor().

2、asyncio的基本架構
前面介紹了asyncio裏面最爲核心的幾個概念,若是可以很好地理解這些概念,對於學習協程是很是有幫助的,可是按照我我的的風格,我會先說asyncio的架構,理解asyncio的設計架構有助於更好地應用和理解。

asyncio分爲高層API和低層API,咱們均可以使用,就像我前面在講matplotlib的架構的時候所講的同樣,咱們前面所講的Coroutine和Tasks屬於高層API,而Event Loop 和Future屬於低層API。固然asyncio所涉及到的功能遠不止於此,咱們只看這麼多。下面是是高層API和低層API的概覽:

High-level APIs

Coroutines and Tasks(本文要寫的)
Streams
Synchronization Primitives
Subprocesses
Queues
Exceptions
Low-level APIs

Event Loop(下一篇要寫的)
Futures
Transports and Protocols
Policies
Platform Support
所謂的高層API主要是指那些asyncio.xxx()的方法,

一、常見的一些高層API方法

(1)運行異步協程

asyncio.run(coro, *, debug=False)  #運行一個一步程序,參見上面

(2)建立任務

task=asyncio.create_task(coro)  #python3.7  ,參見上面

task = asyncio.ensure_future(coro()) 

(3)睡眠

await asyncio.sleep(delay, result=None, *, loop=None)

這個函數表示的是:當前的那個任務(協程函數)睡眠多長時間,而容許其餘任務執行。這是它與time.sleep()的區別,time.sleep()是當前線程休息,注意他們的區別哦。

另外若是提供了參數result,噹噹前任務(協程)結束的時候,它會返回;

loop參數將會在3.10中移除,這裏就再也不說了。

(4)併發運行多個任務
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

它自己也是awaitable的。

*coros_or_futures是一個序列拆分操做,若是是以個協程函數,則會自動轉換成Task。

當全部的任務都完成以後,返回的結果是一個列表的形式,列表中值的順序和*coros_or_futures完成的順序是同樣的。

return_exceptions:False,這是他的默認值,第一個出發異常的任務會當即返回,而後其餘的任務繼續執行;

                             True,對於已經發生了異常的任務,也會像成功執行了任務那樣,等到全部的任務執行結束一塊兒將錯誤的結果返回到最終的結果列表裏面。

若是gather()自己被取消了,那麼綁定在它裏面的任務也就取消了。

(5)防止任務取消

await asyncio.shield(*arg, *, loop=None)

它自己也是awaitable的。顧名思義,shield爲屏蔽、保護的意思,即保護一個awaitable 對象防止取消,通常狀況下不推薦使用,並且在使用的過程當中,最好使用try語句塊更好。

try:
res = await shield(something())
except CancelledError:
res = None
(6)設置timeout——必定要好好理解

await asyncio.wait_for(aw, timeout, *, loop=None)

若是aw是一個協程函數,會自動包裝成一個任務task。參見下面的例子:

import asyncio

async def eternity():
print('我立刻開始執行')
await asyncio.sleep(3600) #當前任務休眠1小時,即3600秒
print('終於輪到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒鐘哦')
await asyncio.wait_for(eternity(), timeout=3) #休息3秒鐘了執行任務
except asyncio.TimeoutError:
print('超時了!')

asyncio.run(main())

'''運行結果爲:
等你3秒鐘哦
我立刻開始執行
超時了!
'''
爲何?首先調用main()函數,做爲入口函數,當輸出‘等你3秒鐘哦’以後,main掛起,執行eternity,而後打印‘我立刻開始執行’,而後eternity掛起,並且要掛起3600秒,大於3,這時候出發TimeoutError。修改一下:‘’

import asyncio

async def eternity():
print('我立刻開始執行')
await asyncio.sleep(2) #當前任務休眠2秒鐘,2<3
print('終於輪到我了')

async def main():
# Wait for at most 1 second
try:
print('等你3秒鐘哦')
await asyncio.wait_for(eternity(), timeout=3) #給你3秒鐘執行你的任務
except asyncio.TimeoutError:
print('超時了!')

asyncio.run(main())

'''運行結果爲:
等你3秒鐘哦
我立刻開始執行
終於輪到我了
'''
總結:當異步操做須要執行的時間超過waitfor設置的timeout,就會觸發異常,因此在編寫程序的時候,若是要給異步操做設置timeout,必定要選擇合適,若是異步操做自己的耗時較長,而你設置的timeout過短,會涉及到她還沒作完,就拋出異常了。

(7)多個協程函數時候的等候

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

與上面的區別是,第一個參數aws是一個集合,要寫成集合set的形式,好比:

{func(),func(),func3()}

表示的是一系列的協程函數或者是任務,其中協程會自動包裝成任務。事實上,寫成列表的形式也是能夠的。

注意:該函數的返回值是兩個Tasks/Futures的集合:

(done, pending)

其中done是一個集合,表示已經完成的任務tasks;pending也是一個集合,表示尚未完成的任務。

常見的使用方法爲:done, pending = await asyncio.wait(aws)

參數解釋:

timeout (a float or int), 同上面的含義同樣,須要注意的是,這個不會觸發asyncio.TimeoutError異常,若是到了timeout還有任務沒有執行完,那些沒有執行完的tasks和futures會被返回到第二個集合pending裏面。

return_when參數,顧名思義,他表示的是,何時wait函數該返回值。只可以去下面的幾個值。 

Constant Description
FIRST_COMPLETED first_completes.當任何一個task或者是future完成或者是取消,wait函數就返回
FIRST_EXCEPTION 當任何一個task或者是future觸發了某一個異常,就返回,.若是是全部的task和future都沒有觸發異常,則等價與下面的 ALL_COMPLETED.
ALL_COMPLETED 當全部的task或者是future都完成或者是都取消的時候,再返回。
以下面例子所示:

import asyncio
import time

a=time.time()

async def hello1(): #大約2秒
print("Hello world 01 begin")
yield from asyncio.sleep(2)
print("Hello again 01 end")

async def hello2(): #大約3秒
print("Hello world 02 begin")
yield from asyncio.sleep(3)
print("Hello again 02 end")

async def hello3(): #大約4秒
print("Hello world 03 begin")
yield from asyncio.sleep(4)
print("Hello again 03 end")

async def main(): #入口函數
done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
for i in done:
print(i)
for j in pending:
print(j)

asyncio.run(main()) #運行入口函數

b=time.time()
print('---------------------------------------')
print(b-a)

'''運行結果爲:
Hello world 02 begin
Hello world 01 begin
Hello world 03 begin
Hello again 01 end
<Task finished coro=<hello1() done, defined at e:\Python學習\基礎入門\asyncio3.4詳解\test11.py:46> result=None>
<Task pending coro=<hello3() running at e:\Python學習\基礎入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394438>()]>>
<Task pending coro=<hello2() running at e:\Python學習\基礎入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
at 0x000001FA8D394378>()]>>
---------------------------------------
2.033155679702759
'''
從上面能夠看出,hello1()試運行結束了的,hello2()和hello3()還沒結束。
(8)asyncio.as_completed()函數

這個函數我沒有找到合適的中文名稱去描述,因此哪位大神若是知道,望告知,不勝感激!它的函數原型以下:

asyncio.as_completed(aws, *, loop=None, timeout=None)

第一個參數aws:同上面同樣,是一個集合{}集合裏面的元素是coroutine、task或者future

第三個參數timeout:意義和上面講的的同樣

那到底什麼做用呢?其實很簡單,我的感受有點雞肋,從一個例子看起:

import asyncio
import time
import threading

a=time.time()

@asyncio.coroutine
def hello1():
print("Hello world 01 begin")
yield from asyncio.sleep(5) #大約5秒
print("Hello again 01 end")
return '哈哈1'

@asyncio.coroutine
def hello2():
print("Hello world 02 begin")
yield from asyncio.sleep(3) #大約3秒
print("Hello again 02 end")
return '哈哈2'

@asyncio.coroutine
def hello3():
print("Hello world 03 begin")
yield from asyncio.sleep(4) #大約4秒
print("Hello again 03 end")
return '哈哈3'

async def main():
s=asyncio.as_completed({hello1(),hello2(),hello3()})
for f in s:
result=await f
print(result)

asyncio.run(main())

b=time.time()
print('---------------------------------------')
print(b-a)

'''運行結果爲:
Hello world 03 begin
Hello world 01 begin
Hello world 02 begin
Hello again 01 end
哈哈1
Hello again 02 end
哈哈2
Hello again 03 end
哈哈3
---------------------------------------
4.0225794315338135
'''

結論:asyncio.as_completed()函數返回的是一個可迭代(iterator)的對象,對象的每一個元素就是一個future對象,不少小夥伴說,這不是至關於沒變嗎?其實返回的future集合是對參數的future集合從新組合,組合的順序就是,最早執行完的協程函數(coroutine、task、future)最早返回,從上面的代碼可知,參數爲

aws={hello1(),hello2(),hello3()},由於hello1大約花費5秒、hello2大約花費3秒、hello3大約花費4秒。返回的結果爲

s={hello2()、hello3()、hello(1)},由於hello2時間最短,故而放在前面,hello1時間最長,故而放在最後面。而後對返回的集合s開始迭代。

二、Task 類詳解

先來看一下Task類的簡單介紹(英文原文文檔)。

class asyncio.Task(coro, *, loop=None)

A Future-like object that runs a Python coroutine. Not thread-safe.

Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.

To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.

cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.

asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().

Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

上面的文字描述中推出了幾個很是重要的信息,特在此總結以下:

(1)他是做爲一個python協程對象,和Future對象很像的這麼一個對象,但不是線程安全的;他繼承了Future全部的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高層API  asyncio.ccreate_task()建立任務,或者是使用低層API loop.create_task()或者是loop.ensure_future()建立任務對象;

(3)相比於協程函數,任務時有狀態的,可使用Task.cancel()進行取消,這會觸發CancelledError異常,使用cancelled()檢查是否取消。

下面介紹Task類常見的一些使用函數

(1)cancel()

Request the Task to be cancelled.

其實前面已經有所介紹,最好是使用他會出發CancelledError異常,因此須要取消的協程函數裏面的代碼最好在try-except語句塊中進行,這樣方便觸發異常,打印相關信息,可是Task.cancel()沒有辦法保證任務必定會取消,而Future.cancel()是能夠保證任務必定取消的。能夠參見下面的一個例子:

import asyncio

async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600) #模擬一個耗時任務
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')

async def main():
#經過協程建立一個任務,須要注意的是,在建立任務的時候,就會跳入到異步開始執行
#由於是3.7版本,建立一個任務就至關因而運行了異步函數cancel_me
task = asyncio.create_task(cancel_me())
#等待一秒鐘
await asyncio.sleep(1)
print('main函數休息完了')
#發出取消任務的請求
task.cancel()
try:
await task #由於任務被取消,觸發了異常
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''運行結果爲:
cancel_me(): before sleep
main函數休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
運行過程分析:

首先run函數啓動主函數入口main,在main中,由於第一句話就是調用異步函數cancel_me()函數,因此先打印出了第一句話;

而後進入cancel_me中的try語句,遇到await,暫停,這時候返回main中執行,可是有在main中遇到了await,也會暫停,可是因爲main中只須要暫停1秒,而cancel_me中要暫停3600秒,因此等到main的暫停結束後,接着運行main,因此打印出第二句話;

接下來遇到取消任務的請求task.cancel(),而後繼續執行main裏面的try,又遇到了await,接着main進入暫停,接下來進入到cancel_me函數中,可是因爲main中請求了取消任務,因此那個耗時3600秒的任務就再也不執行了,直接觸發了Cancelled_Error異常,打印出第三句話,接下來又raise一個異常信息;

接下來執行cancel_me的finally,打印出第四句話,此時cancel_me執行完畢,因爲他拋出了一個異常,返回到主程序main中,觸發異常,打印出第五句話。

(2)done()

當一個被包裝得協程既沒有觸發異常、也沒有被取消的時候,意味着它是done的,返回true。

(3)result()

返回任務的執行結果,

當任務被正常執行完畢,則返回結果;

當任務被取消了,調用這個方法,會觸發CancelledError異常;

當任務返回的結果是無用的時候,則調用這個方法會觸發InvalidStateError;

當任務出發了一個異常而中斷,調用這個方法還會再次觸發這個使程序中斷的異常。

(4)exception()

返回任務的異常信息,觸發了什麼異常,就返回什麼異常,若是任務是正常執行的無異常,則返回None;

當任務被取消了,調用這個方法會觸發CancelledError異常;

當任務沒有作完,調用這個方法會觸發InvalidStateError異常。

下面還有一些不經常使用的方法,以下:

(5)add_done_callback(callback, *, context=None)

(6)remove_done_callback(callback)

(7)get_stack(*, limit=None)

(8)print_stack(*, limit=None, file=None)

(9)all_tasks(loop=None),這是一個類方法

(10)current_task(loop=None),這是一個類方法

三、異步函數的結果獲取

對於異步編程、異步函數而言,最重要的就是異步函數調用結束以後,獲取異步函數的返回值,咱們能夠有如下幾種方式來獲取函數的返回值,第一是直接經過Task.result()來獲取;第二種是綁定一個回調函數來獲取,即函數執行完畢後調用一個函數來獲取異步函數的返回值。

(1)直接經過result來獲取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")
return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop() #第一步:建立事件循環
task=asyncio.ensure_future(coroutine) #第二步:將多個協程函數包裝成任務列表
loop.run_until_complete(task) #第三步:經過事件循環運行
print('-------------------------------------')
print(task.result())
loop.close()

'''運行結果爲
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''
(2)經過定義回調函數來獲取

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")
return a+b

def callback(future): #定義的回調函數
print(future.result())

loop = asyncio.get_event_loop() #第一步:建立事件循環
task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個協程函數包裝成任務
task.add_done_callback(callback) #並被任務綁定一個回調函數

loop.run_until_complete(task) #第三步:經過事件循環運行
loop.close() #第四步:關閉事件循環


'''運行結果爲:
Hello world 01 begin
Hello again 01 end
15
'''
注意:所謂的回調函數,就是指協程函數coroutine執行結束時候會調用回調函數。並經過參數future獲取協程執行的結果。咱們建立的task和回調裏的future對象,其實是同一個對象,由於task是future的子類。

3、asyncio異步編程的基本模板
事實上,在使用asyncio進行異步編程的時候,語法形式每每是多樣性的,雖然理解異步編程的核心思想很重要,可是實現的時候終究仍是要編寫語句的,本次給出的模板,是兩個不一樣的例子,例子一是三個異步方法,它們都沒有參數,沒有返回值,都模擬一個耗時任務;例子二是三個異步方法,都有參數,都有返回值。

一、python3.7以前的版本

(1)例子一:無參數、無返回值

import asyncio
import time

a=time.time()

async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模擬耗時任務2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模擬耗時任務4秒
print("Hello again 03 end")

loop = asyncio.get_event_loop() #第一步:建立事件循環
tasks = [hello1(), hello2(),hello3()] #第二步:將多個協程函數包裝成任務列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:經過事件循環運行
loop.close() #第四步:取消事件循環

'''運行結果爲:
Hello world 02 begin
Hello world 03 begin
Hello world 01 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
'''
(2)例子二:有參數、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模擬耗時任務2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模擬耗時任務4秒
print("Hello again 03 end")
return a*b

loop = asyncio.get_event_loop() #第一步:建立事件循環
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
tasks = [task1,task2,task3] #第二步:將多個協程函數包裝成任務列表
loop.run_until_complete(asyncio.wait(tasks)) #第三步:經過事件循環運行
print(task1.result()) #而且在全部的任務完成以後,獲取異步函數的返回值
print(task2.result())
print(task3.result())
loop.close() #第四步:關閉事件循環

'''運行結果爲:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)總結:四步走(針對python3.7以前的版本)

第一步·:構造事件循環

loop=asyncio.get_running_loop() #返回(獲取)在當前線程中正在運行的事件循環,若是沒有正在運行的事件循環,則會顯示錯誤;它是python3.7中新添加的

loop=asyncio.get_event_loop() #得到一個事件循環,若是當前線程尚未事件循環,則建立一個新的事件循環loop;

loop=asyncio.set_event_loop(loop) #設置一個事件循環爲當前線程的事件循環;

loop=asyncio.new_event_loop() #建立一個新的事件循環
第二步:將一個或者是多個協程函數包裝成任務Task

#高層API
task = asyncio.create_task(coro(參數列表))   # 這是3.7版本新添加的
task = asyncio.ensure_future(coro(參數列表))

#低層API
loop.create_future(coro)
loop.create_task(coro)

'''須要注意的是,在使用Task.result()獲取協程函數結果的時候,使用asyncio.create_task()卻會顯示錯
可是使用asyncio.ensure_future卻正確,本人暫時不知道緣由,哪位大神知道,望告知,不勝感激!'''
第三步:經過事件循環運行

loop.run_until_complete(asyncio.wait(tasks)) #經過asyncio.wait()整合多個task

loop.run_until_complete(asyncio.gather(tasks)) #經過asyncio.gather()整合多個task

loop.run_until_complete(task_1) #單個任務則不須要整合

loop.run_forever() #可是這個方法在新版本已經取消,再也不推薦使用,由於使用起來不簡潔

'''
使用gather或者wait能夠同時註冊多個任務,實現併發,但他們的設計是徹底不同的,在前面的2.1.(4)中已經討論過了,主要區別以下:
(1)參數形式不同
gather的參數爲 *coroutines_or_futures,即如這種形式
tasks = asyncio.gather(*[task1,task2,task3])或者
tasks = asyncio.gather(task1,task2,task3)
loop.run_until_complete(tasks)
wait的參數爲列表或者集合的形式,以下
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)
(2)返回的值不同
gather的定義以下,gather返回的是每個任務運行的結果,
results = await asyncio.gather(*tasks)
wait的定義以下,返回dones是已經完成的任務,pending是未完成的任務,都是集合類型
done, pending = yield from asyncio.wait(fs)
(3)後面還會講到他們的進一步使用
'''
簡單來講:async.wait會返回兩個值:done和pending,done爲已完成的協程Task,pending爲超時未完成的協程Task,需經過future.result調用Task的result。而async.gather返回的是已完成Task的result。

第四步:關閉事件循環

loop.close()

'''
以上示例都沒有調用 loop.close,好像也沒有什麼問題。因此到底要不要調 loop.close 呢?
簡單來講,loop 只要不關閉,就還能夠再運行:
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
可是若是關閉了,就不能再運行了:
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此處異常
建議調用 loop.close,以完全清理 loop 對象防止誤用
'''
二、python3.7版本

在最新的python3.7版本中,asyncio又引進了一些新的特性和API,

(1)例子一:無參數、無返回值

import asyncio
import time


async def hello1():
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")

async def hello2():
print("Hello world 02 begin")
await asyncio.sleep(2) #模擬耗時任務2秒
print("Hello again 02 end")

async def hello3():
print("Hello world 03 begin")
await asyncio.sleep(4) #模擬耗時任務4秒
print("Hello again 03 end")

async def main():
results=await asyncio.gather(hello1(),hello2(),hello3())
for result in results:
print(result) #由於沒返回值,故而返回None

asyncio.run(main())

'''運行結果爲:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
None
None
None
'''
(2)例子二:有參數、有返回值

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模擬耗時任務2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模擬耗時任務4秒
print("Hello again 03 end")
return a*b

async def main():
results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
for result in results:
print(result)

asyncio.run(main())

'''運行結果爲:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(3)總結:兩步走(針對python3.7)

第一步:構建一個入口函數main

他也是一個異步協程函數,即經過async定義,而且要在main函數裏面await一個或者是多個協程,同前面同樣,我能夠經過gather或者是wait進行組合,對於有返回值的協程函數,通常就在main裏面進行結果的獲取。

第二步:啓動主函數main

這是python3.7新添加的函數,就一句話,即

asyncio.run(main())

注意:

再也不須要顯式的建立事件循環,由於在啓動run函數的時候,就會自動建立一個新的事件循環。並且在main中也不須要經過事件循環去掉用被包裝的協程函數,只須要向普通函數那樣調用便可 ,只不過使用了await關鍵字而已。

4、協程編程的優勢:
一、無cpu分時切換線程保存上下文問題(協程上下文怎麼保存)

二、遇到io阻塞切換(怎麼實現的)

三、無需共享數據的保護鎖(爲何)

四、系列文章下篇預告——介紹低層的API,事件循環究竟是怎麼實現的以及future類的實現。
---------------------

原文:https://blog.csdn.net/qq_27825451/article/details/86218230

 

 

=============================================================================================================================

 

1、事件循環EventLoop
事件循環是asyncio的核心,異步任務的運行、任務完成以後的回調、網絡IO操做、子進程的運行,都是經過事件循環完成的。在前一篇文章中,已經提到過,在python3.7中,咱們甚至徹底不用管事件循環,只須要使用高層API,即asyncio中的方法,咱們不多直接與事件循環打交道,可是爲了更加熟悉asyncio的運行原理,最好仍是瞭解EventLoop的設計原理。

一、事件循環的建立、獲取、設置(上文已經介紹過了)

(1)asyncio.get_running_loop()。python3.7新添加的

(2)asyncio.get_event_loop()

(3)asyncio.set_event_loop(loop)

(4)asyncio.new_event_loop()

二、運行和中止事件循環

(1)loop.run_until_complete(future)。運行事件循環,直到future運行結束

(2)loop.run_forever()。在python3.7中已經取消了,表示事件循環會一直運行,直到遇到stop。

(3)loop.stop()。中止事件循環

(4)loop.is_running()。若是事件循環依然在運行,則返回True

(5)loop.is_closed()。若是事件循環已經close,則返回True

(6)loop.close()。關閉事件循環

三、建立Future和Task

(1)loop.create_future(coroutine) ,返回future對象

(2)loop.create_task(corootine) ,返回task對象

(3)loop.set_task_factory(factory)

(4)loop.get_task_factory()

四、事件循環的時鐘

loop.time()。能夠這麼理解,事件循環內部也維護着一個時鐘,能夠查看事件循環如今運行的時間點是多少,就像普通的time.time()相似,它返回的是一個浮點數值,好比下面的代碼。

import asyncio

async def hello1(a,b):
print('準備作加法運算')
await asyncio.sleep(3)
return a+b

loop=asyncio.get_event_loop()
t1=loop.time() #開始時間
print(t1)
loop.run_until_complete(hello1(3,4))
t2=loop.time() #結束時間
print(t2)
print(t2-t1) #時間間隔
'''運行結果爲:
28525.671
準備作加法運算
28528.703
3.0320000000028813
'''
五、計劃執行回調函數(CallBacks)

(1)loop.call_later(delay, callback, *args, context=None)

首先簡單的說一下它的含義,就是事件循環在delay多長時間以後才執行callback函數,它的返回值是asyncio.TimerHandle類的一個實例對象。

(2)loop.call_at(when, callback, *args, context=None)

即在某一個時刻進行調用計劃的回調函數,第一個參數再也不是delay而是when,表示一個絕對的時間點,結合前面的loop.time使用,它的使用方法和call_later()很相似。它的返回值是asyncio.TimerHandle類的一個實例對象。

(3)loop.call_soon(callback, *args, context=None)

在下一個迭代的時間循環中馬上調用回調函數,用法同上面。它的返回值是asyncio.Handle類的一個實例對象。

(4)loop.call_soon_threadsafe(callback, *args, context=None)

這是call_soon()函數的線程安全版本,計劃回調函數必須在另外一個線程中使用。

須要注意的是:上面的幾個回調函數都只使用了「位置參數」哦,asyncio中,大部分的計劃回調函數都不支持「關鍵字參數」,若是是想要使用關鍵字參數,則推薦使用functools.aprtial()對方法進一步包裝,詳細能夠參考前面的python標準庫系列文章。

如:

# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
下面來看一下具體的使用例子。

import asyncio

def callback(n):
print('我是回調函數,參數爲: {0} '.format(n))


async def main(loop):
print('在異步函數中註冊回調函數')
loop.call_later(2, callback, 1)
loop.call_later(1, callback, 2)
loop.call_soon(callback, 3)

await asyncio.sleep(4)


loop = asyncio.get_event_loop()
print('進入事件循環')
loop.run_until_complete(main(loop))
print('關閉事件循環')
loop.close()

'''運行結果爲:
進入事件循環
在異步函數中註冊回調函數
我是回調函數,參數爲: 3
我是回調函數,參數爲: 2
我是回調函數,參數爲: 1
關閉事件循環
'''
再看一個簡單的例子:

import asyncio

def callback(a, loop):
print("個人參數爲 {0},執行的時間爲{1}".format(a,loop.time()))


#call_later, call_at
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_later(5, callback, 5, loop) #第一個參數設置的時間5.5秒後執行,
loop.call_at(now+2, callback, 2, loop) #在指定的時間,運行,當前時間+2秒
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever() #要用這個run_forever運行,由於沒有傳入協程,這個函數在3.7中已經被取消
except KeyboardInterrupt:
print("Goodbye!")

'''運行結果爲:
個人參數爲 4,執行的時間爲266419.843
個人參數爲 1,執行的時間爲266420.843
個人參數爲 2,執行的時間爲266421.859
個人參數爲 3,執行的時間爲266422.859
個人參數爲 5,執行的時間爲266424.843
'''
總結注意事項:

(1)CallBack函數只可以定義爲同步方法,不可以定義爲async方法,及不能使用async和@asyncio.coroutine修飾;

(2)每個CallBack方法只會調用一次,若是在同一個時刻有另個CallBack方法須要調用,則他們的執行順序是不肯定的;

(3)注意使用functools.partial()去修飾帶有關鍵字參數的CallBack方法;

(4)如何理解?對於通常的異步函數,咱們須要將它放在時間循環裏面,而後經過事件循環去循環調用它,而由於CallBack並非異步函數,它是定義爲普通的同步方法,因此不可以放在時間循環裏面,可是若是我依然想要讓事件循環去執行它怎麼辦呢?那就不放進事件循環,直接讓事件循環「當即、稍後、在何時」去執行它不就好了嘛,call的含義就是「執行」。

2、底層API之Future
一、Future的定義概覽

Future的本質是一個類。他表示的是異步操做的最終將要返回的結果,故而命名爲Future,它不是線程安全的。Future對象是awaitable的,參見系類文章的前面,

class asyncio.Future(*, loop=None)

二、asyncio中關於Future的幾個方法

(1)asyncio.isfuture(obj) 。判斷一個對象是否是Future,注意python中一切皆對象哦,包括函數,當obj是下面幾種狀況時返回true:

asyncio.Future的實例對象
asyncio.Task的實例對象
一個具備 _asyncio_future_blocking 屬性的對象
(2)asyncio.ensure_future(obj, *, loop=None)。將一個obj包裝成Future

(3)asyncio.wrap_future(future, *, loop=None)

將concurrent.futures.Future對象包裝成一個 asyncio.Future 對象。

三、Future對象的經常使用方法

(1)result()。返回Future執行的結果返回值

若是Future被執行完成,若是使用set_result()方法設置了一個結果,那個設置的value就會被返回;

若是Future被執行完成,若是使用set_exception()方法設置了一個異常,那麼使用這個方法也會觸發異常;

若是Future被取消了,那麼使用這個方法會觸發CancelledError異常;

若是Future的結果不可用或者是不可達,那麼使用這個方法也會觸發InvalidStateError異常;

(2)set_result(result)

標記Future已經執行完畢,而且設置它的返回值。

(3)set_exception(exception)

標記Future已經執行完畢,而且觸發一個異常。

(4)done()

若是Future1執行完畢,則返回 True 。

(5)cancelled()

判斷任務是否取消。

(6)add_done_callback(callback, *, context=None)

在Future完成以後,給它添加一個回調方法,這個方法就至關因而loop.call_soon()方法,參見前面,以下例子:

若是要回調帶有關鍵字參數的函數,也須要使用partial方法哦。

(7)remove_done_callback(callback)

(8)cancel()

(9)exception()

(10)get_loop()。返回Future所綁定的事件循環

3、集中回答如下幾個問題
經過前面的講解,已經講清楚了asyncio架構裏面的一些基本東西,如今能夠來集中回答如下一些常見的問題了,弄清楚這希爾問題,能夠方便咱們更加深刻的理解協程。

一、不少個協程一塊兒運行有建立新的線程嗎?

協程運行時,都是在一個線程中運行的,沒有建立新的線程。以下

import asyncio
import time
import threading

a=time.time()

async def hello1():
    print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(3)
    print("Hello again 01 end")

async def hello2():
    print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(2)
    print("Hello again 02 end")

async def hello3():
    print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
    await asyncio.sleep(1)
    print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello1(), hello2(),hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()


b=time.time()
print('---------------------------------------')
print(b-a)
'''運行結果爲:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 4168)>
Hello again 03 end
Hello again 02 end
Hello again 01 end
---------------------------------------
2.994506597518921
'''
從上面那個能夠看出,三個不一樣的協程函數都是在一個線程完成的。可是並非意味着,多個協程函數只能在一個線程中執行,一樣能夠建立新的線程,其實咱們徹底能夠在新的線程中從新建立一個事件循環,具體的實例參見後面。

二、線程必定效率更高嗎?

也不是絕對的,固然在通常狀況下,異步方式的執行效率是更高的,就好比上面的三個函數,若是按照同步的方式執行,則一共須要6秒的時間,可是採用協程則只須要最長的那個時間3秒,這天然是提升了工做效率,那是否是必定會提升呢?也不必定,這與協程的調用方式是由密切關係的。以下所示:

import asyncio
import time
import threading

a=time.time()

async def hello1():
print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(3)
print("Hello again 01 end")

async def hello2():
print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
await asyncio.sleep(2)
print("Hello again 02 end")

async def hello3():
print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
await hello2()
await hello1()
print("Hello again 03 end")

loop = asyncio.get_event_loop()
tasks = [hello3()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

b=time.time()
print('---------------------------------------')
print(b-a)

'''運行結果爲:
Hello world 03 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello world 02 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 02 end
Hello world 01 begin,my thread is:<_MainThread(MainThread, started 13308)>
Hello again 01 end
Hello again 03 end
---------------------------------------
5.008373498916626
'''
咱們發現一個問題,上面執行的順序徹底不是異步執行,執行的時間也沒有獲得改善,究其緣由,是由於上面是經過hello3去調用hello1和hello2的,這和同步調用的方式徹底是同樣的,即便我定義的都是異步方法,它既沒有提升執行效率,還會有阻塞。

結論:在有不少個異步方式的時候,必定要儘可能避免這種異步函數的直接調用,這和同步是沒什麼區別的,必定要經過事件循環loop,「讓事件循環在各個異步函數之間不停遊走」,這樣纔不會形成阻塞。

三、協程會不會有阻塞呢?

異步方式依然會有阻塞的,當咱們定義的不少個異步方法彼此之間有一來的時候,好比,我必需要等到函數1執行完畢,函數2須要用到函數1的返回值,如上面的例子2所示,就會形成阻塞,這也是異步編程的難點之一,如何合理配置這些資源,儘可能減小函數之間的明確依賴,這是很重要的。

四、協程的4種狀態

協程函數相比於通常的函數來講,咱們能夠將協程包裝成任務Task,任務Task就在於能夠跟蹤它的狀態,我就知道它具體執行到哪一步了,通常來講,協程函數具備4種狀態,能夠經過相關的模塊進行查看,請參見前面的文章,他的四種狀態爲:

Pending
Running
Done
Cacelled
 建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done,若是須要中止事件循環,中途須要取消,就須要先把task取消,即爲cancelled。

4、多任務實現併發
python異步協程函數的最終目的是實現併發,這樣才能提升工做效率。

咱們常常看見下面這樣的代碼,即:

tasks = asyncio.gather(*[task1,task2,task3])
loop.run_until_complete(tasks)

#或者是
tasks = asyncio.wait([task1,task2,task3])
loop.run_until_complete(tasks)

#甚至能夠寫在一塊兒,即
loop.run_until_complete(asyncio.gather(*[task1,task2,task3])
#或者是
asyncio.gather(asyncio.wait([task1,task2,task3]))
上面這些都是一些簡單的應用,能夠同時進行多任務,進行併發,可是若是咱們每個任務都有返回值,並且須要獲取這些返回值,這樣作顯然還不夠,還須要作進一步的處理。

asyncio實現併發的思想是同樣的,只是實現的手段稍有區別,主要有如下幾種實現方式:

(1)使用gather同時註冊多個任務,實現併發

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

注意事項:

gather的返回值是它所綁定的全部任務的執行結果,並且順序是不變的,即返回的result的順序和綁定的順序是保持一致的。

除此以外,它是awaitable的,因此,若是須要獲取多個任務的返回值,既然是awaitable的,就須要將它放在一個函數裏面,因此咱們引入一個包裝多個任務的入口main,這也是python3.7的思想。以下:

# import asyncio
# import time
# import threading

# a=time.time()

# async def hello1():
# print(f"Hello world 01 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(3)
# print("Hello again 01 end")

# async def hello2():
# print(f"Hello world 02 begin,my thread is:{threading.currentThread()}")
# await asyncio.sleep(2)
# print("Hello again 02 end")

# async def hello3():
# print(f"Hello world 03 begin,my thread is:{threading.currentThread()}")
# await hello2()
# await hello1()
# print("Hello again 03 end")

# loop = asyncio.get_event_loop()
# tasks = [hello3()]
# loop.run_until_complete(asyncio.wait(tasks))

# loop.close()


# b=time.time()
# print('---------------------------------------')
# print(b-a)

import asyncio
import time


async def hello1(a,b):
print("Hello world 01 begin")
await asyncio.sleep(3) #模擬耗時任務3秒
print("Hello again 01 end")
return a+b

async def hello2(a,b):
print("Hello world 02 begin")
await asyncio.sleep(2) #模擬耗時任務2秒
print("Hello again 02 end")
return a-b

async def hello3(a,b):
print("Hello world 03 begin")
await asyncio.sleep(4) #模擬耗時任務4秒
print("Hello again 03 end")
return a*b

async def main(): #封裝多任務的入口函數
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
results=await asyncio.gather(task1,task2,task3)
for result in results: #經過迭代獲取函數的結果,每個元素就是相對應的任務的返回值,順序都沒變
print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

'''運行結果爲:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''
(2)使用wait能夠同時註冊多個任務,實現併發

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

它與gather不一樣的地方是他的參數是集合類型,並且他的返回類型是這樣一個形式,即

 (done, pending).   #返回dones是已經完成的任務,pending是未完成的任務,都是集合類型,不一樣的是每個元素再也不是返回值,而是某一個task哦,

相同的是它依然也是awaitable的,故而也須要定義在一個異步函數main()中,以下。

#前面的代碼和上面同樣
async def main(): #封裝多任務的入口函數
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))
done,pending=await asyncio.wait([task1,task2,task3])
for done_task in done:
print(done_task.result()) #這裏返回的是一個任務,不是直接的返回值,故而須要使用result函數進行獲取


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

#運行結果也同樣
(3)使用as_completed能夠同時註冊多個任務,實現併發

這個方法使用的比較少,與前面的兩個gather和wait不一樣的是,它不是awaitable。使用實例參見前面的一篇文章,參見以下:

(4)主調方獲取任務的運行結果

上面的運行結果,都是在main()函數裏面獲取的運行結果,那可不能夠再也不main()裏面獲取結果呢,,固然是能夠的,咱們能夠這樣作,

async def main(): #封裝多任務的入口函數
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.gather(task1,task2,task3) #不在這裏獲取結果,只是返回


loop = asyncio.get_event_loop()
results=loop.run_until_complete(main()) #在這裏再獲取返回函數值,而後迭代獲取
for result in results:
print(result)
loop.close()

#y運行結果同上
或者是以下:

async def main(): #封裝多任務的入口函數
task1=asyncio.ensure_future(hello1(10,5))
task2=asyncio.ensure_future(hello2(10,5))
task3=asyncio.ensure_future(hello3(10,5))

return await asyncio.wait([task1,task2,task3]) #不在這裏獲取結果,只是返回


loop = asyncio.get_event_loop()
done,pending=loop.run_until_complete(main()) #在這裏再獲取返回函數值,而後迭代獲取
for done_task in done:
print(done_task.result())
loop.close()
5、Future補充下一篇預告
一、Future補充

asyncio中的Future類是模仿concurrent.futures.Future類而設計的,關於concurrent.futures.Future,能夠查閱相關的文檔。它們之間的主要區別是:

(1)asyncio.Future對象是awaitable的,可是concurrent.futures.Future對象是不可以awaitable的;

(2)asyncio.Future.result()和asyncio.Future.exception()是不接受關鍵字參數timeout的;

(3)當Future沒有完成的時候,asyncio.Future.result()和asyncio.Future.exception()將會觸發一個InvalidStateError異常;

(4)使用asyncio.Future.add_done_callback()註冊的回調函數不會當即執行,它可使用loop.call_soon代替;

(5)asyncio裏面的Future和concurrent.futures.wait()以及concurrent.futures.as_completed()是不兼容的。

有興趣的小夥伴能夠本身學一下concurrent.futures哦!

 

=============================================================================================================================

 

本文爲系列文章的第七篇,將介紹如何使用多線程結合異步編程asyncio,開發出真正「不假死」的應用程序;以及如何模擬一個timer,實現定時操做。

1、異步方法依然會假死(freezing)
什麼是程序的假死,這裏再也不多描述,特別是在編寫桌面程序的時候,若是是使用單個線程,同步函數的方式,假死是不可避免的,可是有時候咱們即便是使用了異步函數的方式依然是不可避免的,依然會假死,這是爲何呢,下面會經過幾個例子來詳細說明。

一、通常程序的調用方「假死」


import asyncio
import time
import threading

#定義一個異步操做
async def hello1(a,b):
print(f"異步函數開始執行")
await asyncio.sleep(3)
print("異步函數執行結束")
return a+b

#在一個異步操做裏面調用另外一個異步操做
async def main():
c=await hello1(10,20)
print(c)
print("主函數執行")

loop = asyncio.get_event_loop()
tasks = [main()]
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

'''運行結果爲:
異步函數開始執行(在此處要等待3秒)
異步函數執行結束
30
主函數執行
'''
注意一個問題:咱們前面所講的例子中,沒有出現等待,是由於各個異步方法之間是「徹底並列」關係,彼此之間沒有依賴,因此,我能夠將全部的異步操做「gather」起來,而後經過事件循環,讓事件循環在多個異步方法之間來回調用,永不中止,故而沒有出現等待。

可是,現實中不可能全部的異步方法都是徹底獨立的,沒有任何關係的,在上面的這個例子中,就是很好的說明,hello1是一個耗時任務,耗時大約3秒,main也是一個異步方法,可是main中須要用到hello1中的返回結果,因此他必需要等待hello1運行結束以後再才能繼續執行,這就是爲何會獲得上面結果的緣由。這也再一次說明,異步依然是會有阻塞的。

咱們也能夠這樣理解,由於我給事件循環只註冊了一個異步方法,那就是main,當在main裏面遇到了await,事件循環掛起,轉而尋找其餘的異步方法,可是因爲只註冊了一個異步方法給事件循環,他沒有其餘的方法可執行了,因此只能等待,讓hello1執行完了,再繼續執行。

二、窗體程序的假死

(1)同步假死

import tkinter as tk # 導入 Tkinter 庫
import time

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題

self.button=tk.Button(self.root,text="開始計算",command=self.calculate)
self.label=tk.Label(master=self.root,text="等待計算結果")

self.button.pack()
self.label.pack()
self.root.mainloop()

def calculate(self):
time.sleep(3) #模擬耗時計算
self.label["text"]=300

if __name__=='__main__':
form=Form()
運行的結果就是,我單機一下「開始計算」按鈕,而後窗體會假死,這時候沒法移動窗體、也沒法最大化最小化、3秒鐘以後,「等待計算結果」的label會顯示出3,而後前面移動的窗體等操做接着發生,最終效果以下:

 

上面的窗體會假死,這無可厚非,由於,全部的操做都是同步方法,只有一個線程,負責維護窗體狀態的線程和執行好使計算的線程是同一個,當遇到time.sleep()的時候天然會遇到阻塞。那若是咱們將耗時任務換成異步方法呢?代碼以下:

(2)異步假死

import tkinter as tk # 導入 Tkinter 庫
import asyncio

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題

self.button=tk.Button(self.root,text="開始計算",command=self.get_loop)
self.label=tk.Label(master=self.root,text="等待計算結果")

self.button.pack()
self.label.pack()

self.root.mainloop()

#定義一個異步方法,模擬耗時計算任務
async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

#asyncio任務只能經過事件循環運行,不能直接運行異步函數
def get_loop(self):
self.loop=asyncio.get_event_loop()
self.loop.run_until_complete(self.calculate())
self.loop.close()


if __name__=='__main__':
form=Form()
咱們發現,窗體依然會形成阻塞,狀況和前面的同步方法是同樣的,爲何會這樣呢?由於這個地方雖然啓動了事件循環,可是擁有事件循環的那個線程同時還須要維護窗體的狀態,始終只有一個線程在運行,當單擊「開始計算」按鈕,開始執行get_loop函數,在get_loop裏面啓動異步方法calculate,而後遇到await,這個時候事件循環暫停,可是因爲事件循環只註冊了calculate一個異步方法,也沒其餘事情幹,因此只能等待,形成假死阻塞。

解決辦法就是我專門再建立一個線程去執行一些計算任務,維護窗體狀態的線程就之專門負責維護狀態,後面再詳說。

2、多線程結合asyncio解決調用時的假死
一、asyncio專門實現Concurrency and Multithreading(多線程和併發)的函數介紹

爲了讓一個協程函數在不一樣的線程中執行,咱們可使用如下兩個函數

(1)loop.call_soon_threadsafe(callback, *args),這是一個很底層的API接口,通常不多使用,本文也暫時不作討論。

(2)asyncio.run_coroutine_threadsafe(coroutine,loop)

第一個參數爲須要異步執行的協程函數,第二個loop參數爲在新線程中建立的事件循環loop,注意必定要是在新線程中建立哦,該函數的返回值是一個concurrent.futures.Future類的對象,用來獲取協程的返回結果。

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)   # 在新線程中運行協程

result = future.result()   #等待獲取Future的結果

二、不阻塞的多線程併發實例

asyncio.run_coroutine_threadsafe(coroutine,loop)的意思很簡單,就是我在新線程中建立一個事件循環loop,而後在新線程的loop中不斷不停的運行一個或者是多個coroutine。參考下面代碼:

import asyncio

import asyncio,time,threading

#須要執行的耗時異步任務
async def func(num):
print(f'準備調用func,大約耗時{num}')
await asyncio.sleep(num)
print(f'耗時{num}以後,func函數運行結束')

#定義一個專門建立事件循環loop的函數,在另外一個線程中啓動它
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

#定義一個main函數
def main():
coroutine1 = func(3)
coroutine2 = func(2)
coroutine3 = func(1)

new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用),在start_loop裏面啓動它
t = threading.Thread(target=start_loop,args=(new_loop,)) #經過當前線程開啓新的線程去啓動事件循環
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #這幾個是關鍵,表明在新線程中事件循環不斷「遊走」執行
asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

for i in "iloveu":
print(str(i)+" ")

if __name__ == "__main__":
main()

'''運行結果爲:
i 準備調用func,大約耗時3
l 準備調用func,大約耗時2
o 準備調用func,大約耗時1
v
e
u
耗時1以後,func函數運行結束
耗時2以後,func函數運行結束
耗時3以後,func函數運行結束
'''
咱們發現,main是在主線程中的,而三個協程函數是在新線程中的,它們是在一塊兒執行的,沒有形成主線程main的阻塞。下面再看一下窗體函數中的實現。

三、tkinter+threading+asyncio

import tkinter as tk # 導入 Tkinter 庫
import time
import asyncio
import threading

class Form:
def __init__(self):
self.root=tk.Tk()
self.root.geometry('500x300')
self.root.title('窗體程序') #設置窗口標題

self.button=tk.Button(self.root,text="開始計算",command=self.change_form_state)
self.label=tk.Label(master=self.root,text="等待計算結果")

self.button.pack()
self.label.pack()

self.root.mainloop()

async def calculate(self):
await asyncio.sleep(3)
self.label["text"]=300

def get_loop(self,loop):
self.loop=loop
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def change_form_state(self):
coroutine1 = self.calculate()
new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用),在start_loop裏面啓動它
t = threading.Thread(target=self.get_loop,args=(new_loop,)) #經過當前線程開啓新的線程去啓動事件循環
t.start()

asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #這幾個是關鍵,表明在新線程中事件循環不斷「遊走」執行


if __name__=='__main__':
form=Form()
運行上面的代碼,咱們發現,此時點擊「開始計算」按鈕執行耗時任務,沒有形成窗體的任何阻塞,我能夠最大最小化、移動等等,而後3秒以後標籤會自動顯示運算結果。爲何會這樣?

上面的代碼中,get_loop()、change_form_state()、__init__()都是定義在主線程中的,窗體的狀態維護也是主線程,二耗時計算calculate()是一個異步協程函數。

如今單擊「開始計算按鈕」,這個事件發生以後,會觸發主線程的chang_form_state函數,而後在該函數中,會建立新的線程,經過新的線程建立一個事件循環,而後將協程函數註冊到新線程中的事件循環中去,達到的效果就是,主線程作主線程的,新線程作新線程的,不會形成任何阻塞。

四、multithreading+asyncio總結

第一步:定義須要異步執行的一系列操做,及一系列協程函數;

第二步:在主線程中定義一個新的線程,而後在新線程中產生一個新的事件循環;

第三步:在主線程中,經過asyncio.run_coroutine_threadsafe(coroutine,loop)這個方法,將一系列異步方法註冊到新線程的loop裏面去,這樣就是新線程負責事件循環的執行。

3、使用asyncio實現一個timer
所謂的timer指的是,指定一個時間間隔,讓某一個操做隔一個時間間隔執行一次,如此周而復始。不少編程語言都提供了專門的timer實現機制、包括C++、C#等。可是 Python 並無原生支持 timer,不過能夠用 asyncio.sleep 模擬。

大體的思想以下,將timer定義爲一個異步協程,而後經過事件循環去調用這個異步協程,讓事件循環不斷在這個協程中反反覆調用,只不過隔幾秒調用一次便可。

簡單的實現以下(本例基於python3.7):

async def delay(time):
await asyncio.sleep(time)

async def timer(time,function):
while True:
future=asyncio.ensure_future(delay(time))
await future
future.add_done_callback(function)

def func(future):
print('done')

if __name__=='__main__':
asyncio.run(timer(2,func))

'''運行結果爲:
done
done
done
done
done
done
done
done
done
done
done
.
.
.
.每隔2秒打印一個done
'''
幾個注意點:asyncio.sleep()自己就是一個協程函數,故而能夠將它封裝成一個Task或者是Future,等待時間結束也就是任務完成,綁定回調函數。固然,自己python語法靈活,上面只是其中一種實現而已。
---------------------
做者:LoveMIss-Y
來源:CSDN
原文:https://blog.csdn.net/qq_27825451/article/details/86483493
版權聲明:本文爲博主原創文章,轉載請附上博文連接!

 

 

 

 

 

 

 

 

 

--------------------- 原文:https://blog.csdn.net/qq_27825451/article/details/86292513

相關文章
相關標籤/搜索