對於學習異步的出發點,是寫爬蟲。從簡單爬蟲到學會了使用多線程爬蟲以後,在翻看別人的博客文章時偶爾會看到異步這一說法。而對於異步的瞭解實在困擾了我很久很久,看了N遍廖雪峯python3協程和異步的文章,一直都是隻知其一;不知其二,也學不會怎麼使用異步來寫爬蟲。因而翻看了其餘關於異步的文章,才慢慢了解python的異步機制並學會使用,可是沒看到有特別全面的文章,因此在參考別人的文章基礎上,加上了本身的理解,寫了出來,也算是本身的一個小總結。html
生成器的產生其實比較容易理解,例如當咱們要建立了0到1000000這樣一個很大的列表但同時咱們只須要取出部分數據,這樣的須要並很多見,而顯然這種作法浪費了大量的內存空間。而生成器的做用就是爲了解決上述的問題,利用生成器咱們只須要可以保持一個整數的內存便可遍歷數組列表。生成器的使用是經過yield實現,看下面代碼樣例。python
def l_range(num): index = 0 while index < num: yield index # (1) index += 1 l = l_range(5) print(next(l)) #0 print(next(l)) #1 print(next(l)) #2
不少人會混淆yield和send(後面會提到)的使用,上面的代碼中 yield index,配合next(l)的使用。簡單能夠這樣理解,函數l_range的while循環中,每次程序運行到(1)處都"暫停"了,向調用函數處返回index參數,注意此時並無執行(1)這條語句!!!而每調用一次next(l)循環就會執行一次,而當index>num的時候,倘若再調用next(l),由於此時已經跳出了while循環,yield不會再執行,因此會拋出異常。
除了使用next()調用生成器,可是實際上還能夠用for循環遍歷,可知生成器也是可迭代對象。編程
for i in l_range(5): print(i)
明白了「暫停」的概念,生成器就變得很是好理解了!數組
從上面的demo中,咱們能夠得知生成器的引入使得函數的調用可以「暫停」而且向外傳遞數據,既然能夠向外傳遞數據,那麼是否可以向函數裏傳遞數據呢?生成器send的引入就是爲了實現這個需求!send可以從生成器(函數)調用處傳遞數據到yield處。
來看下面這個demo。網絡
def jumping_range(up_to): index = 0 while index < up_to: jump = yield index # (1) # print('index = %s, jump = %s' % (index, jump)) if jump is None: jump = 1 index += jump iterator = jumping_range(5) print(next(iterator)) #0 print(iterator.send(2)) #2 print(next(iterator)) #3 print(iterator.send(-1)) #2 print(next(iterator)) #3 print(next(iterator)) #4
下面解釋下每個輸出,當第一次next(iterator),程序執行到(1)處,可是未執行,只是把index傳遞出去,因此此時輸出的是0(index=0)。接着執行iterator.send(2),這裏把2從調用處傳遞給了生成器裏並賦值給jump,注意yield index是傳遞index參數出去,而jump=yield是把參數傳遞進去給jump!!!而後執行完while的第一次循環回到(1),此時index 執行了一次 index+=jump,而且jump=2。因此iterator.send(2)的輸出是2!然後面的輸出請各位獨自推算一下,若實在想不通能夠嘗試在生成器中print一下各參數出來,方便理解。
要搞明白協程,對於這句代碼的理解尤其重要。session
jump = yield index
其實意思上能夠理解爲多線程
jump = yield yield index
即 jump接受從外面傳遞進來的參數,而index則是要傳遞出去的參數。可是固然,這只是我爲了方便理解拆分出來的代碼,實際上這樣拆分會致使不一樣的結果。架構
來看看拆分出來的代碼併發
def jumping_range(up_to): index = 0 while index < up_to: jump = yield #(a) yield index #(b) # print('index = %s, jump = %s' % (index, jump)) if jump is None: jump = 1 # print('jump = %d' % jump) index += jump iterator = jumping_range(5) print(next(iterator)) #None print(iterator.send(2)) #0 print(next(iterator)) #None print(iterator.send(-1)) #2 print(next(iterator)) #None print(next(iterator)) #1
簡單講解上述的輸出,首先當程序執行到a(注意a處的代碼未執行),此時yield 右邊並無參數,因此第一個print返回的是None。而當執行iterator.send(2),程序在a處把2傳遞給參數jump,而後往下執行,當遇到第二個yield,程序又「暫停」了,即一個while循環裏暫停2次!而執行到b處(b處的代碼未執行)把index傳遞到出去,因此此時print返回的是0(index=0)。接着來的能夠如此類推了!app
只要明白了上述2個demo,相信對於協程已經有必定的理解了。最後再提一下yield from的使用。yield from的使用相似函數調用,做用是讓重構變得簡單,也讓你可以將生成器串聯起來,使返回值能夠在調用棧中上下浮動,不需對編碼進行過多改動。
def bottom(): return (yield 42) def middle(): return (yield from bottom()) def top(): return (yield from middle()) gen = top() value = next(gen) print(value) try: value = gen.send(value * 2) except StopIteration as exc: print(exc) value = exc.value print(value)
對於異步IO,就是你發起一個IO操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。而要理解異步async/await,首先要理解什麼是事件循環。
事件循環,在維基百科的解釋是「一種等待程序分配事件或消息的編程架構」。簡單的說事件循環就是「當A發生時,執行B」。對python來講,用來提供事件循環的asyncio被加入標準庫,asyncio 重點解決網絡服務中的問題,事件循環在這裏未來自套接字(socket)的 I/O 已經準備好讀和/或寫做爲「當A發生時」(經過selectors模塊)。和多線程和多進程同樣,Asyncio是併發的一種方式。但因爲GIL(全局解釋器鎖)的存在,python的多線程以及Asyncio不能帶來真正的並行。而可交給asyncio執行的任務,就是上述的協程!一個協程能夠放棄執行,把機會給其餘協程(即yield from 或await)。
定義協程有2種經常使用的方式,
前者是python3.5的新方式,然後者是3.4的方式(3.5也可用)。
async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)
@asyncio.coroutine def do_some_work2(x): print("Waiting " + str(x)) yield from asyncio.sleep(x)
這樣一來do_some_work即是一個協程,準確來講是一個協程函數,而且能夠用asyncio.iscoroutinefunction來驗證
print(asyncio.iscoroutinefunction(do_some_work)) # True
在解釋await以前,咱們先來講明一下協程能夠作什麼事
demo中asyncio.sleep()也是一個協程,await asyncio.sleep(x),顧名思義就是等待,等待asyncio.sleep(x)執行完後返回do_some_work這個協程。
協程函數的調用與普通函數不一樣,要讓協程對象運行的話,經常使用的方式有2中
簡單來講,只有loop運行了,協程纔可能運行。因此在運行協程以前,必須先拿到當前線程缺省的loop,而後把協程對象交給loop.run_until_complete,協程對象隨後會在loop裏獲得運行。
loop = asyncio.get_event_loop() loop.run_until_complete(do_some_work(3))
run_until_complete 是一個阻塞(blocking)調用,知道調用運行結束,才返回。而它的參數是一個future,可是咱們上面傳進去的確實協程對象,之因此能夠這樣,是由於它內部作了檢查,對於協程會經過ensure_future函數把協程對象包裝(wrap)成了future。
因此咱們能夠改成:
loop.run_until_complete(asyncio.ensure_future(do_some_work(3))
上面的demo這都是用ensure_future函數計劃它的執行, 來看看使用第一種方法
tasks = [ asyncio.ensure_future(do_some_work(1)), asyncio.ensure_future(do_some_work(3)) ] loop.run_until_complete(asyncio.wait(tasks))
注意: asyncio.wait自己是一個協程
有時候當協程運行結束的時候,咱們但願獲得通知,以便判斷程序執行的狀況以及下一步數據的處理。這一需求能夠經過往future添加回調來實現。
def done_callback(cor): """ 協程的回調函數 :param cor: :return: """ print('Done') cor = asyncio.ensure_future(do_some_work(3)) cor.add_done_callback(done_callback) loop = asyncio.get_event_loop() loop.run_until_complete(cor)
在實際運行異步中,每每是有多個協程,同時在一個loop裏運行。因而須要使用asyncio.gather函數把多個協程交給loop。
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
固然協程一多起來,一條語句寫起來就不方便了,能夠先把協程存在列表裏。
coros = [do_some_work(1), do_some_work(3)] loop.run_until_complete(asyncio.gather(*coros))
因爲這兩個協程是併發運行的,因此等待時間並非1+3=4,而是以耗時比較長的那個。
上面也提到run_until_complete的參數是future,而gather起聚合的做用,把多個futures包裝成一個future,由於loop.run_until_complete只接受單個future。上述代碼也能夠改成:
coros = [asyncio.ensure_future(do_some_work(1)), asyncio.ensure_future(do_some_work(3))] loop.run_until_complete(asyncio.gather(*coros))
經常使用的結束協程的方法有2種:
run_until_complete看函數名就大概明白,便是直到全部協程工做(future)結束才返回
async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop = asyncio.get_event_loop() coro = do_some_work(3) loop.run_until_complete(coro)
輸出:
程序等待3秒鐘後輸出'Done'返回
試試改成run_forever:
async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop = asyncio.get_event_loop() coro = do_some_work(3) asyncio.ensure_future(coro) loop.run_forever()
輸出:
程序等待3秒鐘後輸出'Done'但並無返回。
run_forever會一直運行,直到loop.stop()被調用,可是不能在run_forever後調用stop,由於run_forever永遠都不會返回,因此stop永遠都不能被調用。
loop.run_forever() loop.stop()
正確的使用方法應該是在協程中調用stop,因此須要在協程參數中傳入loop:
async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop.stop()
這樣看來彷佛沒有什麼問題,可是當有多個協程在loop裏運行呢?
asyncio.ensure_future(do_some_work(loop, 1)) asyncio.ensure_future(do_some_work(loop, 3)) loop.run_forever
運行程序時會發現,只輸出了一個‘Done’程序就返回了。這說明了第二個協程尚未結束,loop就中止了,被先結束的那個協程給停掉了。要解決這個問題,能夠用gather把多個協程合併在一塊兒,經過回調的方式調用loop.stop。
async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') def done_callback(loop, futu): loop.stop() loop = asyncio.get_event_loop() futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3)) futus.add_done_callback(functools.partial(done_callback, loop)) loop.run_forever()
對於同一個loop,只要沒有close,那麼loop還能夠繼續添加協程而且再運行。
loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3))
可是關閉了就不能再運行了。
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 拋出異常
最後提一下yield from 和 await雖然內部機制有所不一樣,可是從做用來看基本上是同樣的,這裏就不探討具體的區別了。
另外關於asyncio.gather和asyncio.wait的區別請看StackOverflow的討論Asyncio.gather vs asyncio.wait
使用asyncio異步抓取豆瓣電影top250
# -*- coding: utf-8 -*- from lxml import etree from time import time import asyncio import aiohttp __author__ = 'lateink' url = 'https://movie.douban.com/top250' async def fetch_content(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def parse(url): page = await fetch_content(url) html = etree.HTML(page) xpath_movie = '//*[@id="content"]/div/div[1]/ol/li' xpath_title = './/span[@class="title"]' xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a' pages = html.xpath(xpath_pages) fetch_list = [] result = [] for element_movie in html.xpath(xpath_movie): result.append(element_movie) for p in pages: fetch_list.append(url + p.get('href')) tasks = [fetch_content(url) for url in fetch_list] pages = await asyncio.gather(*tasks) for page in pages: html = etree.HTML(page) for element_movie in html.xpath(xpath_movie): result.append(element_movie) for i, movie in enumerate(result, 1): title = movie.find(xpath_title).text print(i, title) def main(): loop = asyncio.get_event_loop() start = time() for i in range(5): loop.run_until_complete(parse(url)) end = time() print('Cost {} seconds'.format((end - start)/5)) loop.close() if __name__ == '__main__': main()