本篇文章主要是對asyncio和相關內容的補充, 以及一個異步爬蟲實例. 這個系列還有另外兩篇文章:html
上一篇文章已經講到, 使用asyncio模塊的基本套路是, 把要執行的代碼寫成協程函數的形式, 在函數內部IO操做的部分使用await掛起任務. 最後將協程給asyncio運行便可. python
假設如今須要使用requests庫請求數據:git
import requests def fetch(url): response = requests.get(url) # 如下省略一萬字
顯然, 對一個url的get請求屬於耗時的IO操做, 可是requests是個同步庫, 無法await, 若是直接以同步的方式運行, 那麼又不能發揮出異步的效率優點.github
這種必須使用同步代碼的狀況下, 能夠這樣作:web
import asyncio import requests async def fetch(url): loop = asyncio.get_running_loop() asyncIO = loop.run_in_executor(None, requests.get, url) response = await asyncIO # 繼續省略一萬字
loop就是當前線程運行的事件循環, 調用它的run_in_execcutor方法, 能夠將一個同步代碼的函數封裝爲future對象, 而後就能夠對其await了.api
這個方法接受三個參數: executor, func和*args, 其中executor爲線程池或者進程池(concurrent.futures.ThreadPoolExecutor&ProcessPoolExecutor), 若是傳入None則使用默認的線程池. func和*args就是同步函數以及對應的參數. 這個方式的本質是將同步代碼放在其它的線程或者進程運行, 來避免其阻塞主線程, 故只是權宜之計.安全
目前已經有愈來愈多支持asyncio的異步庫, 好比上面說的requests庫, 就能夠用aiohttp來替代. 在https://github.com/timofurrer/awesome-asyncio能夠找到支持用asyncio異步的庫.網絡
async除了用於定義協程函數外, 另兩種用法是async with和async for, 這兩種語句和await同樣有異步屬性, 必須在協程函數內使用.session
async with和with相似, async with的對象應該實現__aenter__和__aexit__方法, 而且這兩個方法是異步的.併發
一個簡單的async with實例以下:
import asyncio class Lock: def __init__(self): self._locked = False async def __aenter__(self): while self._locked: await asyncio.sleep(0) self._locked = True return self async def __aexit__(self, exc_type, exc, tb): self._locked = False lock = Lock() async def coro(): async with lock: pass asyncio.run(coro())
這裏實現了一個簡單的協程鎖, 其中__aenter__和__aexit__方法分別用於請求和釋放鎖.
async for也和for相似, 後面接的迭代對象須要實現__aiter__和__anext__兩個方法, 其中__anext__方法是異步的.
一個簡單的async for實例以下:
import asyncio class Xrange: def __init__(self, lower, upper): self._num = lower-1 self.upper = upper def __aiter__(self): return self async def __anext__(self): self._num += 1 if self._num < self.upper: return self._num raise StopAsyncIteration async def coro(): async for i in Xrange(1, 10): print(i) asyncio.run(coro())
須要注意的是, 要在__anext__方法中迭代結束的位置引起StopAsyncIteration, 不然這個對象就會永遠迭代下去.
coro.throw(type, value=None, traceback=None)
class Awaitable: def __await__(self): yield async def coro(): while 1: aw = Awaitable() await aw c = coro() # 若是不預先激活協程就直接調用throw,那麼throw方法就會在async def coro這一行引起異常 c.send(None) c.throw(Exception, 'haha')
運行結果以下:
若是一個協程沒有被激活, 那麼調用throw, 就會直接在定義協程的那個位置拋出異常, 不然, 協程卡在哪一個yield, 就在那個位置拋出異常. 固然, 協程也能夠在內部捕獲這個異常.
除此以外, 其實throw和send是類似的, 兩者都會得到yield返回的值. 而且, throw和send同樣會驅動協程, 前提是協程能捕獲到這個異常而不退出.
close函數用於關閉協程, 使之徹底中止. 這個函數實際上是使用了throw函數, 僞碼以下:
def close(self): try: self.throw(GeneratorExit) except (GeneratorExit, StopIteration): pass else: raise RuntimeError("generator ignored GeneratorExit")
其中GeneratorExit繼承自BaseException, 也是異常的一種. 所以, close方法其實就是經過往協程內部扔異常的方式讓協程中止, 若是這個協程執意要捕獲異常而不中止的話, close方法就會拋出RuntimeError.
上一篇文章講過, future用在協程內部, 主要特性就是暫停和回調:
task繼承自future, 性質和future差很少, 另外還負責封裝和驅動協程.
鑑別一個對象是否是task&future, 能夠調用asyncio.isfuture, 這個函數會返回一個布爾值. 相應的, 調用asyncio.iscoroutine和asyncio.iscoroutinefunction就能夠判斷協程對象和協程函數.
若是要建立task或者future對象, 能夠調用如下函數;
# 要把協程封裝爲task對象,可使用如下兩個函數: # 這個函數必須在有事件循環運行時調用,否則報錯 # 通常來講,在協程中調用就行 asyncio.create_task(coro) # 這個函數不光能夠接收協程對象,只要是可等待對象均可以 # 若是沒有事件循環, 它就會調用get_event_loop獲取 asyncio.ensure_future(aw) # 或者在獲取到當前事件循環的前提下 loop.create_task() # 若是要建立future對象,可使用如下方法: asyncio.Future() # 或者在獲取到當前事件循環的前提下 loop.create_future()
task&future有pending, 和done兩種狀態, 其中done又能夠再分爲finished和cancelled. 簡單來講, 還能運行就是pending, 正常結束就是finished. 被取消了就是cancelled, 調用cancel方法就能夠取消一個task&future對象.
如今使用以下代碼取消一個future對象:
import asyncio loop = asyncio.get_event_loop() async def coro(): fut = loop.create_future() def cb(fut): print('這是一個回調函數') fut.add_done_callback(cb) # 在一秒以後取消fut loop.call_later(1, fut.cancel) try: await fut except Exception as e: print('異常:', type(e)) print('future對象:', fut) loop.run_until_complete(coro())
運行結果以下:
首先, cancel方法會處理全部的回調函數, 這一點和set_result方法是同樣的. 在這以後, future.__await__方法內部會引起CancelledError, 若是協程不捕獲的話, 這整個協程就中止了. 最後能夠看到, fut對象的狀態變爲cancelled. 簡單點說, 就是首先處理回調函數, 而後往協程裏面throw一個CancelledError使之結束.
task的cancel方法與future相似, 首先處理回調函數, 而後調用future.cancel來取消協程.
上一篇中講過了, 事件循環至關於一個調度者, 對多個task進行管理, 當觸發到事件時, 就驅動對應的task運行.
使用如下函數獲取或者設置事件循環:
# 若是沒有正在運行的事件循環, 下面這個函數會報錯 # 所以, 只能在協程或者相關的回調中調用這個函數 asyncio.get_running_loop() # 獲取事件循環, 沒有就會新建一個 asyncio.get_event_loop() # 新建一個事件循環 asyncio.new_event_loop() # 將loop綁定到當前的線程中 asyncio.set_event_loop(loop)
事件循環對象綁定了不少方法, 主要是運行回調, 網絡通訊等方面的, 能夠看https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html.
不管是future仍是task, 都須要依賴事件循環來調度. 顯然, 在一個線程只須要一個事件循環, 不然會形成程序混亂.
如今運行下面這段程序:
import asyncio loop = asyncio.get_event_loop() async def coro(): fut = loop.create_future() loop.call_later(1, fut.set_result, None) await fut print('the end') asyncio.run(coro())
運行結果以下:
這是由於, asyncio.run會建立一個事件循環A來運行傳入的協程對象, 可是, 這個協程自身又經過事件循環B來建立了一個future對象, 那麼這個future對象就綁定到事件循環B上去了, 這就致使程序的混亂和錯誤.
若是要避免這種狀況, 能夠採起如下兩種措施;
以前講過, 若是要同時運行多個協程, 可使用asyncio.wait將其打包, 其實asyncio中還有一個效果相似的函數, 即asyncio.gather.
import asynciodef gather(*coros): def _done_callback(fut): nonlocal nfinished nfinished += 1 if nfinished == nfuts: results = [] for fut in children: res = fut.result() results.append(res) outer.set_result(results) nfuts = 0 nfinished = 0 children = [] for c in coros: fut = asyncio.create_task(c) fut.add_done_callback(_done_callback) children.append(fut) nfuts = len(children) outer = _GatherFuture(children) return outer class _GatherFuture(asyncio.Future): ''' 這個future類是gather函數的輔助 若是調用它的cancel方法, 就會把全部的子協程都取消 ''' def __init__(self, children): super().__init__() self._children = children def cancel(self): if self.done(): return False ret = False for child in self._children: if child.cancel(): ret = True return ret async def wait(fs): fs = {asyncio.create_task(f) for f in set(fs)} loop = asyncio.get_running_loop() waiter = loop.create_future() counter = len(fs) def _on_completion(f): nonlocal counter counter -= 1 if counter <= 0: waiter.set_result(None) for f in fs: f.add_done_callback(_on_completion) await waiter done, pending = set(), set() for f in fs: f.remove_done_callback(_on_completion) if f.done(): done.add(f) else: pending.add(f) return done, pending
# coros是一個包含了多個協程的可迭代對象, 好比一個協程列表 asyncio.wait(coros) # 若是使用gather,應該用如下兩種方式傳參 asyncio.gather(*coros) asyncio.gather(coro1,coro2,coro3)
而後, wait是一個協程函數, 所以直接調用會返回一個協程對象, 可使用asyncio.run運行. 可是gather則是返回一個future對象, 所以與asyncio.run不兼容:
# run收到的參數是一個協程,沒毛病 asyncio.run(asyncio.wait(coros)) # run收到一個future對象,所以下面這句話會報錯 asyncio.run(asyncio.gather(*coros)) # 若是要運行gather打包的協程, 能夠用如下兩種方式: loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*coros)) # 或者 async def main(): res = await asyncio.gather(*coros) asyncio.run(main())
另外一方面, 因爲gather函數返回的是一個future對象, 所以能夠對其調用cancel方法來提早結束, 此時gather內全部的協程都會被取消.
最後, 由上面的代碼可知, 兩者的完成時的返回值有差別, wait會返回done和pending兩個集合, 即已完成和未完成的協程集合, 而gather則是直接把協程的結果放在列表中返回.
async def func(coros): # wait會返回兩個集合, 分別存放coros中已完成和未完成的協程 done_coros, pending_coros = await asyncio.wait(coros) # gather則是直接把協程的結果放在一個列表中返回 results = await asyncio.gather(*coros)
wait和gather函數的附加功能主要體如今其附加的參數上.
wait能夠設置timeout和return_when兩個參數, 從而讓wait函數提早結束, 這時未完成的協程就會被放入pending這個集合中返回. 不過, wait並不會將這些未完成的協程取消掉.
import asyncio import concurrent.futures # return_when只能設置如下三個值, 否則會報錯 concurrent.futures.FIRST_COMPLETED # 在有一個協程結束時返回 concurrent.futures.FIRST_EXCEPTION # 在有一個協程拋出異常時返回 concurrent.futures.ALL_COMPLETE # 在全部協程都結束時返回, 這是默認值 async def main(coros): # timeout表示幾秒後返回, 不設置則不起做用 done, pending = await asyncio.wait(coros, timeout=3, return_when=concurrent.futures.FIRST_COMPLETED)
須要注意的是, 就算將return_when設置爲FIRST_COMPLETED, 若是多個協程的完成時間相近, 那麼這幾個協程可能都會被放到done集合中. FIRST_EXCEPTION同理.
gather能夠設置return_exceptions這個參數, 其默認值爲False, 若是設置爲True, 那麼協程在運行中的錯誤將不會直接拋出, 而是將錯誤信息做爲返回結果, 存入最後返回的結果列表中.
按照python官方文檔的說法, 從python3.8開始, wait函數不會將傳入的協程封裝爲task對象, 也就是說, 在將協程傳入wait函數以前, 應該先調用asyncio.create_task方法將協程轉化爲task對象. 詳細說明能夠看https://docs.python.org/zh-cn/3/library/asyncio-task.html?highlight=asyncio%20wait#asyncio.wait. 可是, 我今天特意安裝了一個python3.8進行測試, 發現並無這回事, 傳一個協程列表給wait, 程序仍是能夠正常運行. 並且wait內部的代碼也沒改, 仍是會調用ensure_future將協程封裝成task對象. 總之, 這裏仍是注意一下吧, 指不定哪天就變了. 今天是2020-1-7.
以https://xkcd.com/這個網站爲例, 這是一個漫畫網站, 每張漫畫對應的頁面是https://xkcd.com/int/, 其中int爲1-2244之間的整數. 如今從網站上下載一百張漫畫, 代碼以下:
import asyncio import os import random import re import time import aiofiles import aiohttp class Producer: def __init__(self, session, queue): self.session = session self.regex = re.compile( '''<div id="comic">\n<img src="(//imgs.xkcd.com/comics/[a-z0-9_]+\.(jpg|png))"''') self.q = queue self.urls = self.url_factory() async def start(self): workers = [self.worker() for i in range(10)] await asyncio.wait(workers) self.q.put_nowait('all tasks are done') async def worker(self): while self.urls: url = self.urls.pop() html = await self.fetch(url) await self.pares_html(html) def url_factory(self, n=100): # 返回一個含有n個url的集合 urls = set() while len(urls) < n: urls.add('https://xkcd.com/{}/'.format(random.randrange(1, 2245))) return urls async def fetch(self, url): async with self.session.get(url) as response: assert response.status == 200 return await response.text() async def pares_html(self, html): res = re.search(self.regex, html) if res is None: return img_url = 'https:'+res.group(1) await self.q.put(img_url) class Consumer: def __init__(self, session, queue): self.session = session self.q = queue self.save_folder = os.path.join(os.path.dirname(__file__), 'webcomics') try: os.mkdir(self.save_folder) except FileExistsError: pass async def start(self): workers = [self.worker() for i in range(10)] await asyncio.wait(workers) async def worker(self): while 1: url = await self.q.get() if url == 'all tasks are done': break await self.download(url) self._on_worker_done() def _on_worker_done(self): self.q.put_nowait('all tasks are done') async def download(self, url): filename = os.path.split(url)[-1] save_path = os.path.join(self.save_folder, filename) if os.path.exists(save_path): return async with self.session.get(url) as response: assert response.status == 200 content = await response.read() async with aiofiles.open(save_path, 'wb') as f: await f.write(content) async def main(): queue = asyncio.Queue(maxsize=100) async with aiohttp.ClientSession() as session: producer = Producer(session, queue) consumer = Consumer(session, queue) tasks = asyncio.gather(producer.start(), consumer.start()) await tasks if __name__ == '__main__': asyncio.run(main())
這段代碼主要分爲兩部分: producer和consumer, 即生產者和消費者. 生產者負責生成圖片下載連接, 消費者則處理這些連接, 將對應的圖片下載到本地.
首先, producer須要兩個參數, session和queue. session是鏈接池, 其特色是結束以後, 對應的鏈接不會斷開, 下次須要同一個目標的鏈接時, 直接使用上次未斷開的鏈接, 也就是實現了tcp鏈接的重複使用. 因爲本次鏈接的目標都是同一個網站, 所以使用鏈接池能夠減小建立和斷開tcp鏈接的開銷(三次握手, 四次揮手). queue則是隊列, 生產者產生圖片下載連接以後, 不直接交給消費者, 而是放在隊列中讓消費者自取. 這樣兩者不是同步的, 效率低的一方不會阻塞到效率高的另外一方.
producer的工做策略是, 首先產生好100個網頁連接存在self.urls這個列表中, 而後讓worker不斷從集合中取出url. worker方法首先調用fetch方法獲取到對應的html, 而後調用parse_html提取html中的圖片連接, 將圖片連接儲存到隊列中.
這段代碼並不算複雜, 須要注意的有三點.
consumer類的代碼與producer類似, 自己也不算複雜, 主要變化就是把fetch換成download, 將請求的數據直接寫入本地的圖片文件. 值得講講的就是這裏中止消費者的機制:
若是要中止消費者, 另一個方式是使用asyncio.Queue自帶的join方法. 首先, 將 "all tasks are done" 這一機制刪除, 而後, 將start和worker方法的代碼修改以下:
async def start(self): workers = [asyncio.Task(self.worker()) for i in range(10)] await self.q.join() for worker in workers: worker.cancel() async def worker(self): while 1: url = await self.q.get() await self.download(url) self.q.task_done()
asyncio.Queue內部帶有一個future對象和一個_unfinished_tasks計數, 當往隊列添加數據時, _unfinished_tasks+1, 調用task_done方法時, _unfinished_tasks-1, 而且若是此時_unfinished_task爲0, 就調用future對象的set_result方法.
這種中止機制的關鍵就在於隊列: 調用隊列的join方法會返回隊列的future對象, 所以, await self.q.join()這句話實際是在等待future對象結束. start方法首先用asyncio.Task對worker進行封裝, 使其開始運行. worker每從隊列中取出一個url並下載圖片, 就調用一次task_done. 這樣隊列爲空時, 隊列的future對象就結束了, 因而start協程再反過來將worker協程取消.
這種方法的問題是, 首先, 這裏的生產者和消費者是同時運行的, 當生產者沒有產生數據以前, 隊列爲空, 此時消費者就會直接結束. 而後, 若是生產者效率比消費者低不少, 在生產中途出現了隊列爲空的狀況, 此時消費者也可能提早結束, 所以這種方法更適用於消費者消費隊列中已有數據的場景, 在這裏不適用.
在使用爬蟲獲取網站內容以前, 應該先查看這個網站的robots協議, 該協議放在root_url/robots.txt, 好比https://xkcd.com/的robots協議能夠在https://xkcd.com/robots.txt看到. 這個協議規定了容許的爬蟲和能夠爬取的目錄. 好比: User-agent: * Disallow: /personal/ 表示任何爬蟲均可以爬, 可是不能爬取/personal/目錄下的內容.