今夕何夕故人不來遲暮連山黛
以前有研究過python協程相關的知識,但一直沒有進行深刻探究。日常工做中使用的也仍是以python2爲主,然而最近的項目須要使用python3協程相關的內容,所以湊出時間學習了一番python3的協程語法。 本篇主要以介紹python3.5的async/await協程語法爲主,由於這種語法看上去很彆扭,不容易理解。若是對python協程基礎不是很瞭解,建議能夠先看此篇:Python協程。python
咱們日常使用最多的函數都是同步函數,即不一樣函數執行是按順序執行的。那麼什麼是異步函數呢?怎麼建立異步函數?怎麼在異步函數之間來回切換執行?不急,請往下看。web
先來看下普通函數:bash
def test1():
print("1")
print("2")
def test2():
print("3")
print("4")
a = test1()
b = test2()
print(a,type(a))
print(b,type(b))
複製代碼
運行以上代碼獲得結果:微信
1
2
3
4
None <class 'NoneType'>
None <class 'NoneType'>
複製代碼
說明:程序順序執行了test一、test2函數,在調用函數的時候就自動進入了函數體,並執行了函數的內容。網絡
而後使用async關鍵詞將普通函數變成協程函數,即異步函數:session
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
print(test1())
print(test2())
複製代碼
運行以上代碼獲得結果:多線程
<coroutine object test1 at 0x109f4c620>
asyncio_python3_test.py:16: RuntimeWarning: coroutine 'test1' was never awaited
print(test1())
<coroutine object test2 at 0x109f4c620>
asyncio_python3_test.py:17: RuntimeWarning: coroutine 'test2' was never awaited
print(test2())
複製代碼
說明:忽略結果中的告警,能夠看到調用函數test一、test2的時候,並無進入函數體且執行函數內容,而是返回了一個coroutine(協程對象)。併發
除了函數外,類的方法也可使用async關鍵詞將其變成協程方法:異步
class test:
async def run(self):
print("1")
複製代碼
前面咱們成功建立了協程函數,而且在調用函數的時候返回了一個協程對象,那麼怎麼進入函數體並執行函數內容呢?相似於生成器,可使用send方法執行函數,修改下前面的代碼:async
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
a.send(None)
b.send(None)
複製代碼
運行以上代碼獲得如下結果:
1
2
Traceback (most recent call last):
File "asyncio_python3_test.py", line 19, in <module>
a.send(None)
StopIteration
sys:1: RuntimeWarning: coroutine 'test2' was never awaited
複製代碼
說明:程序先執行了test1協程函數,當test1執行完時報了StopIteration異常,這是協程函數執行完飯回的一個異常,咱們能夠用try except捕捉,來用判斷協程函數是否執行完畢。
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
try:
a.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration as e:
print(e.value)
# 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束,返回值在value中
pass
try:
b.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
print(e.value)
# 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束,返回值在value中
pass
複製代碼
運行以上代碼獲得如下結果:
1
2
3
4
複製代碼
說明:程序先執行了test1函數,等到test1函數執行完後再執行test2函數。從執行過程上來看目前協程函數與普通函數沒有區別,並無實現異步函數,那麼如何交叉運行協程函數呢?
經過以上例子,咱們發現定義協程函數可使用async關鍵詞,執行函數可使用send方法,那麼如何實如今兩個協程函數間來回切換執行呢?這裏須要使用await關鍵詞,修改一下代碼:
import asyncio
async def test1():
print("1")
await asyncio.sleep(1) # asyncio.sleep(1)返回的也是一個協程對象
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
try:
a.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
# 協程函數執行結束時會拋出一個StopIteration 異常,標誌着協程函數執行結束
pass
try:
b.send(None) # 能夠經過調用 send 方法,執行協程函數
except StopIteration:
pas
複製代碼
運行以上函數獲得如下結果:
1
3
4
複製代碼
說明:程序先執行test1協程函數,在執行到await時,test1函數中止了執行(阻塞);接着開始執行test2協程函數,直到test2執行完畢。從結果中,咱們能夠看到,直到程序運行完畢,test1函數也沒有執行完(沒有執行print("2")),那麼如何使test1函數執行完畢呢?可使用asyncio自帶的方法循環執行協程函數。
使用async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起,就像生成器裏的yield同樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘的協程也掛起或者執行完畢,再進行下一個協程的執行,協程的目的也是讓一些耗時的操做異步化。
注意點:await後面跟的必須是一個Awaitable對象,或者實現了相應協議的對象,查看Awaitable抽象類的代碼,代表了只要一個類實現了__await__方法,那麼經過它構造出來的實例就是一個Awaitable,而且Coroutine類也繼承了Awaitable。
經過前面介紹咱們知道執行協程函數須要使用send方法,但一旦協程函數執行過程當中切換到其餘函數了,那麼這個函數就不在被繼續運行了,而且使用sned方法不是很高效。那麼如何在執行整個程序過程當中,自動得執行全部的協程函數呢,就如同多線程、多進程那樣,隱式得執行而不是顯示的經過send方法去執行函數。
前面提到的問題就須要用到事件循環方法去解決,即asyncio.get_event_loop方法,修改以上代碼以下:
import asyncio
async def test1():
print("1")
await test2()
print("2")
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
loop.run_until_complete(test1())
複製代碼
運行以上代碼獲得如下結果:
1
3
4
2
複製代碼
說明:asyncio.get_event_loop方法能夠建立一個事件循環,而後使用run_until_complete將協程註冊到事件循環,並啓動事件循環。
因爲協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象。所謂task對象是Future類的子類,保存了協程運行後的狀態,用於將來獲取協程的結果。咱們也能夠手動將協程對象定義成task,修改以上代碼以下:
import asyncio
async def test1():
print("1")
await test2()
print("2")
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)
複製代碼
說明:前面說到task對象保存了協程運行的狀態,而且能夠獲取協程函數運行的返回值,那麼具體該如何獲取呢?這裏能夠分兩種方式,一種須要綁定回調函數,另一種則直接在運行完task任務後輸出。值得一提的是,若是使用send方法執行函數,則返回值能夠經過捕捉StopIteration異常,利用StopIteration.value獲取。
當協程函數運行結束後,咱們須要獲得其返回值,第一種方式就是等到task狀態爲finish時,調用task的result方法獲取返回值。
import asyncio
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())
複製代碼
運行以上代碼獲得如下結果:
1
3
4
2
stop
複製代碼
獲取返回值的第二種方法是能夠經過綁定回調函數,在task執行完畢的時候能夠獲取執行的結果,回調的最後一個參數是future對象,經過該對象能夠獲取協程返回值。
import asyncio
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
def callback(future):
print('Callback:',future.result()) # 經過future對象的result方法能夠獲取協程函數的返回值
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1()) # 建立task,test1()是一個協程對象
task.add_done_callback(callback) # 綁定回調函數
loop.run_until_complete(task)
複製代碼
運行以上代碼獲得如下結果:
1
3
4
2
Callback: stop
複製代碼
若是回調函數須要接受多個參數,能夠經過偏函數導入,修改代碼以下:
import asyncio
import functools
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
def callback(param1,param2,future):
print(param1,param2)
print('Callback:',future.result())
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback,"param1","param2"))
loop.run_until_complete(task)
複製代碼
說明:回調函數中的future對象就是建立的task對象。
future對象有幾個狀態:Pending、Running、Done、Cancelled。建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done,若是須要中止事件循環,就須要先把task取消,可使用asyncio.Task獲取事件循環的task。
前面介紹了使用事件循環執行協程函數,那麼怎麼中止執行呢?在中止執行協程前,須要先取消task,而後再中止loop事件循環。
import asyncio
async def test1():
print("1")
await asyncio.sleep(3)
print("2")
return "stop"
tasks = [
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
]
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop()
loop.run_forever()
finally:
loop.close()
複製代碼
運行以上代碼,按ctrl+c能夠結束執行。
併發一般指有多個任務須要同時進行,並行則是同一時刻有多個任務執行。用多線程、多進程、協程來講,協程實現併發,多線程與多進程實現並行。
asyncio想要實現併發,就須要多個協程來完成任務,每當有任務阻塞的時候就await,而後其餘協程繼續工做,這須要建立多個協程的列表,而後將這些協程註冊到事件循環中。這裏指的多個協程,能夠是多個協程函數,也能夠是一個協程函數的多個協程對象。
import asyncio
async def test1():
print("1")
await asyncio.sleep(1)
print("2")
return "stop"
a = test1()
b = test1()
c = test1()
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
print("task result is ",task.result())
複製代碼
運行以上代碼獲得如下結果:
1
1
1
2
2
2
task result is stop
task result is stop
task result is stop
複製代碼
說明:代碼先是定義了三個協程對象,而後經過asyncio.ensure_future方法建立了三個task,而且將全部的task加入到了task列表,最終使用loop.run_until_complete將task列表添加到事件循環中。
前面介紹瞭如何使用async與await建立協程函數,使用asyncio.get_event_loop建立事件循環並執行協程函數。例子很好地展現了協程併發的高效,但在實際應用場景中該如何開發協程程序?好比說異步爬蟲。我嘗試用requests模塊、urllib模塊寫異步爬蟲,但實際操做發現並不支持asyncio異步,所以可使用aiohttp模塊編寫異步爬蟲。
import asyncio
import aiohttp
async def run(url):
print("start spider ",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼
運行以上代碼獲得如下結果:
start spider https://thief.one
start spider https://home.nmask.cn
start spider https://movie.nmask.cn
start spider https://tool.nmask.cn
https://movie.nmask.cn
https://home.nmask.cn
https://tool.nmask.cn
https://thief.one
複製代碼
說明:aiohttp基於asyncio實現,既能夠用來寫webserver,也能夠當爬蟲使用。
因爲requests模塊阻塞了客戶代碼與asycio事件循環的惟一線程,所以在執行調用時,整個應用程序都會凍結,但若是必定要用requests模塊,可使用事件循環對象的run_in_executor方法,經過run_in_executor方法來新建一個線程來執行耗時函數,所以能夠這樣修改代碼實現:
import asyncio
import requests
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, requests.get, url)
print(response.url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼
若是要給requests帶上參數,可使用functools:
import asyncio
import requests
import functools
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
response = await loop.run_in_executor(None,functools.partial(requests.get,url=url,params="",timeout=1))
except Exception as e:
print(e)
else:
print(response.url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼
如同前面介紹如何在asyncio中使用requests模塊同樣,若是想在asyncio中使用其餘阻塞函數,該怎麼實現呢?雖然目前有異步函數支持asyncio,但實際問題是大部分IO模塊還不支持asyncio。
阻塞函數(例如io讀寫,requests網絡請求)阻塞了客戶代碼與asycio事件循環的惟一線程,所以在執行調用時,整個應用程序都會凍結。
這個問題的解決方法是使用事件循環對象的run_in_executor方法。asyncio的事件循環在背後維護着一個ThreadPoolExecutor對象,咱們能夠調用run_in_executor方法,把可調用對象發給它執行,便可以經過run_in_executor方法來新建一個線程來執行耗時函數。
AbstractEventLoop.run_in_executor(executor, func, *args)
複製代碼
實際例子(使用time.sleep()):
import asyncio
import time
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None,time.sleep,1)
except Exception as e:
print(e)
print("stop ",url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
複製代碼
運行以上代碼獲得如下函數:
start https://thief.one
start https://home.nmask.cn
start https://movie.nmask.cn
start https://tool.nmask.cn
stop https://thief.one
stop https://movie.nmask.cn
stop https://home.nmask.cn
stop https://tool.nmask.cn
複製代碼
說明:有了run_in_executor方法,咱們就可使用以前熟悉的模塊建立協程併發了,而不須要使用特定的模塊進行IO異步開發。
www.oschina.net/translate/p… www.jianshu.com/p/b5e347b3a… zhuanlan.zhihu.com/p/27258289 juejin.im/entry/5aabb…
本文來自我的博客:Python3.5協程學習研究 | nMask'Blog,轉載請說明出處!
如需更多優質文章,請掃一掃關注微信公衆號
或者訪問我的博客:thief.one