本篇文章主要是講解asyncio模塊的實現原理. 這個系列還有另外兩篇文章:html
asyncio是python3.4開始內置的一個標準庫, 能夠用於編寫異步的併發代碼, 所以很是適合用在IO密集型操做.python
如今運行以下代碼:服務器
import asyncio import time async def task(i): print('task{} start at {}'.format(i, time.ctime())) # asyncio.sleep的效果與time.sleep相似, 讓程序睡眠n秒 await asyncio.sleep(3) print('task{} end at {}'.format(i, time.ctime())) tasks = asyncio.wait([task(i) for i in range(3)]) asyncio.run(tasks)
運行結果以下:併發
三個任務實際是處於同一線程的, 但它們的執行順序不是start->end->start->end這種串行模式, 而是幾乎同時開始, 同時結束, asyncio模塊的做用就是, 使用異步的方式實現單線程併發的效果. 最簡單的使用步驟以下:app
要理解asyncio的原理, 須要理解以下幾個概念: 協程, 事件循環, future/task. 其中協程就是用戶本身定義的任務, 事件循環負責監聽事件和回調, future/task則主要負責管理回調, 以及驅動協程.框架
事件循環負責同時對多個事件進行監聽, 當監聽到事件時, 就調用對應的回調函數, 進而驅動不一樣的任務. 上一節代碼最後的asyncio.run, 其本質就是建立一個事件循環, 而後一直運行事件循環, 直到全部任務結束爲止. 異步
首先看看上篇文章最後的爬蟲代碼:socket
import select import socket import time req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class GenCrawler: ''' 這裏使用一個類將生成器封裝起來,若是要驅動生成器,就調用next_step方法 另外,這個類還能夠獲取到使用的socket對象 ''' def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._gen = self._crawler() def next_step(self): next(self._gen) def _crawler(self): self.sock.connect_ex(address) yield self.sock.send(req) response = b'' while 1: yield chunk = self.sock.recv(1024) if chunk == b'': self.sock.close() break else: response += chunk db.append(response) def event_loop(crawlers): # 首先,創建sock與crawler對象的映射關係,便於由socket對象找到對應的crawler對象 # 創建映射的同時順便調用crawler的next_step方法,讓內部的生成器運行起來 sock_to_crawler = {} for crawler in crawlers: sock_to_crawler[crawler.sock] = crawler crawler.next_step() # select.select須要傳入三個列表,分別對應要監聽的可讀,可寫和錯誤事件的socket對象集合 readable = [] writeable = [crawler.sock for crawler in crawlers] errors = [] while 1: rs, ws, es = select.select(readable, writeable, errors) for sock in ws: # 當socket對象鏈接到服務器時,會建立可讀緩衝區和可寫緩衝區 # 因爲可寫緩衝區建立時爲空,所以鏈接成功時,就觸發可寫事件 # 這時再轉爲監聽可讀事件,接收到數據時,就能夠觸發可讀事件了 writeable.remove(sock) readable.append(sock) sock_to_crawler[sock].next_step() for sock in rs: try: sock_to_crawler[sock].next_step() except StopIteration: # 若是生成器結束了,就說明對應的爬蟲任務已經結束,不須要監聽事件了 readable.remove(sock) # 全部的事件都結束後,就退出循環 if not readable and not writeable: break if __name__ == '__main__': start = time.time() n = 10 print('開始爬取...') event_loop([GenCrawler() for _ in range(n)]) print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))
這段代碼使用IO多路複用對多個socket進行監聽, 監聽到事件時, 驅動對應的生成器運行, 運行到IO操做時, 再使用yield切換回事件循環, 從而實現併發的效果, 這個也就是asyncio中事件循環的工做原理.async
因爲asyncio中的事件循環使用的是selectors模塊而非select, 如今在程序的代碼中改用selectors模塊:ide
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class EventLoop: def __init__(self): self.selector = DefaultSelector() self._stopped = False def register(self, fd, event, callback): self.selector.register(fd, event, callback) def unregister(self, fd): self.selector.unregister(fd) def run_until_complete(self,gens): for gen in gens: next(gen) while not self._stopped: try: events = self.selector.select() except OSError: # 若是當前沒有註冊事件, 就會引起OSError異常 continue for key, mask in events: # 這裏的callback就是註冊事件時傳入的回調函數 callback = key.data callback(key=key, mask=mask) # 生成器的gi_frame屬性對應的是其框架(其實這屬性我還沒搞懂) # 在生成器結束(拋出stopiteration異常)後,這個屬性值就會變成None # 所以,每次循環時都刪減已經結束的生成器 # 若是全部的生成器都結束了,就中止循環 gens = [gen for gen in gens if gen.gi_frame is not None] if not gens: self.stop() def stop(self): self._stopped = True self.selector.close() loop = EventLoop() class GenCrawler: def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._fd = self.sock.fileno() self.gen = self._crawler() def _crawler(self): self.sock.connect_ex(address) loop.register(self._fd, EVENT_WRITE, self.next_step) yield loop.unregister(self._fd) self.sock.send(req) response = b'' while 1: loop.register(self._fd, EVENT_READ, self.next_step) yield loop.unregister(self._fd) chunk = self.sock.recv(1024) if chunk == b'': self.sock.close() break else: response += chunk db.append(response)def next_step(self,**kwargs): try: next(self.gen) except StopIteration: return if __name__ == '__main__': start = time.time() print('開始爬取...') n = 10 gens = [GenCrawler().gen for _ in range(n)] loop.run_until_complete(gens) print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))
這裏主要是改了EventLoop部分的代碼, 使用register和unregister方法來註冊和註銷事件, 優勢是更加靈活, 能夠指定觸發事件時調用的回調函數. 另外, DefaultSelector會自動選擇系統中效率最高的多路複用機制, 好比kqueue和epoll.
在定義函數的時候, 在def以前加上async, 這個函數就不是普通函數了, 而是一個協程函數:
async def coro(): print('this is a coroutine')
直接調用協程函數並不能使之運行, 而是返回了一個協程對象, 若是要運行該協程, 能夠調用這個協程對象的send方法:
c=coro()
c.send(None)
運行結果以下, 首先會運行協程函數內部的代碼, 而後函數的代碼運行結束, 拋出一個StopIteration異常:
所以, 協程函數與生成器函數是很是類似的. 可是, 協程不是可迭代對象, 所以沒法使用next函數, 只能調用其自身的send方法來驅動.
從python3.6開始, 協程函數中可使用yield語句, 此時調用這個函數, 就會返回一個async_generator對象, 即異步生成器.
不過這東西我還沒用過, 先挖個坑, 須要的能夠看PEP525.
在第一節中講到, 協程中可使用await語句, 後接awaitable對象, 便可等待對象. 如下幾類都是可等待對象:
如今定義一個可等待對象並測試:
class AwaitableObj: def __await__(self): v = yield '來自可等待對象的yield' print('可等待對象得到的值:', v) return '來自可等待對象的return' async def coro(): v = await AwaitableObj() print('協程得到的值:', v) if __name__ == '__main__': c = coro() v = c.send(None) print('外部得到的值:', v) try: c.send('來自外部') except StopIteration: pass
這段程序有三個部分: 可等待對象, 協程和外部. 協程中使用await語句來等待可等待對象, 而外部調用send方法來驅動協程.
程序的運行結果以下:
await至關於外部與可等待對象之間的橋樑, 可等待對象中__await__方法返回的生成器, 其yield返回的值會傳到外部, 而外部使用send方法傳的值也會傳給可等待對象的生成器. 最後__await__生成器迭代結束後, 協程得到其返回值.
這裏須要說明一點: await語句自己並不能暫停和切換協程, 它只是阻塞協程直到後面接的可等待對象的__await__方法返回的可迭代對象運行完. 若是__await__裏面有yield, 返回一個生成器, 協程纔會由於這個yield語句暫停和切換.
future是asyncio模塊中的一個可等待對象, 調用asyncio.get_event_loop獲取到當前線程的事件循環loop, 而後調用loop.create_future, 就能夠獲得一個future對象. future的主要代碼以下(有改動):
class Future: def __init__(self): self._callbacks = [] self.result = None def add_callback(self, callback): self._callbacks.append(callback) def set_result(self, result): self.result = result for callback in self._callbacks: callback(self) def __await__(self): yield self return self.result
future能夠理解爲協程的一次暫停. 首先, 若是一個協程須要在某處暫停, 就能夠實例化一個future對象而且await這個對象, 這樣就會運行future對象的__await__方法, 當運行到yield self這句話時, 協程暫停, 直到外部再使用send方法驅動協程爲止. 而後, future的另外一特性是能夠設置回調函數, 調用它的add_callback方法就行. 最後, future還有set_result這個接口, 一方面會運行future的回調函數, 另外一方面能夠設置其result屬性的值, 該值在__awaiit__方法結束以後返回給協程. 通常的用法是, 協程在事件循環中註冊事件, 而後讓事件循環來調用future對象的set_result方法.
有了暫停, 天然也須要有驅動, task對象負責對協程進行封裝和驅動. 調用asyncio.create_task並傳入協程對象, 就能夠獲得一個task對象. task的主要代碼以下(有改動):
class Task(Future): def __init__(self, coro): super().__init__() self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(None) except StopIteration: self.set_result(future.result) return next_future.add_callback(self.step)
task和future應該是搭配使用的. 首先, task.step是負責對協程進行驅動的, 因爲future.__await__方法會yield self, 所以每次驅動都會得到目前暫停點對應的future對象. 這時候將本身的step方法添加到future對象的回調中, 等到future對象調用set_result方法時, 就會回調到task.step方法, 從而驅動協程繼續運行. 所以能夠認爲, future對象就是協程的一次暫停, 而調用其set_result方法就意味着此次暫停結束了, 可是這個過程須要task的協助.
task類是繼承future類的, 這其實比較好理解, 好比一個簡單的爬蟲任務, 在鏈接服務器和接受數據等IO操做時須要使用future暫停, 並能夠設置回調, 表示暫停結束的時候應該作什麼. 而這個爬蟲任務至關於一個大的IO操做, 所以也應該有能夠設置回調以及能夠await的特性. 當一個協程驅動結束, 即拋出StopIteration異常的時候, 就意味着這個task結束了, 所以此時就調用task.set_result方法, 把最後一個future對象的結果設置爲task.result.
把上面講的async/await, future/task等內容添加到以前的爬蟲實例中, 最終代碼以下:
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE req = 'GET / HTTP/1.0\r\nHost:cn.bing.com\r\n\r\n'.encode('utf8') address = ('cn.bing.com', 80) db = [] class EventLoop: def __init__(self): self.selector = DefaultSelector() self._stopped = False def register(self, fd, event, callback): self.selector.register(fd, event, callback) def unregister(self, fd): self.selector.unregister(fd) def run_until_complete(self, coros): def _done_callback(fut): nonlocal ncoros ncoros -= 1 if ncoros == 0: self.stop() ncoros = len(coros) for coro in coros: task = Task(coro) task.add_callback(_done_callback) while not self._stopped: try: events = self.selector.select() except OSError: # 若是當前沒有註冊事件, 就會引起OSError異常 continue for key, mask in events: # 這裏的callback就是註冊事件時傳入的回調函數 callback = key.data callback(key=key, mask=mask) def stop(self): self._stopped = True self.selector.close() loop = EventLoop() class Future: def __init__(self): self._callbacks = [] self.result = None def add_callback(self, callback): self._callbacks.append(callback) def set_result(self, result): self.result = result for callback in self._callbacks: callback(self) def __await__(self): yield self return self.result class Task(Future): def __init__(self, coro): super().__init__() self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(None) except StopIteration: self.set_result(future.result) return next_future.add_callback(self.step) class CoroCrawler: def __init__(self): self.sock = socket.socket() self.sock.setblocking(0) self._fd = self.sock.fileno() self.coro = self._crawler() async def _crawler(self): await self.connect() self.sock.send(req) response = await self.read_all() db.append(response) async def connect(self): self.sock.connect_ex(address) f = Future() def on_connect(key, mask): f.set_result(None) loop.register(self.sock.fileno(), EVENT_WRITE, on_connect) await f loop.unregister(self.sock.fileno()) async def read_all(self): response = b'' while 1: chunk = await self.read() if chunk == b'': self.sock.close() break response += chunk return response async def read(self): f = Future() def on_readable(key, mask): chunk = self.sock.recv(1024) f.set_result(chunk) loop.register(self._fd, EVENT_READ, on_readable) chunk = await f loop.unregister(self._fd) return chunk if __name__ == '__main__': start = time.time() print('開始爬取...') n = 10 coros = [CoroCrawler().coro for _ in range(n)] loop.run_until_complete(coros) print('獲取到{}條數據,用時{:.2f}秒'.format(len(db), time.time()-start))
這段代碼並不算複雜, 惟一須要留意的就是事件循環中的run_until_complete方法, 這個方法再也不是主動去檢查任務是否結束, 而是將協程包裝成task對象, 而後給task對象添加了回調函數, 來在協程所有結束時, 中止事件循環. 這也就是用task包裝協程的一個方便的地方: 能夠在協程結束的時候運行指定的回調.
整個代碼的實現流程以下, 這也就是用asyncio運行一個協程的流程.