Python之路(第四十七篇) 協程:greenlet模塊\gevent模塊\asyncio模塊

 

1、協程介紹

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。python

協程相比於線程,最大的區別在於,協程不須要像線程那樣來回的中斷切換,也不須要線程的鎖機制,由於線程中斷或者鎖機制都會對性能問題形成影響,因此協程的性能相比於線程,性能有明顯的提升,尤爲在線程越多的時候,優點越明顯。編程

協程的好處:

  1. 無需線程上下文切換的開銷api

  2. 無需原子操做鎖定及同步的開銷 "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。網絡

  3. 方便切換控制流,簡化編程模型併發

  4. 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。異步

 

缺點:

  1. 沒法利用多核資源:協程的本質是個單線程,它不能同時將單個 CPU 的多個核用上,協程須要和進程配合才能運行在多 CPU 上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是 CPU 集型應用。async

  2. 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序異步編程

 

總結協程特色:

  1. 必須在只有一個單線程裏實現併發函數

  2. 修改共享數據不需加鎖高併發

  3. 用戶程序裏本身保存多個控制流的上下文棧

  4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

 

Python2.x協程

類庫:

  • yield

  • greenlet

  • gevent

 

Python3.x協程

  • asyncio

 

Python3.x系列的gevent用法和python2.x系列是同樣的

 

 

在學習前,咱們先來理清楚同步/異步的概念

·同步是指完成事務的邏輯,先執行第一個事務,若是阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,順序執行。。。也稱做串行執行。

·異步是和同步相對的,異步是指在處理調用這個事務的以後,不會等待這個事務的處理結果,直接處理第二個事務去了,經過狀態、通知、回調來通知調用者處理結果。也稱做並行執行。

 

2、greenlet模塊

第三方模塊,能夠在pycharm中選擇虛擬環境安裝,

也能夠經過 pip install greenlet 安裝

 

greenlet 經過 greenlet(func) 啓動一個協程,經過 switch() 手動切換程序的執行

示例

from greenlet import greenlet
​
def func1(name):
    print("%s from func1"%name) #2執行這一句
    g2.switch("jack")  #3切換執行func2(),第一次執行要傳入參數保存如今執行的狀態
    print("from func1 end") #6執行這一句
    g2.switch()#7切換執行play(),保存如今執行的狀態
​
def func2(name):
    print("%s from func2"%name) #4執行這一句
    g1.switch() #5切換執行func1(),保存如今執行的狀態
    print("from func2 end") #8執行這一句
​
g1 = greenlet(func1)
g2 = greenlet(func2)
g1.switch("nick") #1執行func1(),在switch()裏傳參數 ,注意與通常的線程、進程傳參方式的不一樣
#能夠在第一次switch時傳入參數,之後都不須要

  



分析:就是經過建立greenlet(func)對象,經過對象的switch()方法轉移程序執行的不一樣步驟,可是這裏沒法自動識別IO後自動切換。

 

3、gevent模塊

gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。

安裝 pip3 install gevent 或者在pycharm中選擇虛擬環境安裝

 

用法

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如func1,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數func1的
​
g2=gevent.spawn(func2)
​
g1.join() #等待g1結束
​
g2.join() #等待g2結束
​
#或者上述兩步合做一步:gevent.joinall([g1,g2])
​
g1.value#拿到func1的返回值

  

 

示例

import gevent
​
​
def func1():
    print('from func1: 1')
    gevent.sleep(0)
    print('from func1: 2')
    gevent.sleep(1)
​
​
def func2():
    print('from func2: 1')
    gevent.sleep(2)
    print('from func2: 2')
​
​
def func3():
    print('from func3: 1')
    gevent.sleep(1)
    print('from func3: 2')
​
​
gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    gevent.spawn(func3),
])
​

  

輸出結果

from func1: 1
from func2: 1
from func3: 1
from func1: 2
from func3: 2
from func2: 2

  


分析:能夠從輸出結果看到程序不斷的在三個函數中跳躍執行,遇到IO了就去執行另外的函數,可是請注意一點

gevent.sleep() 是用於模仿 IO 操做的,實際使用中不須要 gevent.sleep(),這裏若是單純執行上述代碼的話,gevent模塊也是隻能識別 gevent.sleep()產生的IO,而對系統產生的IO或者網絡IO之類沒法識別,全部須要打上補丁,使得gevent模塊識別其餘IO

 

gevent是不能直接識別的須要用下面一行代碼,打補丁

要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭

 

示例

 

需求:爬取三個網站並打印網頁字符串長度

​
from gevent import monkey;monkey.patch_all()
# 把當前程序的全部 IO 操做標記起來,不然模塊沒法知道 IO 操做
import gevent
import time
import requests
​
​
def get_page(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    }
    page_text = requests.get(url=url, headers=headers).text
    print('網站長度', len(page_text))
​
​
def main():
    urls = [
        'https://www.sogou.com',
        'https://cn.bing.com',
        'https://cnblogs.com/Nicholas0707/',
    ]
    time_start = time.time()
    for url in urls:
        get_page(url)
​
    print('同步耗時:', time.time() - time_start)
​
    print("-"*50)
    async_time_start = time.time()
    gevent.joinall([
        gevent.spawn(get_page, 'https://www.sogou.com'),
        gevent.spawn(get_page, 'https://cn.bing.com'),
        gevent.spawn(get_page, 'https://cnblogs.com/Nicholas0707/'),
    ])
    print('異步協程耗時:', time.time() - async_time_start)
​
​
if __name__ == '__main__':
    main()

  

輸出結果

 

網站長度 23795
網站長度 130248
網站長度 13761
同步耗時: 2.5321450233459473
--------------------------------------------------
網站長度 23795
網站長度 130221
網站長度 13761
異步協程耗時: 0.36602067947387695
​

  

分析:從結果能夠看出採用協程異步明顯更快

 

 

4、asyncio模塊

asyncio是Python3.4(2014年)引進的標準庫,直接內置了對IO的支持。

python2x沒有加這個庫,python3.5又加入了async/await特性,python3.7新增了asyncio.run() api來執行異步函數.

 

 

協程示例

先簡單看一個協程示例

運行協程函數的第一種方式(loop.run_until_complete())

#python 3.7+,本次測試環境python3.8
​
import asyncio,time
​
​
async def fun():  #定義一個協程函數
    print('hello')
    await asyncio.sleep(1)  #模擬IO操做,等待調用
    print('word')
​
​
if __name__ == '__main__':
    begin = time.time()
    # 建立一個事件loop
    loop = asyncio.get_event_loop()
    # 將協程函數加入到事件循環loop,並啓動事件循環
    loop.run_until_complete(fun())
    loop.close()
    print('用時共計',time.time()-begin)
    print(fun)
    print(loop)

  


輸出結果

hello
word
用時共計 1.0010573863983154
<function fun at 0x00000000022CD0D0>
<ProactorEventLoop running=False closed=True debug=False>

  

上面代碼等同於下面(不推薦使用,python3.8已經不支持此寫法了)

##python 3.7,本次測試環境python3.7
​
import asyncio,time
​
@asyncio.coroutine #這種寫法在python3.8以後被拋棄了
def fun():  #定義一個協程函數
    print('hello')
    yield from asyncio.sleep(1)  #模擬IO操做,等待調用
    print('word')
​
​
if __name__ == '__main__':
    begin = time.time()
    # 建立一個事件loop
    loop = asyncio.get_event_loop()
    # 將協程函數加入到事件循環loop,並啓動事件循環
    loop.run_until_complete(fun())
    loop.close()
    print('用時共計',time.time()-begin)

  




分析:使用async關鍵字定義一個協程函數,用asyncio.get_event_loop()建立一個事件循環,而後使用run_until_complete將協程註冊到事件循環,並啓動事件循環。

 

運行協程函數的第二種方式( asyncio.gather()---asyncio.run())

示例

# ## python 3.7+,本次測試環境python3.8
#
import asyncio,time
​
​
async def foo():
    print('start foo')
    await asyncio.sleep(1)
    print('end foo')
    return 'foo'
​
async def bar():
    print('start bar')
    await asyncio.sleep(2)
    print('end bar')
    return ('1','2')
​
async def main():
    res = await asyncio.gather(foo(), bar())
    #同時將兩個異步函數對象加入事件循環,
    # 但並不運行,等待調用。
    print(res)
​
if __name__ == '__main__':
    begin = time.time()
    asyncio.run(main())
    print('共計用時',time.time()-begin)
    # 執行協程事件循環並返回結果。

  

 

輸出結果

start foo
start bar
end foo
end bar
['foo', ('1', '2')]
共計用時 2.003114700317383

  


分析:若是要同時異步執行兩個異步函數,須要用asyncio.gather(fun1(), fun2())將兩個異步函數對象加入事件循環,這裏不用顯示的建立異步事件循環,由於asyncio.gather()方法中若是檢測到你沒有建立異步事件循環會自動幫你建立,見源代碼

​
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """..."""
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

  

啓動事件循環是經過 asyncio.run()方法進行啓動

 

運行協程函數的第三種方式( asyncio.create_task()---asyncio.run())

## python 3.7+,本次測試環境python3.8
import asyncio,time
​
​
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
​
​
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))  #建立任務事件,異步函數加入參數,
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    await task1 #將任務事件加入異步事件循環,等待調用
    await task2
​
    print(f"finished at {time.strftime('%X')}")
​
​
if __name__ == '__main__':
    begin = time.time()
    asyncio.run(main()) #啓動異步事件循環
    print('共計用時',time.time()-begin)

  

輸出結果

started at 20:01:51
hello at 20:01:52
world at 20:01:53
finished at 20:01:53
共計用時 2.002114772796631

  

分析:經過asyncio.create_task()建立等待異步執行的任務事件,這裏也是自動建立了事件循環loop,

源碼

def create_task(coro, *, name=None):
    """...
    """
    loop = events.get_running_loop()

  

而後使用await將任務事件加入異步事件循環。

 

 

關於asyncio的一些關鍵字的說明:

  • event_loop 事件循環:程序開啓一個無限循環,把一些函數註冊到事件循環上,當知足事件發生的時候,調用相應的協程函數

  • coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象。協程對象須要註冊到事件循環,由事件循環調用。

  • task 任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各類狀態

  • future: 表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別

  • async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口,等待調用。

  • sleep:暫停執行此任務,爲事件循環分配要競爭的任務,而且它(事件循環)監視其全部任務的狀態並從一個任務切換到另外一個,這裏是模擬io任務花費的時間。

 

asyncio方法


"""
Asyncio.get_event_loop()
​
返回一個事件循環對象,是asyncio.Baseeventloop的實例
​
Abstracteventloop.stop()
​
中止運行事件循環
​
​
Abstracteventloop.run_forever()
​
一直運行直到stop()
​
Abstracteventloop.run_until_complete(future)
​
運行直至future對象運行完
​
Abstracteventloop.close()
​
關閉事件循環
​
Abstracteventloop.is_running()
​
返回事件循環的是否運行
​
asyncio.gather(*aws, loop=None, return_exceptions=False)
同時在協程事件循環中運行定義的異步函數對象
​
​
task = asyncio.create_task(func());task.cancel()
請求取消任務。調用它將致使Task將CancelledError異常拋出到協程事件循環中。
​
"""

  


 

 

爲異步函數綁定回調函數

## python 3.7+,本次測試環境python3.8
import asyncio
​
async def fun():
    print('hello word')
    return 'nick'
​
​
def callback(future):
    print('Callback: ', future.result())  # 經過result()方法得到異步函數的返回值
​
​
loop = asyncio.get_event_loop()  # 建立異步事件循環
task = loop.create_task(fun())  # 將異步函數加入loop
task.add_done_callback(callback)  # 添加回調函數
loop.run_until_complete(task)
​

  

輸出結果

hello word
Callback:  nick

  


示例二

## python 3.7+,本次測試環境python3.8
import asyncio
​
async def fun():
    print('hello')
    await asyncio.sleep(1)
    print('fun --end')
    return 'nick'
async def bar():
    print('word')
    await asyncio.sleep(2)
    print('bar --end')
    return 'jack'
​
​
def callback(future):
    print('Callback: ', future.result())  # 經過result()方法得到異步函數的返回值
​
​
async def main():
    loop = asyncio.get_event_loop()  # 建立異步事件循環
    task1 = loop.create_task(fun())  # 將異步函數加入loop
    task2 = loop.create_task(bar())  # 將異步函數加入loop
    task1.add_done_callback(callback)  # 添加回調函數
    task2.add_done_callback(callback)  # 添加回調函數
    await task1
    await task2
​
if __name__ == '__main__':
    asyncio.run(main())

  

輸出結果

hello
word
fun --end
Callback:  nick
bar --end
Callback:  jack

  

分析:經過add_done_callback方法給task任務添加回調函數,當task(也能夠說是coroutine)執行完成的時候,就會調用回調函數,經過result()方法得到異步函數的返回值。

相關文章
相關標籤/搜索