python協程總結

 

概述

python多線程中由於有GIL(Global Interpreter Lock 全局解釋器鎖 )的存在,因此對CPU密集型程序顯得很雞肋;但對IO密集型的程序,GIL會在調用IO操做前釋放,因此對IO密集型多線程仍是挺有做用。javascript

然而多線程是競爭型的,調度由CPU決定,有時會顯得沒那麼容易控制;因此python中也實現了一種能夠由程序本身來調度的異步方式,叫作協程。css

 

協程是一種用戶態的輕量級線程,又稱微線程。html

協程擁有本身的寄存器上下文和棧,調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。java

 

簡單說協程在執行某個子程序(函數)時,能夠指定或者隨機地中斷,而後去執行其餘的子程序(函數),在合適的時候再返回到中斷子程序中止時的狀態繼續執行。聽起像生成器的特性,實際上協程也是基於生成器的。因此協程是經過程序自身的控制,去切換不一樣任務,實現併發的效果。也就是協程是單線程執行,沒有多線程由CPU調度時線程切換的開銷,因此效率較高。python

 

再多說直白一點就是:git

多線程執行多個任務時,CPU分配線程資源給每一個任務,每一個任務並行(多核才行,每一個單位時間內,一個CPU只能處理一個線程)的執行,但若是任務多了,而且線程有限,CPU會調度線程資源一會執行一個程序,在不一樣程序間切換。(而且因爲python GIL存在,同一時刻只能執行一個線程任務,並行也就成了併發,宏觀上也實際是單線程(單核)了)。總結就是多線程由CPU分配調度線程資源給子程序。github

而協程的執行不一樣,它是單一的線程(主線程),將這個線程從開始到結束的時間做爲資源分配給子程序,每一個子程序能使用這個時間資源能夠由咱們來控制。同時因爲協程具備生成器那樣保存狀態的特性,遇到阻塞時能夠去執行其餘的程序,返回來執行時又不會丟失狀態,因此能夠經過這種異步的方式實現單一線程的併發。redis

同時由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。數據庫

 

經過liaoxuefeng.com上的一個例子來演示下協程:網絡

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。

若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

輸出

[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK

注意到consumer函數是一個generator,把一個consumer傳入produce後:

  1. 首先調用c.send(None)啓動生成器;

  2. 而後,一旦生產了東西,經過c.send(n)切換到consumer執行;

  3. consumer經過yield拿到消息,處理,又經過yield把結果傳回;

  4. produce拿到consumer處理的結果,繼續生產下一條消息;

  5. produce決定不生產了,經過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produceconsumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。

注意的是,生成器啓動或恢復執行一次,將會在yield處暫停。上面的第1步僅僅執行到了yield r,並無執行到賦值語句 n = yield r ,到了第2步,生成器恢復執行經過send(n)纔給consumer中n賦值。

send(value)方法:做用是發送值給yield表達式。啓動generator則是調用send(None)。具體流程,能夠經過ide調試來直觀的看懂

但上面示例並不能體現協程併發的特性,下面由asyncio這內置庫來實現

 

asyncio (一)

(基於3.5後版本)

asyncio 是用來編寫併發代碼的庫,使用 async/await 語法。

asyncio 被用做多個提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫鏈接庫,分佈式任務隊列等等。

關於asyncio的一些關鍵字的說明:

  • event_loop 事件循環:程序開啓一個無限循環,把一些函數註冊到事件循環上,當知足事件發生的時候,調用相應的協程函數

  • coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象。協程對象須要註冊到事件循環,由事件循環調用。

  • task 任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各類狀態

  • future: 表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別

  • async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

 

建立協程

經過async關鍵字定義一個協程(coroutine),協程也是一種對象。下面say_after,main就是一個協程

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print("started ")
    s_time = time.time()
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

asyncio.run() 函數用來運行一個協程對象,這裏咱們將main()做爲入口函數。await等待一個協程。上面代碼段會在等待 1 秒後打印 "hello",而後 再次 等待 2 秒後打印 "world"。asyncio.sleep表示阻塞多少秒,運行結果以下

started
hello
world
runtime :  3.000959634780884
finished

能夠觀察到上面的代碼,是同步運行的,兩個await say_after之間遇到了阻塞。由於asyncio.run() 只是單純的運行一個協程,並不會併發運行

 

運行協程

運行協程對象的方法主要有:

1. 經過asyncio.run(main) 運行一個協程,同步的方式,主要用於運行入口協程

2. 在另外一個已經運行的協程中用 `await` 等待它,好比上面運行了main協程,其中等待的say_after協程也會運行

3. 將協程封裝成task或future對象,而後掛到事件循環loop上,使用loop來運行。主要方法爲loop.run_until_complete。此方法能夠異步的併發運行

實際上參考源碼asyncio.run本質也是獲取loop,運行協程,即協程依靠loop運行

 

併發協程

asyncio.create_task() 函數用來併發運行多個協程,更改上面的例子

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

運行輸出,比上面快一秒。這裏咱們使用create_task將協程封裝成task對象(會自動的添加到事件循環中),而後咱們在main這個入口協程中掛起task1和task2。使用run運行main入口協程,它會自動檢測循環事件,並將等待task1和task2兩個task執行完成

started
hello
world
runtime :  2.0009524822235107
finished

asyncio.create_task方法實際是封裝了獲取事件循環asyncio.get_running_loop()與建立循環任務loop.create_task(coro)的一種高級方法,後面具體會講這些

 

可等待對象

跟在await後面的對象都是可等待對象,主要有協程, 任務 和 Future。

  • 協程對象:async def 的函數對象
  • task任務:將協程包裝成的一個任務(task)對象,用於註冊到事件循環上
  • Future:是一種特殊的低層級可等待對象,表示一個異步操做的最終結果

可等待的意思就是跳轉到等待對象,並將當前任務掛起。當等待對象的任務處理完了,纔會跳回當前任務繼續執行。實際上與yield from功能相同,不一樣的是await後面是awaitable,yield from後面是生成器對象

yield from的一個示例(來源於https://zhuanlan.zhihu.com/p/30275154這篇協程演進講的很好)

def gen_3():
   yield 3

def gen_234():
   yield 2
   yield from gen_3()
   yield 4

def main():
   yield 1
   yield from gen_234()
   yield 5

for element in main():
   print(element)  

輸出
1
2
3
4
5

可是對於協程中進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序,以下面咱們使用time.sleep()替代asyncio.sleep(),會發如今timesleep協程時程序阻塞,最後時間爲4s

import asyncio
import time

async def say_after(delay, what):
    await timesleep(delay)
    return what

async def timesleep(delay):
    time.sleep(delay)

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(2, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(),task2.result())
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

若是將上面的改成以下

async def timesleep(delay):
    # time.sleep(delay)
    await asyncio.sleep(delay)

則最後運行時間爲2s,這是由於asyncio.sleep()不一樣於time.sleep(),它其實在內部實現了一個future對象,事件循環會異步的等待這個對象完成

因此

在事件循環中,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。對於task與future對象,await能夠將他們掛在事件循環上,因爲他們相比於協程對象增長了運行狀態(Pending、Running、Done、Cancelled等),事件循環則能夠讀取他們的狀態,實現異步的操做,好比上面併發的示例。同時對於阻塞的操做(沒有實現異步的操做,如request就會阻塞,aihttp則不會),因爲協程是單線程,會阻塞整個程序

 

asyncio (二)

事件循環

事件循環是每一個 asyncio 應用的核心。 事件循環會運行異步任務和回調,執行網絡 IO 操做,以及運行子程序。

簡單說咱們將協程任務(task)註冊到事件循環(loop)上,事件循環(loop)會循環遍歷任務的狀態,當任務觸發條件發生時就會執行對應的任務。相似JavaScript事件循環,當onclick被觸發時,就會執行對應的js腳本或者回調。同時當遇到阻塞,事件循環回去查找其餘可運行的任務。因此事件循環被認爲是一個循環,由於它在不斷收集事件並遍歷它們從而找到如何處理該事件。

經過如下僞代碼理解

while (1) {
    events = getEvents();
    for (e in events)
        processEvent(e);
}

全部的時間都在 while 循環中捕捉,而後通過事件處理者處理。事件處理的部分是系統惟一活躍的部分,當一個事件處理完成,流程繼續處理下一個事件。若是遇到阻塞,循環會去執行其餘任務,當阻塞任務完成後再回調(具體如何實現不太清楚,應該是將阻塞任務標記狀態或者放進其它列來實現)其實能夠參考javascript的事件循環理解,都是單線程的異步操做http://www.ruanyifeng.com/blog/2013/10/event_loop.html

asyncio 中主要的事件循環方法有:

  • asyncio.get_running_loop() :返回當前 OS 線程中正在運行的事件循環對象。
  • asyncio.get_event_loop() :獲取當前事件循環。 若是當前 OS 線程沒有設置當前事件循環而且 set_event_loop() 尚未被調用,asyncio 將建立一個新的事件循環並將其設置爲當前循環。
  • asyncio.new_event_loop() :建立一個新的事件循環。
  • loop.run_until_complete() :運行直到 future ( Future 的實例 ) 被完成。若是參數是 coroutine object ,將被隱式調度爲 asyncio.Task 來運行。返回 Future 的結果 或者引起相關異常。
  • loop.create_future() :建立一個附加到事件循環中的 asyncio.Future 對象。
  • loop.create_task(coro) :安排一個 協程 的執行。返回一個 Task 對象。
  • loop.run_forever() :運行事件循環直到 stop() 被調用。
  • loop.stop() :中止事件循環
  • loop.close() :關閉事件循環。

上面的併發例子就能夠改爲下面形式:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

def main():
    print("started ")
    s_time = time.time()
    loop = asyncio.get_event_loop() #獲取一個事件循環
    tasks = [
        asyncio.ensure_future(say_after(1,"hello")), #asyncio.ensure_future()包裝協程或可等待對象在未來等待。若是參數是Future,則直接返回。
        asyncio.ensure_future(say_after(2,"world")),
        loop.create_task(say_after(1,"hello")), #loop.create_task()包裝協程爲task。
        loop.create_task(say_after(2,"world"))
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print("runtime : ",time.time()-s_time)
    print("finished ")

main()

asyncio.get_event_loop方法能夠建立一個事件循環,而後使用 run_until_complete 將協程註冊到事件循環,並啓動事件循環。asyncio.ensure_future(coroutine) 和loop.create_task(coroutine)均可以建立一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。asyncio.wait相似與await 不過它能夠接受一個list,asyncio.wait()返回的是一個協程。

總結:使用async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘的協程也掛起或者執行完畢,再進行下一個協程的執行,協程的目的也是讓一些耗時的操做異步化。

 

Task對象

Asyncio是用來處理事件循環中的異步進程和併發任務執行的。它還提供了 asyncio.Task() 類,能夠在任務中使用協程。它的做用是,在同一事件循環中,運行某一個任務的同時能夠併發地運行多個任務。當協程被包在任務中,它會自動將任務和事件循環鏈接起來,當事件循環啓動的時候,任務自動運行。這樣就提供了一個能夠自動驅動協程的機制。

若是被包裹的協程要等待一個 future 對象,那麼任務會被掛起,等待future的計算結果。當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,須要注意的是,事件循環一次只能運行一個任務,除非還有其它事件循環在不一樣的線程並行運行,此任務纔有可能和其餘任務並行。當一個任務在等待future執行的期間,事件循環會運行一個新的任務。

即Task對象封裝協程(async標記的函數),將其掛到事件循環上運行,若是遇到等待 future 對象(await 後面等待的),那麼該事件循環會運行其餘 Task、回調或執行 IO 操做

相關的主要方法有:

  • asyncio.create_task() :高層級的方法,建立Task對象,並自動添加進loop,即get_running_loop()和loop.create_task(coro)的封裝
  • asyncio.Task() :打包一個協程爲Task對象
  • asyncio.current_task(loop=None) :返回當前運行的 Task 實例,若是沒有正在運行的任務則返回 None。若是 loop 爲 None 則會使用 get_running_loop() 獲取當前事件循環
  • asyncio.all_tasks(loop=None) :返回事件循環所運行的未完成的 Task 對象的集合。
  • Task.cancel() :請求取消 Task 對象。這將安排在下一輪事件循環中拋出一個 CancelledError 異常給被封包的協程。
  • Task.result() :返回 Task 的結果。若是 Task 對象 已完成,其封包的協程的結果會被返回 (或者當協程引起異常時,該異常會被從新引起。)。若是 Task 對象 被取消,此方法會引起一個 CancelledError 異常。若是 Task 對象的結果還不可用,此方法會引起一個 InvalidStateError 異常。

經過網上的一個示例來理解一下,Task與loop之間的工做流程

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

流程圖以下

期間loop兩次訪問compute(),第一次是遇到阻塞await(yield from)掛起,第二次是掛起的事件有結果了去取結果,生成器return時會raise StopIteration()異常

 

經過task.result()獲取返回的結果

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(),task2.result())
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

建議使用高層級的 asyncio.create_task() 函數來建立 Task 對象,也可用低層級的 loop.create_task() 或 ensure_future() 函數。不建議手動實例化 asyncio.Task() 對象。

 

Future對象 

Future如它的名字同樣,是一種對將來的一種抽象,表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別,task是Future的子類。實際上Future包裹協程,添加上各類狀態,而task則是在Future上添加一些特性便於掛在事件循環上執行,因此Future就是一個內部底層的對象,平時咱們只要關注task就能夠了。Future能夠經過回調函數處理結果

相關的主要方法有:

  • asyncio.isfuture(obj) :判斷對象是否是future對象
  • asyncio.ensure_future(obj,loop=None) :接收一個協程或者future或者task對象,若是是future則直接返回future,其它則返回task
  • Future.result() :返回future結果
  • Future.set_result(result) :將 Future 標記爲完成並設置結果
  • Future.add_done_callback(callback, *, context=None) :添加一個在 Future 完成 時運行的回調函數。調用 callback 時,Future 對象是它的惟一參數。

官網的一個例子,體現的是Future的四個狀態:Pending、Running、Done、Cancelled。建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done

import asyncio

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())

若是註釋掉fut.set_result(value),那麼future永遠不會done

 

綁定回調,future與task均可以使用add_done_callback方法,由於task是future子類

import time
import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what

def callback(future):
    print('Callback: ', future.result())


coroutine = say_after(2,"hello")
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

 

總結

實際上官網在3.5後建議使用高層的封裝如:asyncio.run(),asyncio.create_task()等,忽略底層的一些實現,雖然方便使用,可是對asyncio的流程理解幫助不大,仍是要看底層的一些實現。

總的來講主要重點以下:

  1. 協程在asyncio裏就是 async定義的函數 
  2. await將可等待對象(協程,future,task)掛起,異步或者同步地等待它們完成
  3. task對象與future對象沒有多大的區別,它們都有四個狀態,用於異步的實現
  4. 對於沒有異步實現的阻塞操做,程序會被阻塞,使用實現異步的庫(aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 這裏列出了已經支持的內容,並在持續更新)
相關文章
相關標籤/搜索