Notes on asyncio (Part 1)

1. Introduction

asyncio is a new concurrency module introduced in Python 3.4. It uses coroutines and futures to make asynchronous functionality easier to implement, letting us write asynchronous code almost the same way we write synchronous code, without annoying callbacks.

The 3.7 release in June 2018 upgraded parts of the asyncio API, mainly around task management and event loops. The new features added in 3.7 will be covered in a separate article later.

Current status:
The ecosystem of asyncio-based asynchronous libraries is actually not yet complete, and the official site does not maintain them specifically. A Russian group on GitHub develops and maintains some commonly used libraries, such as aiomysql, aiopika, and aioredis. One thing needs to be pointed out up front: if you want to use asyncio's asynchronous features, every other library in your code must also be asynchronous rather than blocking. So we need aiomysql instead of pymysql, aiohttp instead of requests, and so on. If a library you rely on happens to have no asynchronous counterpart yet, things can get troublesome.

2. Threads, loops, coroutines and futures

1. event loop: mainly responsible for managing and dispatching the execution of different tasks; we can register different tasks on the event loop.
2. coroutines: special functions similar to Python generators. A coroutine usually contains the keyword await, and when execution reaches an await, the coroutine releases control back to the event loop. If a coroutine is wrapped into a Task (a Future subclass), it is scheduled and executed by the event loop.
3. futures: represent the result of a task that will run or has not yet run; of course, that result may also be an exception.
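
These three pieces fit together as in the following minimal sketch (the names compute and main are illustrative, not from any particular library):

```python
import asyncio

async def compute():
    # a coroutine function: calling it returns a coroutine object
    await asyncio.sleep(0)  # await hands control back to the event loop
    return 42

async def main():
    # wrapping the coroutine in a Task schedules it on the event loop;
    # a Task is a Future subclass, so it carries the eventual result
    task = asyncio.ensure_future(compute())
    result = await task
    assert task.done()
    return result

print(asyncio.run(main()))  # prints 42
```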

3. Synchronous vs asynchronous

asyncio allows us to define subtasks as coroutines and lets us schedule them ourselves, whereas with multiple threads the scheduling is normally handed to the operating system and out of our control. Let's start with the following example:

import asyncio
async def foo():
    print("running in foo")
    await asyncio.sleep(0)
    print("back foo")
async def bar():
    print("running in bar")
    await asyncio.sleep(0)
    print("back bar")
async def main():
    tasks = [foo(), bar()]
    await asyncio.gather(*tasks)
asyncio.run(main())

The output of the code above is:

running in foo
running in bar
back foo
back bar

A few notes on the code above:

  1. Remember that the sleep here must be asyncio.sleep, not time.sleep. Here we see the coroutine hand control back to the event loop via await, which then switches to the next scheduled task.
  2. The use of gather can be skipped over for now; a later article will explain it in detail.
  3. asyncio.run, used at the end, is a new method added in 3.7. It creates an event loop and schedules the coroutine; before 3.7 we had to create the loop manually with asyncio.new_event_loop().
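
For reference, a rough sketch of the boilerplate that asyncio.run replaces: before 3.7 you created and drove the event loop yourself (main here is a stand-in coroutine):

```python
import asyncio

async def main():
    # a stand-in coroutine; any coroutine works here
    await asyncio.sleep(0)
    return 'done'

# before Python 3.7, creating and closing the loop was manual;
# asyncio.run(main()) wraps essentially this pattern
loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(main()))  # prints done
finally:
    loop.close()
```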

When our code runs synchronously, the execution order is linear; once it is asynchronous, the order becomes non-deterministic. Let's look at a simple crawler example:

import time
import random
import asyncio
import aiohttp
URL = 'https://baidu.com'
MAX_CLIENTS = 3
async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response
async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # the asyncio.sleep added here simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))
    print("all took: {:.2f} seconds".format(time.time() - start))
asyncio.run(main())

In the code above, we add an asyncio.sleep to every request. This simulates the real-world situation where, when we request multiple sites, responses come back at different times depending on the network and the target site.
The output is:

fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 1 started, sleeping for 3 seconds
fetch coroutine 3 started, sleeping for 4 seconds
>> coroutine 1: Wed, 27 Feb 2019 11:27:58 GMT, took: 3.09 seconds
>>>> coroutine 3: Wed, 27 Feb 2019 11:27:58 GMT, took: 4.08 seconds
>>>>>> coroutine 2: Wed, 27 Feb 2019 11:27:58 GMT, took: 5.12 seconds
all took: 5.12 seconds

The return_when parameter

This parameter is for when we run multiple tasks but only care about the one that returns first. An example of its usage follows (note: in order to reproduce an error, I deliberately use the pre-3.7 way of creating a loop here):

import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED
URL = 'https://baidu.com'
MAX_CLIENTS = 3
async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response
async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # the asyncio.sleep added here simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED
    )
    print(done.pop().result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Running it produces output like this:

fetch coroutine 2 started, sleeping for 2 seconds
fetch coroutine 1 started, sleeping for 5 seconds
fetch coroutine 3 started, sleeping for 2 seconds
coroutine 2: Wed, 27 Feb 2019 11:41:19 GMT, took: 2.11 seconds
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000000038E5798>()]>>

The cause of this problem is easy to understand: we start three tasks, and as soon as we receive the fastest one we close the loop, leaving the other two still in the pending state. asyncio treats this as an error, so it prints the warning we saw: Task was destroyed but it is pending!
So how do we fix this?
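
One way to avoid the warning while still using a manually created loop is to cancel the pending tasks ourselves before the loop closes. A sketch with a stand-in worker coroutine (fetch_async above would behave the same way):

```python
import asyncio

async def worker(delay):
    # stand-in for fetch_async: just sleeps, then returns its delay
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.ensure_future(worker(d)) for d in (0.01, 5, 5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # cancel whatever is still running so no task is left pending
    for task in pending:
        task.cancel()
    # let the cancelled tasks actually finish; CancelledError is swallowed
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

loop = asyncio.new_event_loop()
print(loop.run_until_complete(main()))  # prints 0.01
loop.close()
```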

4. About futures

A future has four states:

  1. Pending
  2. Running
  3. Done
  4. Cancelled

We can check whether a future is currently in a given state by calling done() or cancelled() (note that asyncio.Future itself only provides done() and cancelled(); running() belongs to the concurrent.futures API). As a reminder, the done state can mean either that a result was returned or that an exception was raised. We can also cancel a future explicitly by calling cancel(), but since Python 3.7 asyncio.run takes care of this cleanup for us. Let's rewrite the code above that triggered the "Task was destroyed but it is pending!" warning:

import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED
URL = 'https://baidu.com'
MAX_CLIENTS = 3
async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response
async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # the asyncio.sleep added here simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED
    )
    print(done.pop().result())
asyncio.run(main())

The output is as follows, completely normal now:

fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 3 started, sleeping for 2 seconds
fetch coroutine 1 started, sleeping for 2 seconds
coroutine 3: Wed, 27 Feb 2019 11:54:13 GMT, took: 2.07 seconds

Futures also have a handy feature: they let us attach a callback that runs when the future reaches the done state.
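
A minimal sketch of that callback mechanism, using add_done_callback (the names on_done and compute are illustrative):

```python
import asyncio

results = []

def on_done(future):
    # invoked by the event loop once the future reaches the done state
    results.append(future.result())

async def compute():
    await asyncio.sleep(0.01)
    return 'finished'

async def main():
    task = asyncio.ensure_future(compute())
    task.add_done_callback(on_done)
    await task

asyncio.run(main())
print(results)  # prints ['finished']
```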

The following code demonstrates how to retrieve the results of futures once they have completed:

import time
import random
import asyncio
import aiohttp
URL = 'https://httpbin.org/get'
MAX_CLIENTS = 3
async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response
async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # the asyncio.sleep added here simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures
    )
    print(done)
    for future in done:
        print(future.result())
asyncio.run(main())

The output is:

fetch coroutine 2 started, sleeping for 5 seconds
fetch coroutine 1 started, sleeping for 2 seconds
fetch coroutine 3 started, sleeping for 4 seconds
{<Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 3:... 5.31 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 1:... 3.34 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 2:... 6.38 seconds'>}
coroutine 3: Wed, 27 Feb 2019 12:10:15 GMT, took: 5.31 seconds
coroutine 1: Wed, 27 Feb 2019 12:10:15 GMT, took: 3.34 seconds
coroutine 2: Wed, 27 Feb 2019 12:10:15 GMT, took: 6.38 seconds

As we can see, once all the tasks have completed, we can get each one's result through the done set.

We can also add a timeout to our tasks:

import time
import random
import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED
URL = 'https://httpbin.org/get'
MAX_CLIENTS = 3
async def aiohttp_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response
async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('fetch coroutine {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))
    response = await aiohttp_get(URL)
    datetime = response.headers.get('Date')
    # the asyncio.sleep added here simulates each request returning with some delay
    await asyncio.sleep(sleepy_time)
    response.close()
    return 'coroutine {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED, timeout=0.01
    )
    print(done)
    for future in done:
        print(future.result())
asyncio.run(main())

I set the timeout here to a very small value, 0.01, so by the time done is printed at the end, none of the three tasks has actually completed:

fetch coroutine 2 started, sleeping for 4 seconds
fetch coroutine 3 started, sleeping for 3 seconds
fetch coroutine 1 started, sleeping for 4 seconds
set()
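
Note that when asyncio.wait times out, the unfinished tasks are simply handed back in pending and keep running. If you would rather get an exception when a single task exceeds its deadline, asyncio.wait_for is an option; a sketch with a stand-in coroutine:

```python
import asyncio

async def slow():
    await asyncio.sleep(10)
    return 'done'

async def main():
    try:
        # wait_for cancels the inner task and raises if the timeout expires
        return await asyncio.wait_for(slow(), timeout=0.01)
    except asyncio.TimeoutError:
        return 'timed out'

print(asyncio.run(main()))  # prints timed out
```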

5. Summary

This article organizes Python asyncio's overall functionality; later articles will go into the details. Relatively speaking, not many companies are actually running asyncio in production right now, so I hope more people will come exchange ideas and share their experience with Python and asynchronous Python. Feel free to join the discussion group: 948510543
