從零開始學asyncio(中)

  本篇文章主要是講解asyncio模塊的實現原理. 這個系列還有另外兩篇文章:html

一. asyncio模塊簡介

  asyncio是python3.4開始內置的一個標準庫, 能夠用於編寫異步的併發代碼, 所以很是適合用在IO密集型操做.python

  如今運行以下代碼:服務器

import asyncio
import time


async def task(i):
    print('task{} start at {}'.format(i, time.ctime()))
    # asyncio.sleep的效果與time.sleep相似, 讓程序睡眠n秒
    await asyncio.sleep(3)
    print('task{} end at {}'.format(i, time.ctime()))


tasks = asyncio.wait([task(i) for i in range(3)])
asyncio.run(tasks)

運行結果以下:併發

三個任務實際是處於同一線程的, 但它們的執行順序不是start->end->start->end這種串行模式, 而是幾乎同時開始, 同時結束, asyncio模塊的做用就是, 使用異步的方式實現單線程併發的效果. 最簡單的使用步驟以下:app

  • 首先, 在定義函數的時候使用關鍵字async, 這個函數就不是個普通函數了, 調用的時候不會執行內部代碼, 而是返回一個coroutine對象, 即協程, 這一點與生成器函數相似.
  • 而後, 在協程函數中的耗時操做前面加上await關鍵字, 注意await後面必須是可等待對象, 好比asyncio.sleep(n), 可等待對象在本文的第二節有詳細的講解.
  • 最後, 調用asyncio.wait將協程列表打包, 打包結果給asyncio.run運行便可.

二. asyncio實現原理

  要理解asyncio的原理, 須要理解以下幾個概念: 協程, 事件循環, future/task. 其中協程就是用戶本身定義的任務, 事件循環負責監聽事件和回調, future/task則主要負責管理回調, 以及驅動協程.框架

1. 事件循環

  事件循環負責同時對多個事件進行監聽, 當監聽到事件時, 就調用對應的回調函數, 進而驅動不一樣的任務. 上一節代碼最後的asyncio.run, 其本質就是建立一個事件循環, 而後一直運行事件循環, 直到全部任務結束爲止. 異步

  首先看看上篇文章最後的爬蟲代碼:socket

import select
import socket
import time


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


class GenCrawler:

    '''
    這裏使用一個類將生成器封裝起來,若是要驅動生成器,就調用next_step方法
    另外,這個類還能夠獲取到使用的socket對象
    '''

    def __init__(self):
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self._gen = self._crawler()

    def next_step(self):
        next(self._gen)

    def _crawler(self):
        self.sock.connect_ex(address)
        yield
        self.sock.send(req)
        response = b''
        while 1:
            yield
            chunk = self.sock.recv(1024)
            if chunk == b'':
                self.sock.close()
                break
            else:
                response += chunk
        db.append(response)


def event_loop(crawlers):
    # 首先,創建sock與crawler對象的映射關係,便於由socket對象找到對應的crawler對象
    # 創建映射的同時順便調用crawler的next_step方法,讓內部的生成器運行起來
    sock_to_crawler = {}
    for crawler in crawlers:
        sock_to_crawler[crawler.sock] = crawler
        crawler.next_step()

    # select.select須要傳入三個列表,分別對應要監聽的可讀,可寫和錯誤事件的socket對象集合
    readable = []
    writeable = [crawler.sock for crawler in crawlers]
    errors = []
    while 1:
        rs, ws, es = select.select(readable, writeable, errors)
        for sock in ws:
            # 當socket對象鏈接到服務器時,會建立可讀緩衝區和可寫緩衝區
            # 因爲可寫緩衝區建立時爲空,所以鏈接成功時,就觸發可寫事件
            # 這時再轉爲監聽可讀事件,接收到數據時,就能夠觸發可讀事件了
            writeable.remove(sock)
            readable.append(sock)
            sock_to_crawler[sock].next_step()
        for sock in rs:
            try:
                sock_to_crawler[sock].next_step()
            except StopIteration:
                # 若是生成器結束了,就說明對應的爬蟲任務已經結束,不須要監聽事件了
                readable.remove(sock)
        # 全部的事件都結束後,就退出循環
        if not readable and not writeable:
            break


if __name__ == '__main__':
    start = time.time()
    n = 10
    print('開始爬取...')
    event_loop([GenCrawler() for _ in range(n)])
    print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))
View Code

這段代碼使用IO多路複用對多個socket進行監聽, 監聽到事件時, 驅動對應的生成器運行, 運行到IO操做時, 再使用yield切換回事件循環, 從而實現併發的效果, 這個也就是asyncio中事件循環的工做原理.async

  因爲asyncio中的事件循環使用的是selectors模塊而非select, 如今在程序的代碼中改用selectors模塊:ide

import socket
import time
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


class EventLoop:

    def __init__(self):
        self.selector = DefaultSelector()
        self._stopped = False

    def register(self, fd, event, callback):
        self.selector.register(fd, event, callback)

    def unregister(self, fd):
        self.selector.unregister(fd)        
    
    def run_until_complete(self,gens):
        for gen in gens:
            next(gen)
        while not self._stopped:
            try:
                events = self.selector.select()
            except OSError:
                # 若是當前沒有註冊事件, 就會引起OSError異常
                continue
            for key, mask in events:
                # 這裏的callback就是註冊事件時傳入的回調函數
                callback = key.data
                callback(key=key, mask=mask)
            
            # 生成器的gi_frame屬性對應的是其框架(其實這屬性我還沒搞懂)
            # 在生成器結束(拋出stopiteration異常)後,這個屬性值就會變成None
            # 所以,每次循環時都刪減已經結束的生成器
            # 若是全部的生成器都結束了,就中止循環
            gens = [gen for gen in gens if gen.gi_frame is not None]
            if not gens:
                self.stop()

    def stop(self):
        self._stopped = True
        self.selector.close()


loop = EventLoop()


class GenCrawler:

    def __init__(self):
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self._fd = self.sock.fileno()
        self.gen = self._crawler()

    def _crawler(self):
        self.sock.connect_ex(address)
        loop.register(self._fd, EVENT_WRITE, self.next_step)
        yield
        loop.unregister(self._fd)
        self.sock.send(req)
        response = b''
        while 1:
            loop.register(self._fd, EVENT_READ, self.next_step)
            yield
            loop.unregister(self._fd)
            chunk = self.sock.recv(1024)
            if chunk == b'':
                self.sock.close()
                break
            else:
                response += chunk
        db.append(response)def next_step(self,**kwargs):
        try:
            next(self.gen)
        except StopIteration:
            return


if __name__ == '__main__':
    start = time.time()
    print('開始爬取...')
    n = 10
    gens = [GenCrawler().gen for _ in range(n)]
    loop.run_until_complete(gens)
    print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))

這裏主要是改了EventLoop部分的代碼, 使用register和unregister方法來註冊和註銷事件, 優勢是更加靈活, 能夠指定觸發事件時調用的回調函數. 另外, DefaultSelector會自動選擇系統中效率最高的多路複用機制, 好比kqueue和epoll.

2. async與協程

  在定義函數的時候, 在def以前加上async, 這個函數就不是普通函數了, 而是一個協程函數:

async def coro():
    print('this is a coroutine')

直接調用協程函數並不能使之運行, 而是返回了一個協程對象, 若是要運行該協程, 能夠調用這個協程對象的send方法:

c=coro()
c.send(None)

運行結果以下, 首先會運行協程函數內部的代碼, 而後函數的代碼運行結束, 拋出一個StopIteration異常:

  所以, 協程函數與生成器函數是很是類似的. 可是, 協程不是可迭代對象, 所以沒法使用next函數, 只能調用其自身的send方法來驅動.

從python3.6開始, 協程函數中可使用yield語句, 此時調用這個函數, 就會返回一個async_generator對象, 即異步生成器.
不過這東西我還沒用過, 先挖個坑, 須要的能夠看PEP525.
補充說明

3. await和awaitable

  在第一節中講到, 協程中可使用await語句, 後接awaitable對象, 便可等待對象. 如下幾類都是可等待對象:

  • 一個協程, 這個上一小節剛講.
  • 一個有__await__方法, 而且該方法返回一個迭代器的對象, 常見狀況是這個對象的__await__方法就是個生成器函數.
  • 使用@types.coroutine裝飾的生成器函數, 其中types是python內置的一個庫, 這個裝飾器的實現原理是返回一個定義了__await__方法的對象.
  • Objects defined with CPython C API with a tp_as_async.am_await function, returning an iterator (similar to __await__ method). 這一條是從官網抄的, 我沒理解, 應該和定義__await__相似吧.

如今定義一個可等待對象並測試:

class AwaitableObj:

    def __await__(self):
        v = yield '來自可等待對象的yield'
        print('可等待對象得到的值:', v)
        return '來自可等待對象的return'


async def coro():
    v = await AwaitableObj()
    print('協程得到的值:', v)


if __name__ == '__main__':
    c = coro()
    v = c.send(None)
    print('外部得到的值:', v)
    try:
        c.send('來自外部')
    except StopIteration:
        pass

這段程序有三個部分: 可等待對象, 協程和外部. 協程中使用await語句來等待可等待對象, 而外部調用send方法來驅動協程.

程序的運行結果以下:

await至關於外部與可等待對象之間的橋樑, 可等待對象中__await__方法返回的生成器, 其yield返回的值會傳到外部, 而外部使用send方法傳的值也會傳給可等待對象的生成器. 最後__await__生成器迭代結束後, 協程得到其返回值.

  這裏須要說明一點: await語句自己並不能暫停和切換協程, 它只是阻塞協程直到後面接的可等待對象的__await__方法返回的可迭代對象運行完. 若是__await__裏面有yield, 返回一個生成器, 協程纔會由於這個yield語句暫停和切換.

4. future/task

  future是asyncio模塊中的一個可等待對象, 調用asyncio.get_event_loop獲取到當前線程的事件循環loop, 而後調用loop.create_future, 就能夠獲得一個future對象. future的主要代碼以下(有改動):

class Future:

    def __init__(self):
        self._callbacks = []
        self.result = None

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, result):
        self.result = result
        for callback in self._callbacks:
            callback(self)
    
    def __await__(self):
        yield self
        return self.result

  future能夠理解爲協程的一次暫停. 首先, 若是一個協程須要在某處暫停, 就能夠實例化一個future對象而且await這個對象, 這樣就會運行future對象的__await__方法, 當運行到yield self這句話時, 協程暫停, 直到外部再使用send方法驅動協程爲止. 而後, future的另外一特性是能夠設置回調函數, 調用它的add_callback方法就行. 最後, future還有set_result這個接口, 一方面會運行future的回調函數, 另外一方面能夠設置其result屬性的值, 該值在__awaiit__方法結束以後返回給協程. 通常的用法是, 協程在事件循環中註冊事件, 而後讓事件循環來調用future對象的set_result方法.

  有了暫停, 天然也須要有驅動, task對象負責對協程進行封裝和驅動. 調用asyncio.create_task並傳入協程對象, 就能夠獲得一個task對象. task的主要代碼以下(有改動):

class Task(Future):

    def __init__(self, coro):
        super().__init__()
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(None)
        except StopIteration:
            self.set_result(future.result)
            return
        next_future.add_callback(self.step)

  task和future應該是搭配使用的. 首先, task.step是負責對協程進行驅動的, 因爲future.__await__方法會yield self, 所以每次驅動都會得到目前暫停點對應的future對象. 這時候將本身的step方法添加到future對象的回調中, 等到future對象調用set_result方法時, 就會回調到task.step方法, 從而驅動協程繼續運行. 所以能夠認爲, future對象就是協程的一次暫停, 而調用其set_result方法就意味着此次暫停結束了, 可是這個過程須要task的協助.

  task類是繼承future類的, 這其實比較好理解, 好比一個簡單的爬蟲任務, 在鏈接服務器和接受數據等IO操做時須要使用future暫停, 並能夠設置回調, 表示暫停結束的時候應該作什麼. 而這個爬蟲任務至關於一個大的IO操做, 所以也應該有能夠設置回調以及能夠await的特性. 當一個協程驅動結束, 即拋出StopIteration異常的時候, 就意味着這個task結束了, 所以此時就調用task.set_result方法, 把最後一個future對象的結果設置爲task.result.

5. 爬蟲代碼重構

  把上面講的async/await, future/task等內容添加到以前的爬蟲實例中, 最終代碼以下:

import socket
import time
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8')
address = ('cn.bing.com', 80)
db = []


class EventLoop:

    def __init__(self):
        self.selector = DefaultSelector()
        self._stopped = False

    def register(self, fd, event, callback):
        self.selector.register(fd, event, callback)

    def unregister(self, fd):
        self.selector.unregister(fd)

    def run_until_complete(self, coros):
        def _done_callback(fut):
            nonlocal ncoros
            ncoros -= 1
            if ncoros == 0:
                self.stop()
        ncoros = len(coros)
        for coro in coros:
            task = Task(coro)
            task.add_callback(_done_callback)

        while not self._stopped:
            try:
                events = self.selector.select()
            except OSError:
                # 若是當前沒有註冊事件, 就會引起OSError異常
                continue
            for key, mask in events:
                # 這裏的callback就是註冊事件時傳入的回調函數
                callback = key.data
                callback(key=key, mask=mask)

    def stop(self):
        self._stopped = True
        self.selector.close()


loop = EventLoop()


class Future:

    def __init__(self):
        self._callbacks = []
        self.result = None

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, result):
        self.result = result
        for callback in self._callbacks:
            callback(self)

    def __await__(self):
        yield self
        return self.result


class Task(Future):

    def __init__(self, coro):
        super().__init__()
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(None)
        except StopIteration:
            self.set_result(future.result)
            return
        next_future.add_callback(self.step)


class CoroCrawler:

    def __init__(self):
        self.sock = socket.socket()
        self.sock.setblocking(0)
        self._fd = self.sock.fileno()
        self.coro = self._crawler()

    async def _crawler(self):
        await self.connect()
        self.sock.send(req)
        response = await self.read_all()
        db.append(response)

    async def connect(self):
        self.sock.connect_ex(address)

        f = Future()

        def on_connect(key, mask):
            f.set_result(None)
        loop.register(self.sock.fileno(), EVENT_WRITE, on_connect)
        await f
        loop.unregister(self.sock.fileno())

    async def read_all(self):
        response = b''
        while 1:
            chunk = await self.read()
            if chunk == b'':
                self.sock.close()
                break
            response += chunk
        return response

    async def read(self):
        f = Future()

        def on_readable(key, mask):
            chunk = self.sock.recv(1024)
            f.set_result(chunk)
        loop.register(self._fd, EVENT_READ, on_readable)
        chunk = await f
        loop.unregister(self._fd)
        return chunk


if __name__ == '__main__':
    start = time.time()
    print('開始爬取...')
    n = 10
    coros = [CoroCrawler().coro for _ in range(n)]
    loop.run_until_complete(coros)
    print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))

  這段代碼並不算複雜, 惟一須要留意的就是事件循環中的run_until_complete方法, 這個方法再也不是主動去檢查任務是否結束, 而是將協程包裝成task對象, 而後給task對象添加了回調函數, 來在協程所有結束時, 中止事件循環. 這也就是用task包裝協程的一個方便的地方: 能夠在協程結束的時候運行指定的回調.

  整個代碼的實現流程以下, 這也就是用asyncio運行一個協程的流程.

三. 總結

  • asyncio模塊是基於async/await實現的一個異步庫, 用法的話, 簡單來講就是首先定義好協程, 而後把協程打包扔給asyncio模塊, 最後啓動事件循環, 就實現異步了.
  • asyncio底層使用事件循環, 其本質是系統的IO多路複用機制, 經過這種機制來同時監聽多個對象, 在觸發事件時調用對應的回調函數, 從而實現異步和併發的效果.
  • Future表示協程單個斷點的運行狀態, Task繼承自Future, 表示整個協程的運行狀態, 兩者均可以設置回調函數, 在結束的時候調用回調.
  • Task是對協程對象的封裝和管理, 負責驅動協程, Future則直接對接loop循環, 接收回調函數的結果並返回.
  • 要定義協程, 首先要使用async定義函數, 而後若是有耗時操做, 在耗時操做前面加上await. 不過, 對應的耗時操做必須是awaitable的對象.
相關文章
相關標籤/搜索