Python3.5協程學習研究

今夕何夕故人不來遲暮連山黛

   以前有研究過python協程相關的知識,但一直沒有進行深刻探究。日常工做中使用的也仍是以python2爲主,然而最近的項目須要使用python3協程相關的內容,所以湊出時間學習了一番python3的協程語法。    本篇主要以介紹python3.5的async/await協程語法爲主,由於這種語法看上去很彆扭,不容易理解。若是對python協程基礎不是很瞭解,建議能夠先看此篇:Python協程python

協程函數(異步函數)

   咱們日常使用最多的函數都是同步函數,即不一樣函數執行是按順序執行的。那麼什麼是異步函數呢?怎麼建立異步函數?怎麼在異步函數之間來回切換執行?不急,請往下看。web

建立協程函數

先來看下普通函數:bash

def test1():
    print("1")
    print("2")

def test2():
    print("3")
    print("4")

a = test1()
b = test2()
print(a,type(a))
print(b,type(b))
複製代碼

運行以上代碼獲得結果:微信

1
2
3
4
None <class 'NoneType'>
None <class 'NoneType'>
複製代碼

說明:程序順序執行了test一、test2函數,在調用函數的時候就自動進入了函數體,並執行了函數的內容。網絡

而後使用async關鍵詞將普通函數變成協程函數,即異步函數:session

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

print(test1())
print(test2())

複製代碼

運行以上代碼獲得結果:多線程

<coroutine object test1 at 0x109f4c620>
asyncio_python3_test.py:16: RuntimeWarning: coroutine 'test1' was never awaited
  print(test1())
<coroutine object test2 at 0x109f4c620>
asyncio_python3_test.py:17: RuntimeWarning: coroutine 'test2' was never awaited
  print(test2())
複製代碼

說明:忽略結果中的告警,能夠看到調用函數test一、test2的時候,並無進入函數體且執行函數內容,而是返回了一個coroutine(協程對象)。併發

除了函數外,類的方法也可使用async關鍵詞將其變成協程方法:異步

class test:
    async def run(self):
        print("1")
複製代碼

執行協程函數

   前面咱們成功建立了協程函數,而且在調用函數的時候返回了一個協程對象,那麼怎麼進入函數體並執行函數內容呢?相似於生成器,可使用send方法執行函數,修改下前面的代碼:async

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

a.send(None)
b.send(None)
複製代碼

運行以上代碼獲得如下結果:

1
2
Traceback (most recent call last):
  File "asyncio_python3_test.py", line 19, in <module>
    a.send(None)
StopIteration
sys:1: RuntimeWarning: coroutine 'test2' was never awaited
複製代碼

   說明:程序先執行了test1協程函數,當test1執行完時報了StopIteration異常,這是協程函數執行完飯回的一個異常,咱們能夠用try except捕捉,來用判斷協程函數是否執行完畢。

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration as e:
    print(e.value)
    # 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束,返回值在value中
    pass
try:
    b.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
    print(e.value)
    # 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束,返回值在value中
    pass
複製代碼

運行以上代碼獲得如下結果:

1
2
3
4
複製代碼

   說明:程序先執行了test1函數,等到test1函數執行完後再執行test2函數。從執行過程上來看目前協程函數與普通函數沒有區別,並無實現異步函數,那麼如何交叉運行協程函數呢?

交叉執行協程函數(await)

   經過以上例子,咱們發現定義協程函數可使用async關鍵詞,執行函數可使用send方法,那麼如何實如今兩個協程函數間來回切換執行呢?這裏須要使用await關鍵詞,修改一下代碼:

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(1) # asyncio.sleep(1)返回的也是一個協程對象
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
    # 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束
    pass

try:
    b.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
    pas
複製代碼

運行以上函數獲得如下結果:

1
3
4
複製代碼

   說明:程序先執行test1協程函數,在執行到await時,test1函數中止了執行(阻塞);接着開始執行test2協程函數,直到test2執行完畢。從結果中,咱們能夠看到,直到程序運行完畢,test1函數也沒有執行完(沒有執行print("2")),那麼如何使test1函數執行完畢呢?可使用asyncio自帶的方法循環執行協程函數。

await與阻塞

   使用async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘的協程也掛起或者執行完畢,再進行下一個協程的執行,協程的目的也是讓一些耗時的操做異步化。

注意點:await後面跟的必須是一個Awaitable對象,或者實現了相應協議的對象,查看Awaitable抽象類的代碼,代表了只要一個類實現了__await__方法,那麼經過它構造出來的實例就是一個Awaitable,而且Coroutine類也繼承了Awaitable。

自動循環執行協程函數

   經過前面介紹咱們知道執行協程函數須要使用send方法,但一旦協程函數執行過程當中切換到其餘函數了,那麼這個函數就不在被繼續運行了,而且使用sned方法不是很高效。那麼如何在執行整個程序過程當中,自動得執行全部的協程函數呢,就如同多線程、多進程那樣,隱式得執行而不是顯示的經過send方法去執行函數。

事件循環方法

前面提到的問題就須要用到事件循環方法去解決,即asyncio.get_event_loop方法,修改以上代碼以下:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
loop.run_until_complete(test1())
複製代碼

運行以上代碼獲得如下結果:

1
3
4
2
複製代碼

說明:asyncio.get_event_loop方法能夠建立一個事件循環,而後使用run_until_complete將協程註冊到事件循環,並啓動事件循環。

task任務

   因爲協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象。所謂task對象是Future類的子類,保存了協程運行後的狀態,用於將來獲取協程的結果。咱們也能夠手動將協程對象定義成task,修改以上代碼以下:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)
複製代碼

   說明:前面說到task對象保存了協程運行的狀態,而且能夠獲取協程函數運行的返回值,那麼具體該如何獲取呢?這裏能夠分兩種方式,一種須要綁定回調函數,另一種則直接在運行完task任務後輸出。值得一提的是,若是使用send方法執行函數,則返回值能夠經過捕捉StopIteration異常,利用StopIteration.value獲取。

直接輸出task結果

當協程函數運行結束後,咱們須要獲得其返回值,第一種方式就是等到task狀態爲finish時,調用task的result方法獲取返回值。

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())
複製代碼

運行以上代碼獲得如下結果:

1
3
4
2
stop
複製代碼
回調函數

   獲取返回值的第二種方法是能夠經過綁定回調函數,在task執行完畢的時候能夠獲取執行的結果,回調的最後一個參數是future對象,經過該對象能夠獲取協程返回值。

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(future):
    print('Callback:',future.result()) # 經過future對象的result方法能夠獲取協程函數的返回值

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1()) # 建立task,test1()是一個協程對象
task.add_done_callback(callback) # 綁定回調函數
loop.run_until_complete(task)
複製代碼

運行以上代碼獲得如下結果:

1
3
4
2
Callback: stop
複製代碼

若是回調函數須要接受多個參數,能夠經過偏函數導入,修改代碼以下:

import asyncio
import functools

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(param1,param2,future):
    print(param1,param2)
    print('Callback:',future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback,"param1","param2"))
loop.run_until_complete(task)
複製代碼

說明:回調函數中的future對象就是建立的task對象。

future對象

   future對象有幾個狀態:Pending、Running、Done、Cancelled。建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done,若是須要中止事件循環,就須要先把task取消,可使用asyncio.Task獲取事件循環的task。

協程中止

   前面介紹了使用事件循環執行協程函數,那麼怎麼中止執行呢?在中止執行協程前,須要先取消task,而後再中止loop事件循環。

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(3)
    print("2")
    return "stop"

tasks = [
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
]

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()
    loop.run_forever()
finally:
    loop.close()
複製代碼

運行以上代碼,按ctrl+c能夠結束執行。

本文中用到的一些概念及方法

  • event_loop事件循環:程序開啓一個無限的循環,當把一些函數註冊到事件循環上時,知足事件發生條件即調用相應的函數。
  • coroutine協程對象:指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象,協程對象須要註冊到事件循環,由事件循環調用。
  • task任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含任務的各類狀態。
  • future:表明未來執行或沒有執行的任務的結果,它和task上沒有本質的區別
  • async/await關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

併發與並行

   併發一般指有多個任務須要同時進行,並行則是同一時刻有多個任務執行。用多線程、多進程、協程來講,協程實現併發,多線程與多進程實現並行。

asyncio協程如何實現併發

   asyncio想要實現併發,就須要多個協程來完成任務,每當有任務阻塞的時候就await,而後其餘協程繼續工做,這須要建立多個協程的列表,而後將這些協程註冊到事件循環中。這裏指的多個協程,能夠是多個協程函數,也能夠是一個協程函數的多個協程對象。

import asyncio

async def test1():

    print("1")
    await asyncio.sleep(1)
    print("2")
    return "stop"

a = test1()
b = test1()
c = test1()

tasks = [
    asyncio.ensure_future(a),
    asyncio.ensure_future(b),
    asyncio.ensure_future(c),
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
    print("task result is ",task.result())
複製代碼

運行以上代碼獲得如下結果:

1
1
1
2
2
2
task result is  stop
task result is  stop
task result is  stop
複製代碼

說明:代碼先是定義了三個協程對象,而後經過asyncio.ensure_future方法建立了三個task,而且將全部的task加入到了task列表,最終使用loop.run_until_complete將task列表添加到事件循環中。

協程爬蟲

   前面介紹瞭如何使用async與await建立協程函數,使用asyncio.get_event_loop建立事件循環並執行協程函數。例子很好地展現了協程併發的高效,但在實際應用場景中該如何開發協程程序?好比說異步爬蟲。我嘗試用requests模塊、urllib模塊寫異步爬蟲,但實際操做發現並不支持asyncio異步,所以可使用aiohttp模塊編寫異步爬蟲。

aiohttp實現

import asyncio
import aiohttp

async def run(url):
    print("start spider ",url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼

運行以上代碼獲得如下結果:

start spider  https://thief.one
start spider  https://home.nmask.cn
start spider  https://movie.nmask.cn
start spider  https://tool.nmask.cn
https://movie.nmask.cn
https://home.nmask.cn
https://tool.nmask.cn
https://thief.one
複製代碼

說明:aiohttp基於asyncio實現,既能夠用來寫webserver,也能夠當爬蟲使用。

requests實現

   因爲requests模塊阻塞了客戶代碼與asycio事件循環的惟一線程,所以在執行調用時,整個應用程序都會凍結,但若是必定要用requests模塊,可使用事件循環對象的run_in_executor方法,經過run_in_executor方法來新建一個線程來執行耗時函數,所以能夠這樣修改代碼實現:

import asyncio
import requests

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(None, requests.get, url)
    print(response.url)
    
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼

若是要給requests帶上參數,可使用functools:

import asyncio
import requests
import functools

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        response = await loop.run_in_executor(None,functools.partial(requests.get,url=url,params="",timeout=1))
    except Exception as e:
        print(e)
    else:
        print(response.url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼

asyncio中使用阻塞函數

   如同前面介紹如何在asyncio中使用requests模塊同樣,若是想在asyncio中使用其餘阻塞函數,該怎麼實現呢?雖然目前有異步函數支持asyncio,但實際問題是大部分IO模塊還不支持asyncio。

阻塞函數在asyncio中使用的問題

   阻塞函數(例如io讀寫,requests網絡請求)阻塞了客戶代碼與asycio事件循環的惟一線程,所以在執行調用時,整個應用程序都會凍結。

解決方案

   這個問題的解決方法是使用事件循環對象的run_in_executor方法。asyncio的事件循環在背後維護着一個ThreadPoolExecutor對象,咱們能夠調用run_in_executor方法,把可調用對象發給它執行,便可以經過run_in_executor方法來新建一個線程來執行耗時函數。

run_in_executor方法

AbstractEventLoop.run_in_executor(executor, func, *args)
複製代碼
  • executor 參數應該是一個 Executor 實例。若是爲 None,則使用默認 executor。
  • func 就是要執行的函數
  • args 就是傳遞給 func 的參數

實際例子(使用time.sleep()):

import asyncio
import time

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None,time.sleep,1)
    except Exception as e:
        print(e)
    print("stop ",url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼

運行以上代碼獲得如下函數:

start  https://thief.one
start  https://home.nmask.cn
start  https://movie.nmask.cn
start  https://tool.nmask.cn
stop  https://thief.one
stop  https://movie.nmask.cn
stop  https://home.nmask.cn
stop  https://tool.nmask.cn
複製代碼

說明:有了run_in_executor方法,咱們就可使用以前熟悉的模塊建立協程併發了,而不須要使用特定的模塊進行IO異步開發。

參考

www.oschina.net/translate/p… www.jianshu.com/p/b5e347b3a… zhuanlan.zhihu.com/p/27258289 juejin.im/entry/5aabb…

本文來自我的博客:Python3.5協程學習研究 | nMask'Blog,轉載請說明出處!

如需更多優質文章,請掃一掃關注微信公衆號

或者訪問我的博客:thief.one

相關文章
相關標籤/搜索