python多線程中由於有GIL(Global Interpreter Lock 全局解釋器鎖 )的存在,因此對CPU密集型程序顯得很雞肋;但對IO密集型的程序,GIL會在調用IO操做前釋放,因此對IO密集型多線程仍是挺有做用。javascript
然而多線程是競爭型的,調度由CPU決定,有時會顯得沒那麼容易控制;因此python中也實現了一種能夠由程序本身來調度的異步方式,叫作協程。css
協程是一種用戶態的輕量級線程,又稱微線程。html
協程擁有本身的寄存器上下文和棧,調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。java
簡單說協程在執行某個子程序(函數)時,能夠指定或者隨機地中斷,而後去執行其餘的子程序(函數),在合適的時候再返回到中斷子程序中止時的狀態繼續執行。聽起像生成器的特性,實際上協程也是基於生成器的。因此協程是經過程序自身的控制,去切換不一樣任務,實現併發的效果。也就是協程是單線程執行,沒有多線程由CPU調度時線程切換的開銷,因此效率較高。python
再多說直白一點就是:git
多線程執行多個任務時,CPU分配線程資源給每一個任務,每一個任務並行(多核才行,每一個單位時間內,一個CPU只能處理一個線程)的執行,但若是任務多了,而且線程有限,CPU會調度線程資源一會執行一個程序,在不一樣程序間切換。(而且因爲python GIL存在,同一時刻只能執行一個線程任務,並行也就成了併發,宏觀上也實際是單線程(單核)了)。總結就是多線程由CPU分配調度線程資源給子程序。github
而協程的執行不一樣,它是單一的線程(主線程),將這個線程從開始到結束的時間做爲資源分配給子程序,每一個子程序能使用這個時間資源能夠由咱們來控制。同時因爲協程具備生成器那樣保存狀態的特性,遇到阻塞時能夠去執行其餘的程序,返回來執行時又不會丟失狀態,因此能夠經過這種異步的方式實現單一線程的併發。redis
同時由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。數據庫
經過liaoxuefeng.com上的一個例子來演示下協程:網絡
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。
若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:
def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
輸出
[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK
注意到consumer
函數是一個generator
,把一個consumer
傳入produce
後:
首先調用c.send(None)
啓動生成器;
而後,一旦生產了東西,經過c.send(n)
切換到consumer
執行;
consumer
經過yield
拿到消息,處理,又經過yield
把結果傳回;
produce
拿到consumer
處理的結果,繼續生產下一條消息;
produce
決定不生產了,經過c.close()
關閉consumer
,整個過程結束。
整個流程無鎖,由一個線程執行,produce
和consumer
協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。
注意的是,生成器啓動或恢復執行一次,將會在yield處暫停。上面的第1步僅僅執行到了yield r,並無執行到賦值語句 n = yield r ,到了第2步,生成器恢復執行經過send(n)纔給consumer中n賦值。
send(value)方法:做用是發送值給yield表達式。啓動generator則是調用send(None)。具體流程,能夠經過ide調試來直觀的看懂
但上面示例並不能體現協程併發的特性,下面由asyncio這內置庫來實現
(基於3.5後版本)
asyncio 是用來編寫併發代碼的庫,使用 async/await 語法。
asyncio 被用做多個提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫鏈接庫,分佈式任務隊列等等。
關於asyncio的一些關鍵字的說明:
event_loop 事件循環:程序開啓一個無限循環,把一些函數註冊到事件循環上,當知足事件發生的時候,調用相應的協程函數
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象。協程對象須要註冊到事件循環,由事件循環調用。
task 任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各類狀態
future: 表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。
經過async關鍵字定義一個協程(coroutine),協程也是一種對象。下面say_after,main就是一個協程
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print("started ") s_time = time.time() await say_after(1, 'hello') await say_after(2, 'world') print("runtime : ",time.time()-s_time) print("finished ") asyncio.run(main())
asyncio.run() 函數用來運行一個協程對象,這裏咱們將main()做爲入口函數。await等待一個協程。上面代碼段會在等待 1 秒後打印 "hello",而後 再次 等待 2 秒後打印 "world"。asyncio.sleep表示阻塞多少秒,運行結果以下
started hello world runtime : 3.000959634780884 finished
能夠觀察到上面的代碼,是同步運行的,兩個await say_after之間遇到了阻塞。由於asyncio.run() 只是單純的運行一個協程,並不會併發運行
運行協程對象的方法主要有:
1. 經過asyncio.run(main) 運行一個協程,同步的方式,主要用於運行入口協程
2. 在另外一個已經運行的協程中用 `await` 等待它,好比上面運行了main協程,其中等待的say_after協程也會運行
3. 將協程封裝成task或future對象,而後掛到事件循環loop上,使用loop來運行。主要方法爲loop.run_until_complete。此方法能夠異步的併發運行
實際上參考源碼asyncio.run本質也是獲取loop,運行協程,即協程依靠loop運行
asyncio.create_task() 函數用來併發運行多個協程,更改上面的例子
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print("started ") task1 = asyncio.create_task(say_after(1, 'hello')) task2 = asyncio.create_task(say_after(2, 'world')) s_time = time.time() await task1 await task2 print("runtime : ",time.time()-s_time) print("finished ") asyncio.run(main())
運行輸出,比上面快一秒。這裏咱們使用create_task將協程封裝成task對象(會自動的添加到事件循環中),而後咱們在main這個入口協程中掛起task1和task2。使用run運行main入口協程,它會自動檢測循環事件,並將等待task1和task2兩個task執行完成
started hello world runtime : 2.0009524822235107 finished
asyncio.create_task方法實際是封裝了獲取事件循環asyncio.get_running_loop()與建立循環任務loop.create_task(coro)的一種高級方法,後面具體會講這些
跟在await後面的對象都是可等待對象,主要有協程, 任務 和 Future。
可等待的意思就是跳轉到等待對象,並將當前任務掛起。當等待對象的任務處理完了,纔會跳回當前任務繼續執行。實際上與yield from功能相同,不一樣的是await後面是awaitable,yield from後面是生成器對象
yield from的一個示例(來源於https://zhuanlan.zhihu.com/p/30275154這篇協程演進講的很好)
def gen_3(): yield 3 def gen_234(): yield 2 yield from gen_3() yield 4 def main(): yield 1 yield from gen_234() yield 5 for element in main(): print(element) 輸出 1 2 3 4 5
可是對於協程中進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序,以下面咱們使用time.sleep()替代asyncio.sleep(),會發如今timesleep協程時程序阻塞,最後時間爲4s
import asyncio import time async def say_after(delay, what): await timesleep(delay) return what async def timesleep(delay): time.sleep(delay) async def main(): print("started ") task1 = asyncio.create_task(say_after(2, 'hello')) task2 = asyncio.create_task(say_after(2, 'world')) s_time = time.time() await task1 await task2 print(task1.result(),task2.result()) print("runtime : ",time.time()-s_time) print("finished ") asyncio.run(main())
若是將上面的改成以下
async def timesleep(delay): # time.sleep(delay) await asyncio.sleep(delay)
則最後運行時間爲2s,這是由於asyncio.sleep()不一樣於time.sleep(),它其實在內部實現了一個future對象,事件循環會異步的等待這個對象完成
因此
在事件循環中,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。對於task與future對象,await能夠將他們掛在事件循環上,因爲他們相比於協程對象增長了運行狀態(Pending、Running、Done、Cancelled等),事件循環則能夠讀取他們的狀態,實現異步的操做,好比上面併發的示例。同時對於阻塞的操做(沒有實現異步的操做,如request就會阻塞,aihttp則不會),因爲協程是單線程,會阻塞整個程序
事件循環是每一個 asyncio 應用的核心。 事件循環會運行異步任務和回調,執行網絡 IO 操做,以及運行子程序。
簡單說咱們將協程任務(task)註冊到事件循環(loop)上,事件循環(loop)會循環遍歷任務的狀態,當任務觸發條件發生時就會執行對應的任務。相似JavaScript事件循環,當onclick被觸發時,就會執行對應的js腳本或者回調。同時當遇到阻塞,事件循環回去查找其餘可運行的任務。因此事件循環被認爲是一個循環,由於它在不斷收集事件並遍歷它們從而找到如何處理該事件。
經過如下僞代碼理解
while (1) { events = getEvents(); for (e in events) processEvent(e); }
全部的時間都在 while 循環中捕捉,而後通過事件處理者處理。事件處理的部分是系統惟一活躍的部分,當一個事件處理完成,流程繼續處理下一個事件。若是遇到阻塞,循環會去執行其餘任務,當阻塞任務完成後再回調(具體如何實現不太清楚,應該是將阻塞任務標記狀態或者放進其它列來實現)其實能夠參考javascript的事件循環理解,都是單線程的異步操做http://www.ruanyifeng.com/blog/2013/10/event_loop.html
asyncio 中主要的事件循環方法有:
上面的併發例子就能夠改爲下面形式:
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) def main(): print("started ") s_time = time.time() loop = asyncio.get_event_loop() #獲取一個事件循環 tasks = [ asyncio.ensure_future(say_after(1,"hello")), #asyncio.ensure_future()包裝協程或可等待對象在未來等待。若是參數是Future,則直接返回。 asyncio.ensure_future(say_after(2,"world")), loop.create_task(say_after(1,"hello")), #loop.create_task()包裝協程爲task。 loop.create_task(say_after(2,"world")) ] loop.run_until_complete(asyncio.wait(tasks)) print("runtime : ",time.time()-s_time) print("finished ") main()
asyncio.get_event_loop方法能夠建立一個事件循環,而後使用 run_until_complete 將協程註冊到事件循環,並啓動事件循環。asyncio.ensure_future(coroutine) 和loop.create_task(coroutine)均可以建立一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。asyncio.wait相似與await 不過它能夠接受一個list,asyncio.wait()返回的是一個協程。
總結:使用async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘的協程也掛起或者執行完畢,再進行下一個協程的執行,協程的目的也是讓一些耗時的操做異步化。
Asyncio是用來處理事件循環中的異步進程和併發任務執行的。它還提供了 asyncio.Task() 類,能夠在任務中使用協程。它的做用是,在同一事件循環中,運行某一個任務的同時能夠併發地運行多個任務。當協程被包在任務中,它會自動將任務和事件循環鏈接起來,當事件循環啓動的時候,任務自動運行。這樣就提供了一個能夠自動驅動協程的機制。
若是被包裹的協程要等待一個 future 對象,那麼任務會被掛起,等待future的計算結果。當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,須要注意的是,事件循環一次只能運行一個任務,除非還有其它事件循環在不一樣的線程並行運行,此任務纔有可能和其餘任務並行。當一個任務在等待future執行的期間,事件循環會運行一個新的任務。
即Task對象封裝協程(async標記的函數),將其掛到事件循環上運行,若是遇到等待 future 對象(await 後面等待的),那麼該事件循環會運行其餘 Task、回調或執行 IO 操做
相關的主要方法有:
經過網上的一個示例來理解一下,Task與loop之間的工做流程
import asyncio async def compute(x, y): print("Compute %s + %s ..." % (x, y)) await asyncio.sleep(1.0) return x + y async def print_sum(x, y): result = await compute(x, y) print("%s + %s = %s" % (x, y, result)) loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
流程圖以下
期間loop兩次訪問compute(),第一次是遇到阻塞await(yield from)掛起,第二次是掛起的事件有結果了去取結果,生成器return時會raise StopIteration()異常
經過task.result()獲取返回的結果
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) return what async def main(): print("started ") task1 = asyncio.create_task(say_after(1, 'hello')) task2 = asyncio.create_task(say_after(2, 'world')) s_time = time.time() await task1 await task2 print(task1.result(),task2.result()) print("runtime : ",time.time()-s_time) print("finished ") asyncio.run(main())
建議使用高層級的 asyncio.create_task() 函數來建立 Task 對象,也可用低層級的 loop.create_task() 或 ensure_future() 函數。不建議手動實例化 asyncio.Task() 對象。
Future如它的名字同樣,是一種對將來的一種抽象,表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別,task是Future的子類。實際上Future包裹協程,添加上各類狀態,而task則是在Future上添加一些特性便於掛在事件循環上執行,因此Future就是一個內部底層的對象,平時咱們只要關注task就能夠了。Future能夠經過回調函數處理結果
相關的主要方法有:
官網的一個例子,體現的是Future的四個狀態:Pending、Running、Done、Cancelled。建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done
import asyncio async def set_after(fut, delay, value): # Sleep for *delay* seconds. await asyncio.sleep(delay) # Set *value* as a result of *fut* Future. fut.set_result(value) async def main(): # Get the current event loop. loop = asyncio.get_running_loop() # Create a new Future object. fut = loop.create_future() # Run "set_after()" coroutine in a parallel Task. # We are using the low-level "loop.create_task()" API here because # we already have a reference to the event loop at hand. # Otherwise we could have just used "asyncio.create_task()". loop.create_task( set_after(fut, 1, '... world')) print('hello ...') # Wait until *fut* has a result (1 second) and print it. print(await fut) asyncio.run(main())
若是註釋掉fut.set_result(value),那麼future永遠不會done
綁定回調,future與task均可以使用add_done_callback方法,由於task是future子類
import time import asyncio async def say_after(delay, what): await asyncio.sleep(delay) return what def callback(future): print('Callback: ', future.result()) coroutine = say_after(2,"hello") loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) loop.run_until_complete(task)
實際上官網在3.5後建議使用高層的封裝如:asyncio.run(),asyncio.create_task()等,忽略底層的一些實現,雖然方便使用,可是對asyncio的流程理解幫助不大,仍是要看底層的一些實現。
總的來講主要重點以下: