asyncio is a concurrency module introduced in Python 3.4. Built around coroutines and futures, it makes asynchronous code much easier to write: the code reads almost like synchronous code, with none of the annoying callbacks.
The Python 3.7 release in June 2018 upgraded parts of the asyncio API, mainly around task management and event loops. The new 3.7 features will be covered in a separate article.
Current state:
The ecosystem of asyncio-compatible libraries is honestly still incomplete, and there is no officially maintained list. On GitHub a Russian team develops and maintains async versions of common libraries, such as aiomysql, aiopika, and aioredis. One thing needs to be said up front: if you want to use asyncio, every other library in your code must also be asynchronous rather than blocking. You need aiomysql instead of pymysql, aiohttp instead of requests, and so on. If one of the libraries you depend on has no async counterpart yet, things can get awkward.
1. Event loop: responsible for managing and dispatching the execution of different tasks; we register tasks on the event loop.
2. Coroutines: special functions similar to Python generators. A coroutine usually contains the keyword await; when execution reaches an await, the coroutine releases control back to the event loop. When a coroutine is wrapped into a Task (a kind of Future), that Task is scheduled and executed by the event loop.
3. Futures: represent the result of a task that may or may not have executed yet; that result can also be an exception.
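To see how these three pieces fit together, here is a minimal sketch (the names `compute` and `main` are just illustrative): the event loop wraps a coroutine into a Task, and that Task is itself a Future we can await.

```python
import asyncio

async def compute():
    # An await point hands control back to the event loop.
    await asyncio.sleep(0)
    return 42

async def main():
    # create_task (Python 3.7+) wraps the coroutine in a Task, which
    # is a Future subclass, and schedules it on the running event loop.
    task = asyncio.create_task(compute())
    print(isinstance(task, asyncio.Future))  # True: a Task is a Future
    print(await task)                        # 42

asyncio.run(main())
```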
asyncio lets us define subtasks as coroutines and gives us control over how they are scheduled, whereas with threads the scheduling is handled by the operating system and is out of our hands. Let's start with the following example:
```python
import asyncio

async def foo():
    print("running in foo")
    await asyncio.sleep(0)
    print("back foo")

async def bar():
    print("running in bar")
    await asyncio.sleep(0)
    print("back bar")

async def main():
    tasks = [foo(), bar()]
    await asyncio.gather(*tasks)

asyncio.run(main())
```
The output of the code above:
```
running in foo
running in bar
back foo
back bar
```
A note on the code above: when code executes synchronously, the order of execution is linear; once it is asynchronous, the order becomes non-deterministic. Let's dig into this with a simple crawler example:
```python
import time
import random
import asyncio
import aiohttp

URL = 'https://baidu.com'
MAX_CLIENTS = 3

async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # asyncio.sleep simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))
    print("all took: {:.2f} seconds".format(time.time() - start))

asyncio.run(main())
```
In the code above we added an asyncio.sleep call to each request. This simulates the real-world situation where, when we request several sites, responses come back at different times depending on the network and the target site.
The output:
```
fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 1 started, sleeping for 3 seconds
fetch coroutine 3 started, sleeping for 4 seconds
>> coroutine 1: Wed, 27 Feb 2019 11:27:58 GMT, took: 3.09 seconds
>>>> coroutine 3: Wed, 27 Feb 2019 11:27:58 GMT, took: 4.08 seconds
>>>>>> coroutine 2: Wed, 27 Feb 2019 11:27:58 GMT, took: 5.12 seconds
all took: 5.12 seconds
```
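The key behaviour here is that as_completed yields futures in the order they finish, not the order they were submitted. A minimal sketch of this, using plain asyncio.sleep instead of real HTTP requests so it needs no aiohttp (the `job` helper is made up for illustration):

```python
import asyncio

async def job(pid, delay):
    # Stand-in for a network request that takes `delay` seconds.
    await asyncio.sleep(delay)
    return 'coroutine {} finished after {}s'.format(pid, delay)

async def main():
    # The slowest job is submitted first, but as_completed still
    # yields results in completion order.
    futures = [asyncio.ensure_future(job(1, 0.3)),
               asyncio.ensure_future(job(2, 0.1)),
               asyncio.ensure_future(job(3, 0.2))]
    results = []
    for future in asyncio.as_completed(futures):
        results.append(await future)
    return results

for line in asyncio.run(main()):
    print(line)
```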
The return_when=FIRST_COMPLETED parameter of asyncio.wait is for when we run several tasks but only care about the one that returns fastest. Here is an example (note: to reproduce an error, I deliberately use the pre-3.7 way of creating the event loop):
```python
import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED

URL = 'https://baidu.com'
MAX_CLIENTS = 3

async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # asyncio.sleep simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED
    )
    print(done.pop().result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
```
Running it produces the following:
```
fetch coroutine 2 started, sleeping for 2 seconds
fetch coroutine 1 started, sleeping for 5 seconds
fetch coroutine 3 started, sleeping for 2 seconds
coroutine 2: Wed, 27 Feb 2019 11:41:19 GMT, took: 2.11 seconds
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000000038E5798>()]>>
```
The reason for this is easy to understand: we started three tasks, and as soon as we received the fastest one we closed the loop, while the other two tasks were still pending. asyncio treats this as an error, so it prints the warning we saw: Task was destroyed but it is pending!
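If you have to stay on the pre-3.7 loop API, the fix is to cancel the pending tasks yourself before closing the loop. A sketch of that pattern, with asyncio.sleep standing in for real requests (`worker` is an illustrative name):

```python
import asyncio

async def worker(pid, delay):
    await asyncio.sleep(delay)
    return 'worker {} done'.format(pid)

async def main():
    tasks = [asyncio.ensure_future(worker(i, i * 0.1)) for i in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel whatever is still running so no "Task was destroyed
    # but it is pending!" warning appears when the loop closes.
    for task in pending:
        task.cancel()
    # Let the cancelled tasks process their CancelledError.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # worker 1 done
loop.close()
```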
How do we fix this?
A future has four states:

1. Pending
2. Running
3. Done
4. Cancelled

We can call done() or cancelled() to check whether a future is in that state (note that an asyncio future has no running() method; that one belongs to concurrent.futures.Future). As a reminder, the done state can mean either that a result was returned or that an exception was raised. We can also cancel a future explicitly with cancel(). Since Python 3.7, asyncio.run takes care of all this for us, so let's rewrite the code that produced the Task was destroyed but it is pending! warning:
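A minimal sketch of inspecting and cancelling a future (the `slow` coroutine is made up for illustration):

```python
import asyncio

async def slow():
    await asyncio.sleep(10)

async def main():
    task = asyncio.ensure_future(slow())
    await asyncio.sleep(0)          # let the event loop start the task
    before = (task.done(), task.cancelled())
    task.cancel()                   # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    after = (task.done(), task.cancelled())
    return before, after

print(asyncio.run(main()))  # ((False, False), (True, True))
```

A cancelled task counts as done, which is why both flags are True at the end.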
```python
import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED

URL = 'https://baidu.com'
MAX_CLIENTS = 3

async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # asyncio.sleep simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED
    )
    print(done.pop().result())

asyncio.run(main())
```
The output is now completely normal:
```
fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 3 started, sleeping for 2 seconds
fetch coroutine 1 started, sleeping for 2 seconds
coroutine 3: Wed, 27 Feb 2019 11:54:13 GMT, took: 2.07 seconds
```
Futures have another handy feature: we can attach a callback that is invoked when the future reaches the done state.
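A minimal sketch of add_done_callback (the coroutine and callback names here are made up): the event loop passes the finished future as the callback's last argument, and functools.partial can bind any extra arguments.

```python
import asyncio
import functools

async def compute():
    await asyncio.sleep(0.1)
    return 42

def on_done(name, future):
    # Invoked by the event loop once the future is done.
    print('{} finished with result {}'.format(name, future.result()))

async def main():
    task = asyncio.ensure_future(compute())
    # The future itself is always passed as the last argument.
    task.add_done_callback(functools.partial(on_done, 'compute'))
    await task

asyncio.run(main())
```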
Retrieving results once futures are complete is demonstrated by the following code:
```python
import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED

URL = 'https://httpbin.org/get'
MAX_CLIENTS = 3

async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # asyncio.sleep simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures
    )
    print(done)
    for future in done:
        print(future.result())

asyncio.run(main())
```
The output:
```
fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 1 started, sleeping for 2 seconds
fetch coroutine 3 started, sleeping for 4 seconds
{<Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 3:... 5.31 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 1:... 3.34 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 2:... 6.38 seconds'>}
coroutine 3: Wed, 27 Feb 2019 12:10:15 GMT, took: 5.31 seconds
coroutine 1: Wed, 27 Feb 2019 12:10:15 GMT, took: 3.34 seconds
coroutine 2: Wed, 27 Feb 2019 12:10:15 GMT, took: 6.38 seconds
```
As you can see, once all tasks have finished we can read each task's result from the done set.
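Note that asyncio.wait hands back an unordered set, which is why the results come out in completion order rather than submission order. When you need results in the order the coroutines were submitted, asyncio.gather preserves it. A quick sketch (the `job` helper is illustrative):

```python
import asyncio

async def job(pid, delay):
    await asyncio.sleep(delay)
    return pid

async def main():
    # job 3 finishes first, but gather returns results in the
    # order the coroutines were passed in.
    return await asyncio.gather(job(1, 0.3), job(2, 0.2), job(3, 0.1))

print(asyncio.run(main()))  # [1, 2, 3]
```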
We can also give our tasks a timeout:
```python
import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED

URL = 'https://httpbin.org/get'
MAX_CLIENTS = 3

async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # asyncio.sleep simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED, timeout=0.01
    )
    print(done)
    for future in done:
        print(future.result())

asyncio.run(main())
```
I set the timeout extremely low here, 0.01 seconds, so by the time done is printed none of the three tasks has finished:
```
fetch coroutine 2 started, sleeping for 4 seconds
fetch coroutine 3 started, sleeping for 3 seconds
fetch coroutine 1 started, sleeping for 4 seconds
set()
```
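When asyncio.wait times out, the unfinished tasks are not cancelled: they are handed back in the pending set and keep running, so you can wait for them again or cancel them yourself. A sketch of that (the `job` helper and the chosen delays are illustrative):

```python
import asyncio

async def job(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.ensure_future(job(d)) for d in (0.05, 0.5, 0.6)]
    # With a 0.2s timeout only the 0.05s job can finish in time.
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    first = (len(done), len(pending))
    # The timed-out tasks are still running; wait for the rest.
    done2, pending2 = await asyncio.wait(pending)
    second = (len(done2), len(pending2))
    return first, second

print(asyncio.run(main()))  # ((1, 2), (2, 0))
```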
This post has been an overall tour of Python asyncio; later posts will dig into the details. Relatively few companies seem to run asyncio in production so far, so I hope more of you will get in touch and share your experience with Python and async Python. Welcome to join the discussion group: 948510543