異步:asyncio和aiohttp的一些應用(1)

1. asyncio

1.1asyncio/await 用法

async/await 是 python3.5中新加入的特性, 將異步從原來的yield 寫法中解放出來,變得更加直觀。 
在3.5以前,若是想要使用異步,主要使用yield語法。舉例以下:html

import asyncio

@asyncio.coroutine  # 修飾符,等同於 asyncio.coroutine(hello())
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)  # 執行到這一步之後,直接切換到下一個任務,等到一秒後再切回來
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()  
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

引入了async/await之後,hello()能夠寫成這樣:python

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

注意此時已經不須要使用@asyncio.coroutin進行修飾,而是在def以前加async表示這是個異步函數,其內有異步操做。此外,使用await 替換了yield from, 表示這一步爲異步操做。git

加一項,關於時間獲取:github

import time
now = lambda: time.time()    # 獲取當前時間
# 待執行程序執行
start = now()    # 在獲取一下時間
print('TIME: ', now() - start)     # 動態獲取行進時間

不一樣於:
start = time.time()
# 待執行程序執行
end = time.time()
print('TIME: ', end - start)    # 獲取時間是固定的

 

 1.2關於run_until_complete和run_server的區別

通常先要設置一個loop循環 loop = asyncio.get_event_loop()web

咱們一直經過 run_until_complete 來運行 loop ,等到 future 完成,run_until_complete 也就返回了。json

輸出:windows

如今改用 run_forever:api

輸出:服務器

三秒鐘事後,future 結束,可是程序並不會退出。run_forever 會一直運行,直到 stop 被調用,可是你不能像下面這樣調 stop:markdown

run_forever 不返回,stop 永遠也不會被調用。因此,只能在協程中調 stop:

這樣並不是沒有問題,假若有多個協程在 loop 裏運行:

第二個協程沒結束,loop 就中止了——被先結束的那個協程給停掉的。

要解決這個問題,能夠用 gather 把多個協程合併成一個 future,並添加回調,而後在回調裏再去中止 loop。

 

看一個小測試:

# 一個關於協程的小測試:run_forever的回調關閉循環
import asyncio
import functools

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(2.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

def done_callback(loop,futu):
    # 這裏還能夠執行futu的相關操做
    print('關閉loop循環')
    loop.stop()

loop = asyncio.get_event_loop()
# tasks = [print_sum(1, 2), print_sum(3, 4), print_sum(5, 6)]
# loop.run_until_complete(asyncio.wait(tasks))  # run_until_complete 完成後釋放,結束loop,至關於loop.close()

futus = asyncio.gather(print_sum(1, 2), print_sum(3, 4), print_sum(5, 6))
futus.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()  # 執行多個協程以後再回調一個done_callback來中止循環,其實run_until_complete就是基於run_forever()

 

 1.1.2 loop.close()?

簡單來講,loop 只要不關閉,就還能夠再運行。:

可是若是關閉了,就不能再運行了:

1.1.3 回調函數:add_done_callback

回調函數, 執行且按照順序, 假如協程是一個 IO 的讀操做,等讀完數據後,咱們但願獲得通知,以便下一步數據的處理。這一需求能夠經過往 future 添加回調來實現。

import asyncio
async def hello1():
    print("1, Hello world!")
    #r = await asyncio.sleep(1)
    print("1, Hello again!")
    for i in range(5):
        print(i)

async def hello2():
    print("2, Hello world!")
    #r = await asyncio.sleep(1)
    print("2, Hello again!")
    for i in range(5,10):
        print(i)

def done_callback1(futu):  # futu是異步的函數名稱
    print('Done1')

def done_callback2(futu):
    print('Done2')

futu = asyncio.ensure_future(hello1())
futu.add_done_callback(done_callback1) 
futu = asyncio.ensure_future(hello2())
futu.add_done_callback(done_callback2)

loop.run_until_complete(futu)
>>> 1, Hello world!
>>> 1, Hello again!
>>> 0
>>> 1
>>> 2
>>> 3
>>> 4
>>> 2, Hello world!
>>> 2, Hello again!
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> Done1
>>> Done2

1.1.4 多個協程運行的三種寫法

# 多個協程同步執行
# 第一種寫法
loop.run_until_complete(asyncio.gather(hello1(), hello2()))

# 第二種寫法
coros = [hello1(), hello2()]
loop.run_until_complete(asyncio.gather(*coros))

# 第三種寫法
futus = [asyncio.ensure_future(hello1()),
             asyncio.ensure_future(hello2())]

loop.run_until_complete(asyncio.gather(*futus))

固然也能夠這麼寫:
tasks = [ asyncio.ensure_future(hello1()), asyncio.ensure_future(hello2())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

 

 

2. aiohttp

aiohttp是一個用於web服務的庫,網上的資料,包括廖雪峯的網站中的資料大部分是關於服務器端(server)的,關於客戶端(client)的資料很少。

2.1 基本用法

async with aiohttp.get('https://github.com') as r:
        await r.text()

其中r.text(), 能夠在括號中指定解碼方式,編碼方式,例如

await resp.text(encoding='windows-1251')

或者也能夠選擇不編碼,適合讀取圖像等,是沒法編碼的

await resp.read()

例子:一個簡單的小例子:

import asyncio,aiohttp

async def fetch_async(url):
print(url)
async with aiohttp.request("GET",url) as r:
reponse = await r.text(encoding="utf-8")  #或者直接await r.read()不編碼,直接讀取,適合於圖像等沒法編碼文件
print(reponse)

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))     # 這裏使用asyncio.gather()和asyncio.wait()不同。gather把多個函數包裝成單個tasks,由於loop.run_until_complete只接受
                                         單個tasks,而wait()用於調用單一函數 event_loop.close()

2.2 設置timeout

須要加一個with aiohttp.Timeout(x)
with aiohttp.Timeout(0.001):
    async with aiohttp.get('https://github.com') as r:
        await r.text()

2.3 使用session獲取數據

這裏要引入一個類,aiohttp.ClientSession. 首先要創建一個session對象,而後用該session對象去打開網頁。session能夠進行多項操做,好比post, get, put, head等等,以下面所示

import asyncio,aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:  #協程嵌套,只須要處理最外層協程便可fetch_async
        async with session.get(url, timeout=60) as resp:  #設置超時處理60s
            print(resp.status)
       print(resp.charset)            #查看默認編碼,默認使用utf-8
       print(await resp.read())        #使用read()方法時,不會進行編碼,是以字節的形式讀取
            print(await resp.text())        #由於這裏使用到了await關鍵字,實現異步,全部他上面的函數體須要聲明爲異步async
                             #resp.text()會自動將服務器端返回的內容進行解碼decode,咱們也能夠自定義 resp.text(encoding='gb2312')
       print(await resp.content.read())    #使用字節流形式獲取數據,而不像text(),read()方法那樣一次性獲取數據,注意:session.get()是Response對象,他繼承於StreamResponse

       print(resp.cookies)           #獲取當前cookies
       print(resp.headers)           #查看響應頭dict形式
       print(resp.raw_headers)         #查看原生headers,字節型
       print(resp.history)            
tasks
= [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')] event_loop = asyncio.get_event_loop() results = event_loop.run_until_complete(asyncio.gather(*tasks)) event_loop.close()

 

若是要使用其餘方法,則相應的語句要改爲

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

2.4 自定義headers

這個比較簡單,將headers放於session.get/post的選項中便可。注意headers數據要是dict格式

url = 'https://api.github.com/some/endpoint'
headers = {'content-type': 'application/json'}
await session.get(url, headers=headers)
async def func1(url,params,filename):  #用自定義headers異步讀寫文件
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type':'text/html; charset=utf-8'}
        async with session.get(url,params=params,headers=headers) as r:
            with open(filename,"wb") as fp:
                while True:
                    chunk = await r.content.read(10)
                    if not chunk:
                        break
                    fp.write(chunk)

 

2.5 使用代理

conn = aiohttp.ProxyConnector(proxy="http://some.proxy.com", proxy_auth=aiohttp.BasicAuth('user', 'pass'))  # 若是須要代理認證的話,就須要加這個proxy_auth選項
async with aiohttp.ClientSession(connector=conn) as session:
  async with session.get('http://python.org') as resp:   print(resp.status) 

2.6 自定義cookie

url = 'http://httpbin.org/cookies'
async with ClientSession({'cookies_test': 'Monday'}) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {"cookies":
                                         {"cookies_test": "Monday"}}

2.7 在URL中傳遞參數

咱們常常須要經過 get 在url中傳遞一些參數,參數將會做爲url問號後面的一部分發給服務器。在aiohttp的請求中,容許以dict的形式來表示問號後的參數。舉個例子,若是你想傳遞 key1=value1   key2=value2 到 httpbin.org/get 你可使用下面的代碼:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get',
                       params=params) as resp:
                       assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

能夠看到,代碼正確的執行了,說明參數被正確的傳遞了進去。不論是一個參數兩個參數,仍是更多的參數,均可以經過這種方式來傳遞。除了這種方式以外,還有另一個,使用一個 list 來傳遞(這種方式能夠傳遞一些特殊的參數,例以下面兩個key是相等的也能夠正確傳遞):

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get',
                       params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

除了上面兩種,咱們也能夠直接經過傳遞字符串做爲參數來傳遞,可是須要注意,經過字符串傳遞的特殊字符不會被編碼:

async with session.get('http://httpbin.org/get',
                       params='key=value+1') as r:
        assert r.url == 'http://httpbin.org/get?key=value+1'

 

 

 

3. 樣例

3.1下面這個簡單的爬蟲,是用來爬取我博客下全部的文章的

import urllib.request as request
from bs4 import BeautifulSoup as bs
import asyncio
import aiohttp

@asyncio.coroutine
async def getPage(url,res_list):
    print(url)
    headers = {'User-Agent':'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
    # conn = aiohttp.ProxyConnector(proxy="http://127.0.0.1:8087")
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers) as resp:
            assert resp.status==200
            res_list.append(await resp.text())


class parseListPage():
    def __init__(self,page_str):
        self.page_str = page_str
    def __enter__(self):
        page_str = self.page_str
        page = bs(page_str,'lxml')
        # 獲取文章連接
        articles = page.find_all('div',attrs={'class':'article_title'})
        art_urls = []
        for a in articles:
            x = a.find('a')['href']
            art_urls.append('http://blog.csdn.net'+x)
        return art_urls
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


page_num = 5
page_url_base = 'http://blog.csdn.net/u014595019/article/list/'
page_urls = [page_url_base + str(i+1) for i in range(page_num)]
loop = asyncio.get_event_loop()
ret_list = []
tasks = [getPage(host,ret_list) for host in page_urls]
loop.run_until_complete(asyncio.wait(tasks))

articles_url = []
for ret in ret_list:
    with parseListPage(ret) as tmp:
        articles_url += tmp
ret_list = []

tasks = [getPage(url, ret_list) for url in articles_url]
loop.run_until_complete(asyncio.wait(tasks))
loop.close() 
3.2
相關文章
相關標籤/搜索