async/await is a new feature in Python 3.5. It frees asynchronous code from the old yield-based style and makes it much more readable.
Before 3.5, asynchronous code was written mainly with yield syntax. For example:
```python
import asyncio
import threading

@asyncio.coroutine  # decorator, equivalent to asyncio.coroutine(hello())
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)  # control switches to the next task here, and comes back after one second
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```
With async/await, hello() can be written like this:
```python
async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
```
Note that the @asyncio.coroutine decorator is no longer needed: adding async before def marks the function as a coroutine containing asynchronous operations, and await replaces yield from to mark each asynchronous step.
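The code that drives the loop is unchanged from the yield from version:

```python
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```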
One more note, on measuring elapsed time:
```python
import time

now = lambda: time.time()  # returns the current time whenever it is called

start = now()
# ... run the code being timed ...
print('TIME: ', now() - start)  # the end time is taken at the moment of the call

# as opposed to:
start = time.time()
# ... run the code being timed ...
end = time.time()
print('TIME: ', end - start)  # end was captured once and is fixed
```
In general, you first need to create an event loop:

```python
loop = asyncio.get_event_loop()
```
So far we have run the loop with run_until_complete: once the future completes, run_until_complete returns.
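A minimal sketch of this pattern; the slow_hello coroutine and its three-second sleep are illustrative:

```python
import asyncio

async def slow_hello():
    await asyncio.sleep(3)
    print('Hello world!')

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(slow_hello())
loop.run_until_complete(future)  # returns as soon as the future completes
```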
Now switch to run_forever:
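The same illustrative coroutine, driven by run_forever:

```python
import asyncio

async def slow_hello():
    await asyncio.sleep(3)
    print('Hello world!')

loop = asyncio.get_event_loop()
asyncio.ensure_future(slow_hello())
loop.run_forever()  # 'Hello world!' prints after three seconds, but this call never returns
```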
Three seconds later the future finishes, but the program does not exit: run_forever keeps running until stop is called. You cannot, however, call stop like this:
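A sketch of that broken pattern:

```python
loop.run_forever()
loop.stop()  # never reached: run_forever() does not return
```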
run_forever never returns, so stop would never be reached. The only option is to call stop from inside a coroutine:
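A sketch, passing the loop into the illustrative coroutine so it can stop itself:

```python
import asyncio

async def slow_hello(loop):
    await asyncio.sleep(3)
    print('Hello world!')
    loop.stop()  # stop the loop from inside the coroutine

loop = asyncio.get_event_loop()
asyncio.ensure_future(slow_hello(loop))
loop.run_forever()  # returns once stop() is called
```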
This is still not problem-free. Suppose several coroutines are running in the loop:
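For instance (again with illustrative coroutines), the faster one shuts the loop down:

```python
import asyncio

async def slow_hello(name, seconds, loop):
    await asyncio.sleep(seconds)
    print('Hello world! (%s)' % name)
    loop.stop()

loop = asyncio.get_event_loop()
asyncio.ensure_future(slow_hello('fast', 1, loop))
asyncio.ensure_future(slow_hello('slow', 3, loop))
loop.run_forever()  # stops after about one second; 'slow' never prints
```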
The loop stops before the second coroutine finishes; it is shut down by whichever coroutine finishes first.
To fix this, use gather to combine the coroutines into a single future, add a done callback to it, and stop the loop inside that callback.
Here is a small test:
```python
# A small test: using run_forever with a done callback that stops the loop
import asyncio
import functools

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(2.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

def done_callback(loop, futu):
    # futu is the combined future; its results could be inspected here
    print('stopping the loop')
    loop.stop()

loop = asyncio.get_event_loop()

# tasks = [print_sum(1, 2), print_sum(3, 4), print_sum(5, 6)]
# loop.run_until_complete(asyncio.wait(tasks))
# run_until_complete returns by itself once the tasks finish

futus = asyncio.gather(print_sum(1, 2), print_sum(3, 4), print_sum(5, 6))
futus.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()
# run several coroutines, then stop the loop from the callback;
# run_until_complete is itself implemented on top of run_forever()
```
簡單來講,loop 只要不關閉,就還能夠再運行。:
But once it is closed, it cannot run anymore:
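A sketch of both cases, with an illustrative say() coroutine:

```python
import asyncio

async def say(text):
    print(text)

loop = asyncio.get_event_loop()
loop.run_until_complete(say('first run'))
loop.run_until_complete(say('second run'))  # fine: the loop was not closed

loop.close()
loop.run_until_complete(say('third run'))   # RuntimeError: Event loop is closed
```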
Callbacks run in order. Suppose a coroutine performs an IO read; when the data has been read, we want to be notified so we can process it. This can be done by adding a done callback to the future.
```python
import asyncio

async def hello1():
    print("1, Hello world!")
    # r = await asyncio.sleep(1)
    print("1, Hello again!")
    for i in range(5):
        print(i)

async def hello2():
    print("2, Hello world!")
    # r = await asyncio.sleep(1)
    print("2, Hello again!")
    for i in range(5, 10):
        print(i)

def done_callback1(futu):  # futu is the finished future passed to the callback
    print('Done1')

def done_callback2(futu):
    print('Done2')

loop = asyncio.get_event_loop()

futu = asyncio.ensure_future(hello1())
futu.add_done_callback(done_callback1)

futu = asyncio.ensure_future(hello2())
futu.add_done_callback(done_callback2)

loop.run_until_complete(futu)
```

The output:

```
1, Hello world!
1, Hello again!
0
1
2
3
4
2, Hello world!
2, Hello again!
5
6
7
8
9
Done1
Done2
```
```python
# Running multiple coroutines concurrently

# Style 1
loop.run_until_complete(asyncio.gather(hello1(), hello2()))

# Style 2
coros = [hello1(), hello2()]
loop.run_until_complete(asyncio.gather(*coros))

# Style 3
futus = [asyncio.ensure_future(hello1()),
         asyncio.ensure_future(hello2())]
loop.run_until_complete(asyncio.gather(*futus))
```
Of course, you can also write it like this:
```python
tasks = [asyncio.ensure_future(hello1()),
         asyncio.ensure_future(hello2())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
```
Moving on to aiohttp. The simplest GET request looks like this:

```python
async with aiohttp.get('https://github.com') as r:
    await r.text()
```
In r.text() you can specify which encoding to decode with, for example:
```python
await resp.text(encoding='windows-1251')
```
Or you can skip decoding entirely, which suits binary content such as images that cannot be decoded:
```python
await resp.read()
```
A simple example:
```python
import asyncio
import aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.request("GET", url) as r:
        response = await r.text(encoding="utf-8")
        # or await r.read() to skip decoding and read raw bytes, e.g. for images
        print(response)

tasks = [fetch_async('http://www.baidu.com/'),
         fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
# asyncio.gather() is used here rather than asyncio.wait(): gather wraps the
# coroutines into a single future, since run_until_complete takes a single
# awaitable; wait() also accepts a list of tasks but returns (done, pending)
# sets instead of the results
event_loop.close()
```
To set a timeout, wrap the request in aiohttp.Timeout:

```python
with aiohttp.Timeout(0.001):
    async with aiohttp.get('https://github.com') as r:
        await r.text()
```
This introduces the aiohttp.ClientSession class. First create a session object, then use it to open pages. A session supports many operations, such as post, get, put, head, and so on, as shown below:
```python
import asyncio
import aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        # nested coroutines: only the outermost one, fetch_async, needs to be driven
        async with session.get(url, timeout=60) as resp:  # 60-second timeout
            print(resp.status)
            print(resp.charset)              # the default encoding, usually utf-8
            print(await resp.read())         # read() does no decoding and returns raw bytes
            print(await resp.text())         # text() decodes the body; a custom encoding can be
                                             # passed, e.g. resp.text(encoding='gb2312'); the await
                                             # here is why this function must be declared async
            print(await resp.content.read()) # read the body as a byte stream rather than all at
                                             # once like text()/read(); note that the response
                                             # object inherits from StreamResponse
            print(resp.cookies)              # cookies of the current response
            print(resp.headers)              # response headers, a dict-like object
            print(resp.raw_headers)          # raw headers, as bytes
            print(resp.history)

tasks = [fetch_async('http://www.baidu.com/'),
         fetch_async('http://www.cnblogs.com/ssyfj/')]
event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
```
To use the other methods, change the call accordingly:
```python
# each of these returns an awaitable and must be awaited inside a coroutine
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
```
Custom headers are simple: pass headers as an option to session.get/post. Note that the headers must be a dict:
```python
url = 'https://api.github.com/some/endpoint'
headers = {'content-type': 'application/json'}
await session.get(url, headers=headers)
```
```python
async def func1(url, params, filename):
    # fetch with custom headers and write the response to a file in chunks
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type': 'text/html; charset=utf-8'}
        async with session.get(url, params=params, headers=headers) as r:
            with open(filename, "wb") as fp:
                while True:
                    chunk = await r.content.read(10)
                    if not chunk:
                        break
                    fp.write(chunk)
```
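A usage sketch for func1; the URL, params, and filename here are illustrative:

```python
loop = asyncio.get_event_loop()
loop.run_until_complete(
    func1('http://httpbin.org/get', {'key': 'value'}, 'result.html'))
```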
To route requests through a proxy, create a ProxyConnector and pass it to the session:

```python
conn = aiohttp.ProxyConnector(
    proxy="http://some.proxy.com",
    proxy_auth=aiohttp.BasicAuth('user', 'pass'))  # proxy_auth is only needed if the proxy requires authentication

async with aiohttp.ClientSession(connector=conn) as session:
    async with session.get('http://python.org') as resp:
        print(resp.status)
```
To send custom cookies, pass them to the session:

```python
url = 'http://httpbin.org/cookies'
async with aiohttp.ClientSession(cookies={'cookies_test': 'Monday'}) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {"cookies": {"cookies_test": "Monday"}}
```
We often need to pass parameters in a GET request; they become the part of the URL after the question mark. aiohttp lets you express these parameters as a dict. For example, to pass key1=value1 and key2=value2 to httpbin.org/get you can use:
```python
params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'
```
As the assertion shows, the parameters are passed correctly, and this works for any number of them. Alternatively, you can pass a list of tuples, which also supports special cases such as repeated keys:
```python
params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'
```
Besides the two approaches above, you can also pass a plain string, but note that special characters in the string are not encoded:
```python
async with session.get('http://httpbin.org/get', params='key=value+1') as r:
    assert r.url == 'http://httpbin.org/get?key=value+1'
```
Finally, a complete example: fetch a CSDN blog's list pages concurrently, parse out the article URLs, then fetch all the articles concurrently.

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup as bs

async def getPage(url, res_list):
    print(url)
    headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
    # conn = aiohttp.ProxyConnector(proxy="http://127.0.0.1:8087")
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            assert resp.status == 200
            res_list.append(await resp.text())

class parseListPage():
    def __init__(self, page_str):
        self.page_str = page_str

    def __enter__(self):
        page = bs(self.page_str, 'lxml')
        # collect the article links on this list page
        articles = page.find_all('div', attrs={'class': 'article_title'})
        art_urls = []
        for a in articles:
            x = a.find('a')['href']
            art_urls.append('http://blog.csdn.net' + x)
        return art_urls

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

page_num = 5
page_url_base = 'http://blog.csdn.net/u014595019/article/list/'
page_urls = [page_url_base + str(i + 1) for i in range(page_num)]

loop = asyncio.get_event_loop()

# fetch all the list pages concurrently
ret_list = []
tasks = [getPage(host, ret_list) for host in page_urls]
loop.run_until_complete(asyncio.wait(tasks))

# parse the article URLs out of each list page
articles_url = []
for ret in ret_list:
    with parseListPage(ret) as tmp:
        articles_url += tmp

# fetch all the articles concurrently
ret_list = []
tasks = [getPage(url, ret_list) for url in articles_url]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
```