從零開始學asyncio(下)

  本篇文章主要是對asyncio和相關內容的補充, 以及一個異步爬蟲實例. 這個系列還有另外兩篇文章:html

一. 使用同步代碼

  上一篇文章已經講到, 使用asyncio模塊的基本套路是, 把要執行的代碼寫成協程函數的形式, 在函數內部IO操做的部分使用await掛起任務. 最後將協程給asyncio運行便可. python

  假設如今須要使用requests庫請求數據:git

import requests


def fetch(url):
    response = requests.get(url)
    # 如下省略一萬字

顯然, 對一個url的get請求屬於耗時的IO操做, 可是requests是個同步庫, 無法await, 若是直接以同步的方式運行, 那麼又不能發揮出異步的效率優點.github

  這種必須使用同步代碼的狀況下,  能夠這樣作:web

import asyncio
import requests


async def fetch(url):
    loop = asyncio.get_running_loop()
    asyncIO = loop.run_in_executor(None, requests.get, url)
    response = await asyncIO
    # 繼續省略一萬字

   loop就是當前線程運行的事件循環, 調用它的run_in_execcutor方法, 能夠將一個同步代碼的函數封裝爲future對象, 而後就能夠對其await了.api

  這個方法接受三個參數: executor, func和*args, 其中executor爲線程池或者進程池(concurrent.futures.ThreadPoolExecutor&ProcessPoolExecutor), 若是傳入None則使用默認的線程池.  func和*args就是同步函數以及對應的參數. 這個方式的本質是將同步代碼放在其它的線程或者進程運行, 來避免其阻塞主線程, 故只是權宜之計.安全

  目前已經有愈來愈多支持asyncio的異步庫, 好比上面說的requests庫, 就能夠用aiohttp來替代. 在https://github.com/timofurrer/awesome-asyncio能夠找到支持用asyncio異步的庫.網絡

二. async語句的其它用法

  async除了用於定義協程函數外, 另兩種用法是async with和async for, 這兩種語句和await同樣有異步屬性, 必須在協程函數內使用.session

1. async with

  async with和with相似, async with的對象應該實現__aenter__和__aexit__方法, 而且這兩個方法是異步的.併發

  一個簡單的async with實例以下:

import asyncio


class Lock:

    def __init__(self):
        self._locked = False

    async def __aenter__(self):
        while self._locked:
            await asyncio.sleep(0)
        self._locked = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._locked = False


lock = Lock()


async def coro():
    async with lock:
        pass


asyncio.run(coro())

這裏實現了一個簡單的協程鎖, 其中__aenter__和__aexit__方法分別用於請求和釋放鎖.

2. async for

  async for也和for相似, 後面接的迭代對象須要實現__aiter__和__anext__兩個方法, 其中__anext__方法是異步的.

  一個簡單的async for實例以下:

import asyncio


class Xrange:

    def __init__(self, lower, upper):
        self._num = lower-1
        self.upper = upper

    def __aiter__(self):
        return self

    async def __anext__(self):
        self._num += 1
        if self._num < self.upper:
            return self._num
        raise StopAsyncIteration


async def coro():
    async for i in Xrange(1, 10):
        print(i)


asyncio.run(coro())

 須要注意的是, 要在__anext__方法中迭代結束的位置引起StopAsyncIteration, 不然這個對象就會永遠迭代下去.

三. 協程補充

  除了send方法以外, 協程對象常見的方法還有throw和close, 前者在協程內部引起異常, 後者將協程中止.

1. coro.throw()

  顧名思義, throw方法就是把一個異常扔進協程內部, 它須要三個參數: 
coro.throw(type, value=None, traceback=None)
其中type爲錯誤類型, value爲錯誤值, traceback的話, 官網沒找到說明, 我也不知道是什麼.
  如今測試一下throw方法:
class Awaitable:

    def __await__(self):
        yield


async def coro():
    while 1:
        aw = Awaitable()
        await aw


c = coro()
# 若是不預先激活協程就直接調用throw,那麼throw方法就會在async def coro這一行引起異常
c.send(None)
c.throw(Exception, 'haha')

運行結果以下:

  若是一個協程沒有被激活, 那麼調用throw, 就會直接在定義協程的那個位置拋出異常, 不然, 協程卡在哪一個yield, 就在那個位置拋出異常. 固然, 協程也能夠在內部捕獲這個異常.

  除此以外, 其實throw和send是類似的, 兩者都會得到yield返回的值. 而且, throw和send同樣會驅動協程, 前提是協程能捕獲到這個異常而不退出.

2. coro.close()

  close函數用於關閉協程, 使之徹底中止. 這個函數實際上是使用了throw函數, 僞碼以下:

def close(self):
    try:
        self.throw(GeneratorExit)
    except (GeneratorExit, StopIteration):
        pass
    else:
        raise RuntimeError("generator ignored GeneratorExit")

  其中GeneratorExit繼承自BaseException, 也是異常的一種. 所以, close方法其實就是經過往協程內部扔異常的方式讓協程中止, 若是這個協程執意要捕獲異常而不中止的話, close方法就會拋出RuntimeError.

四. task/future補充

   上一篇文章講過, future用在協程內部, 主要特性就是暫停和回調:

  • 協程在這個位置暫停和切換, 所以可用於IO操做
  • 在協程被task封裝的前提下, 調用set_result就等於結束本次暫停, 並將set_result的值返回給協程
  • 能夠設置回調函數, 當暫停結束的時候調用回調

task繼承自future, 性質和future差很少, 另外還負責封裝和驅動協程.

1.task&future的建立與鑑別

  鑑別一個對象是否是task&future, 能夠調用asyncio.isfuture, 這個函數會返回一個布爾值. 相應的, 調用asyncio.iscoroutine和asyncio.iscoroutinefunction就能夠判斷協程對象和協程函數.

  若是要建立task或者future對象, 能夠調用如下函數;

# 要把協程封裝爲task對象,可使用如下兩個函數:

# 這個函數必須在有事件循環運行時調用,否則報錯
# 通常來講,在協程中調用就行
asyncio.create_task(coro)

# 這個函數不光能夠接收協程對象,只要是可等待對象均可以
# 若是沒有事件循環, 它就會調用get_event_loop獲取
asyncio.ensure_future(aw)


# 或者在獲取到當前事件循環的前提下
loop.create_task()


# 若是要建立future對象,可使用如下方法:

asyncio.Future()

# 或者在獲取到當前事件循環的前提下
loop.create_future()

2.取消task&future

  task&future有pending, 和done兩種狀態, 其中done又能夠再分爲finished和cancelled. 簡單來講, 還能運行就是pending, 正常結束就是finished. 被取消了就是cancelled, 調用cancel方法就能夠取消一個task&future對象.

  如今使用以下代碼取消一個future對象:

import asyncio


loop = asyncio.get_event_loop()


async def coro():
    fut = loop.create_future()

    def cb(fut):
        print('這是一個回調函數')
    fut.add_done_callback(cb)
    # 在一秒以後取消fut
    loop.call_later(1, fut.cancel)
    try:
        await fut
    except Exception as e:
        print('異常:', type(e))
    print('future對象:', fut)


loop.run_until_complete(coro())

運行結果以下:

  首先, cancel方法會處理全部的回調函數, 這一點和set_result方法是同樣的. 在這以後, future.__await__方法內部會引起CancelledError, 若是協程不捕獲的話, 這整個協程就中止了. 最後能夠看到, fut對象的狀態變爲cancelled. 簡單點說, 就是首先處理回調函數, 而後往協程裏面throw一個CancelledError使之結束.

  task的cancel方法與future相似, 首先處理回調函數, 而後調用future.cancel來取消協程.

五. 事件循環

  上一篇中講過了, 事件循環至關於一個調度者, 對多個task進行管理, 當觸發到事件時, 就驅動對應的task運行.

1. 獲取和設置事件循環

  使用如下函數獲取或者設置事件循環:

# 若是沒有正在運行的事件循環, 下面這個函數會報錯
# 所以, 只能在協程或者相關的回調中調用這個函數
asyncio.get_running_loop()
# 獲取事件循環, 沒有就會新建一個
asyncio.get_event_loop()
# 新建一個事件循環
asyncio.new_event_loop()
# 將loop綁定到當前的線程中
asyncio.set_event_loop(loop)

事件循環對象綁定了不少方法, 主要是運行回調, 網絡通訊等方面的, 能夠看https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html.

2. 使人迷惑的點

  不管是future仍是task, 都須要依賴事件循環來調度. 顯然, 在一個線程只須要一個事件循環, 不然會形成程序混亂.

  如今運行下面這段程序:

import asyncio


loop = asyncio.get_event_loop()


async def coro():
    fut = loop.create_future()
    loop.call_later(1, fut.set_result, None)
    await fut
    print('the end')


asyncio.run(coro())

運行結果以下:

 

  這是由於, asyncio.run會建立一個事件循環A來運行傳入的協程對象, 可是, 這個協程自身又經過事件循環B來建立了一個future對象, 那麼這個future對象就綁定到事件循環B上去了, 這就致使程序的混亂和錯誤.

  若是要避免這種狀況, 能夠採起如下兩種措施;

  • 首先, 儘可能使用asyncio.get_running_loop而非asyncio.get_event_loop, 這樣能確保獲取的事件循環就是當前運行的那個
  • 而後, 若是調用的API能夠接收loop參數, 就傳入loop參數以保證事件循環的一致性. 我統計了一下, asyncio模塊的task.py共定義了42個函數, 其中有14個函數能接收loop參數.

六. asyncio.wait&asyncio.gather

  以前講過, 若是要同時運行多個協程, 可使用asyncio.wait將其打包, 其實asyncio中還有一個效果相似的函數, 即asyncio.gather.

1. wait&gather的實現原理

  gather和wait函數的主要代碼以下(有刪改):
import asynciodef gather(*coros):
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            results = []

            for fut in children:
                res = fut.result()
                results.append(res)

            outer.set_result(results)

    nfuts = 0
    nfinished = 0
    children = []

    for c in coros:
        fut = asyncio.create_task(c)
        fut.add_done_callback(_done_callback)
        children.append(fut)
    nfuts = len(children)
    outer = _GatherFuture(children)
    return outer


class _GatherFuture(asyncio.Future):

    '''
    這個future類是gather函數的輔助
    若是調用它的cancel方法, 就會把全部的子協程都取消
    '''

    def __init__(self, children):
        super().__init__()
        self._children = children

    def cancel(self):
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel():
                ret = True
        return ret


async def wait(fs):

    fs = {asyncio.create_task(f) for f in set(fs)}

    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if counter <= 0:
            waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    await waiter

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
  gather和wait的實現原理是大同小異的,都利用了future和task的特性. future能夠視爲一個暫停點, 調用future.set_result, 就等於此次暫停結束了, task則等於對協程的封裝, 不但能夠驅動協程, 並且在協程結束的時候會調用設定的回調函數. 
  基於future和task的特性, 首先將協程都封裝成task, 同時用一個變量統計協程數,而且實例化一個future對象, 讓程序停在future這裏. 每一個協程一結束,就調用回調更改計數器的值,根據計數器的值肯定是否是全部協程都結束了, 是的話就調用future對象的set_result結束本次暫停. 這樣, 等全部協程都結束以後, wait&gather函數也就結束了.

2. wait&gather的使用差別

  首先, 兩者都是接收協程, 實現這些協程併發的效果, 其中wait方法是將協程放在一個可迭代對象中,而後做爲一個參數傳入, 而gather則是使用任意參數列表, 便可以傳入多個協程:
# coros是一個包含了多個協程的可迭代對象, 好比一個協程列表
asyncio.wait(coros)
# 若是使用gather,應該用如下兩種方式傳參
asyncio.gather(*coros)
asyncio.gather(coro1,coro2,coro3)

  而後, wait是一個協程函數, 所以直接調用會返回一個協程對象, 可使用asyncio.run運行. 可是gather則是返回一個future對象, 所以與asyncio.run不兼容:

# run收到的參數是一個協程,沒毛病
asyncio.run(asyncio.wait(coros))
# run收到一個future對象,所以下面這句話會報錯
asyncio.run(asyncio.gather(*coros))
# 若是要運行gather打包的協程, 能夠用如下兩種方式:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*coros))
# 或者
async def main():
    res = await asyncio.gather(*coros)

asyncio.run(main())

  另外一方面, 因爲gather函數返回的是一個future對象, 所以能夠對其調用cancel方法來提早結束, 此時gather內全部的協程都會被取消.

  最後, 由上面的代碼可知, 兩者的完成時的返回值有差別, wait會返回done和pending兩個集合, 即已完成和未完成的協程集合, 而gather則是直接把協程的結果放在列表中返回.

async def func(coros):
    # wait會返回兩個集合, 分別存放coros中已完成和未完成的協程
    done_coros, pending_coros = await asyncio.wait(coros)
    # gather則是直接把協程的結果放在一個列表中返回
    results = await asyncio.gather(*coros)

3. wait&gather函數的附加功能

  wait和gather函數的附加功能主要體如今其附加的參數上.

  wait能夠設置timeout和return_when兩個參數, 從而讓wait函數提早結束, 這時未完成的協程就會被放入pending這個集合中返回. 不過, wait並不會將這些未完成的協程取消掉.

import asyncio
import concurrent.futures


# return_when只能設置如下三個值, 否則會報錯
concurrent.futures.FIRST_COMPLETED # 在有一個協程結束時返回
concurrent.futures.FIRST_EXCEPTION # 在有一個協程拋出異常時返回
concurrent.futures.ALL_COMPLETE # 在全部協程都結束時返回, 這是默認值


async def main(coros):
    # timeout表示幾秒後返回, 不設置則不起做用
    done, pending = await asyncio.wait(coros, timeout=3, return_when=concurrent.futures.FIRST_COMPLETED)

  須要注意的是, 就算將return_when設置爲FIRST_COMPLETED,  若是多個協程的完成時間相近, 那麼這幾個協程可能都會被放到done集合中. FIRST_EXCEPTION同理.

  gather能夠設置return_exceptions這個參數, 其默認值爲False, 若是設置爲True, 那麼協程在運行中的錯誤將不會直接拋出, 而是將錯誤信息做爲返回結果, 存入最後返回的結果列表中.

按照python官方文檔的說法, 從python3.8開始, wait函數不會將傳入的協程封裝爲task對象, 也就是說, 在將協程傳入wait函數以前, 應該先調用asyncio.create_task方法將協程轉化爲task對象. 詳細說明能夠看https://docs.python.org/zh-cn/3/library/asyncio-task.html?highlight=asyncio%20wait#asyncio.wait.
可是, 我今天特意安裝了一個python3.8進行測試, 發現並無這回事, 傳一個協程列表給wait, 程序仍是能夠正常運行. 並且wait內部的代碼也沒改, 仍是會調用ensure_future將協程封裝成task對象.
總之, 這裏仍是注意一下吧, 指不定哪天就變了.
今天是2020-1-7.
補充說明

七. 一個簡單的爬蟲實例

  以https://xkcd.com/這個網站爲例, 這是一個漫畫網站, 每張漫畫對應的頁面是https://xkcd.com/int/, 其中int爲1-2244之間的整數. 如今從網站上下載一百張漫畫, 代碼以下:

import asyncio
import os
import random
import re
import time

import aiofiles
import aiohttp


class Producer:

    def __init__(self, session, queue):
        self.session = session
        self.regex = re.compile(
            '''<div id="comic">\n<img src="(//imgs.xkcd.com/comics/[a-z0-9_]+\.(jpg|png))"''')
        self.q = queue
        self.urls = self.url_factory()

    async def start(self):
        workers = [self.worker() for i in range(10)]
        await asyncio.wait(workers)
        self.q.put_nowait('all tasks are done')

    async def worker(self):
        while self.urls:
            url = self.urls.pop()
            html = await self.fetch(url)
            await self.pares_html(html)

    def url_factory(self, n=100):
        # 返回一個含有n個url的集合
        urls = set()
        while len(urls) < n:
            urls.add('https://xkcd.com/{}/'.format(random.randrange(1, 2245)))
        return urls

    async def fetch(self, url):
        async with self.session.get(url) as response:
            assert response.status == 200
            return await response.text()

    async def pares_html(self, html):
        res = re.search(self.regex, html)
        if res is None:
            return
        img_url = 'https:'+res.group(1)
        await self.q.put(img_url)


class Consumer:

    def __init__(self, session, queue):
        self.session = session
        self.q = queue
        self.save_folder = os.path.join(os.path.dirname(__file__), 'webcomics')
        try:
            os.mkdir(self.save_folder)
        except FileExistsError:
            pass

    async def start(self):
        workers = [self.worker() for i in range(10)]
        await asyncio.wait(workers)

    async def worker(self):
        while 1:
            url = await self.q.get()
            if url == 'all tasks are done':
                break
            await self.download(url)
        self._on_worker_done()

    def _on_worker_done(self):
        self.q.put_nowait('all tasks are done')

    async def download(self, url):
        filename = os.path.split(url)[-1]
        save_path = os.path.join(self.save_folder, filename)
        if os.path.exists(save_path):
            return
        async with self.session.get(url) as response:
            assert response.status == 200
            content = await response.read()
            async with aiofiles.open(save_path, 'wb') as f:
                await f.write(content)


async def main():
    queue = asyncio.Queue(maxsize=100)
    async with aiohttp.ClientSession() as session:
        producer = Producer(session, queue)
        consumer = Consumer(session, queue)
        tasks = asyncio.gather(producer.start(), consumer.start())
        await tasks


if __name__ == '__main__':
    asyncio.run(main())

  這段代碼主要分爲兩部分: producer和consumer, 即生產者和消費者. 生產者負責生成圖片下載連接, 消費者則處理這些連接, 將對應的圖片下載到本地.

1. producer代碼解析

  首先, producer須要兩個參數, session和queue. session是鏈接池, 其特色是結束以後, 對應的鏈接不會斷開, 下次須要同一個目標的鏈接時, 直接使用上次未斷開的鏈接, 也就是實現了tcp鏈接的重複使用. 因爲本次鏈接的目標都是同一個網站, 所以使用鏈接池能夠減小建立和斷開tcp鏈接的開銷(三次握手, 四次揮手). queue則是隊列, 生產者產生圖片下載連接以後, 不直接交給消費者, 而是放在隊列中讓消費者自取. 這樣兩者不是同步的, 效率低的一方不會阻塞到效率高的另外一方. 

  producer的工做策略是, 首先產生好100個網頁連接存在self.urls這個列表中, 而後讓worker不斷從集合中取出url. worker方法首先調用fetch方法獲取到對應的html, 而後調用parse_html提取html中的圖片連接, 將圖片連接儲存到隊列中. 

  這段代碼並不算複雜, 須要注意的有三點.

  • 首先, producer調用start方法開始工做, 這個方法建立了10個worker協程, 至關於最大併發數爲10. 之因此只建立10個協程, 一方面是由於爬取的數量很少, 不必爬太狠, 一方面是建立過多的tcp鏈接, 也會形成較大的開銷(雖然鏈接池有tcp複用機制, 可是多個協程同時在爬, 每一個協程都須要一個tcp鏈接). 若是要限制協程的併發量, 除了限制建立協程的數量以外, 還可使用信號量的方式, 這部分能夠在asyncio的高級api中找到, 用法比較簡單. 
  • 而後, 這裏worker從self.urls中取值的方式是直接取, 這裏不須要擔憂線程安全問題, 由於協程只會在指定的位置, 好比await時切換到其它協程, 也就是說, 從self.urls中取值這一操做是原子性的.

2. consumer代碼解析

  consumer類的代碼與producer類似, 自己也不算複雜, 主要變化就是把fetch換成download, 將請求的數據直接寫入本地的圖片文件. 值得講講的就是這裏中止消費者的機制:

  • 因爲本次只爬取100張圖片, 因此爬取結束以後, 消費者應該中止. 這裏中止消費者的機制是, 生產者結束後, 往隊列中放 "all tasks are done" 這一語句來通知消費者, 消費者的某個worker收到這一消息後, 就中止, 並將這一消息再放入隊列來通知其它worker, 從而讓全部worker都中止. 

  若是要中止消費者, 另一個方式是使用asyncio.Queue自帶的join方法. 首先, 將 "all tasks are done" 這一機制刪除, 而後, 將start和worker方法的代碼修改以下:

async def start(self):
    workers = [asyncio.Task(self.worker()) for i in range(10)]
    await self.q.join()
    for worker in workers:
        worker.cancel()

async def worker(self):
    while 1:
        url = await self.q.get()
        await self.download(url)
        self.q.task_done()

  asyncio.Queue內部帶有一個future對象和一個_unfinished_tasks計數, 當往隊列添加數據時, _unfinished_tasks+1, 調用task_done方法時, _unfinished_tasks-1, 而且若是此時_unfinished_task爲0, 就調用future對象的set_result方法.

  這種中止機制的關鍵就在於隊列: 調用隊列的join方法會返回隊列的future對象, 所以, await self.q.join()這句話實際是在等待future對象結束. start方法首先用asyncio.Task對worker進行封裝, 使其開始運行. worker每從隊列中取出一個url並下載圖片, 就調用一次task_done. 這樣隊列爲空時, 隊列的future對象就結束了, 因而start協程再反過來將worker協程取消.

  這種方法的問題是, 首先, 這裏的生產者和消費者是同時運行的, 當生產者沒有產生數據以前, 隊列爲空, 此時消費者就會直接結束. 而後, 若是生產者效率比消費者低不少, 在生產中途出現了隊列爲空的狀況, 此時消費者也可能提早結束, 所以這種方法更適用於消費者消費隊列中已有數據的場景, 在這裏不適用.

在使用爬蟲獲取網站內容以前, 應該先查看這個網站的robots協議, 該協議放在root_url/robots.txt, 好比https://xkcd.com/的robots協議能夠在https://xkcd.com/robots.txt看到. 這個協議規定了容許的爬蟲和能夠爬取的目錄.
好比:
User-agent: *
Disallow: /personal/
表示任何爬蟲均可以爬, 可是不能爬取/personal/目錄下的內容.
補充說明
相關文章
相關標籤/搜索