aiohttp: asynchronous network requests

aiohttp is a module that supports asynchronous network requests.

1. A simple asynchronous coroutine crawl

  • read()
  • text(encoding=...), e.g. await r.text(encoding="utf-8")
import asyncio
import aiohttp

async def request(url):
    print("current url:", url)
    # send the request with aiohttp.request
    async with aiohttp.request("GET", url) as r:
        # r.read() reads the body directly and returns bytes
        response = await r.read()
    print("response:", response)

urls = [
    'https://www.baidu.com',
    'https://www.sogou.com',
    'https://www.qq.com',
]

# task list holding multiple task objects
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)

loop = asyncio.get_event_loop()
# the task list must be wrapped in asyncio.wait
loop.run_until_complete(asyncio.wait(stasks))

2. Making requests with a session

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Xu Junkai
"""
import asyncio
import time
import aiohttp
start_time = time.time()
urls = [
    'https://blog.csdn.net/',
    'https://www.sogou.com',
    'http://www.renren.com/',
]

async def get_page(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)   # response status code
            print(res.charset)  # response encoding
            response = await res.text()  # response body as text
            print(response)
            
tasks=[]
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print('total time:', end_time - start_time)
  • session.put
async with session.put(url, data=b"data") as res:
    ...
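
For completeness, a minimal runnable sketch of sending a body with session.put (https://httpbin.org/put is just an assumed test endpoint, not part of the original article):

import asyncio
import aiohttp

async def put_data(url):
    async with aiohttp.ClientSession() as session:
        # send the bytes b"data" as the request body
        async with session.put(url, data=b"data") as res:
            print(res.status)
            print(await res.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(put_data("https://httpbin.org/put"))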

Note:

Do not create a new session for every request. In general you only need to create one session and then use it for all requests.

Each session object contains an internal connection pool, and connections are kept alive and reused (enabled by default), which improves overall performance.
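
As a sketch of what that looks like in practice (reusing the URL list from above), the session is created once and shared by every request:

import asyncio
import aiohttp

urls = [
    'https://blog.csdn.net/',
    'https://www.sogou.com',
    'http://www.renren.com/',
]

async def fetch(session, url):
    async with session.get(url) as res:
        return await res.text()

async def main():
    # one session for all requests, so its connection pool is reused
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        pages = await asyncio.gather(*tasks)
        for url, page in zip(urls, pages):
            print(url, len(page))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())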

3. Passing parameters in the URL

import asyncio
import time
import aiohttp
start_time = time.time()
urls = [
    'https://blog.csdn.net/',
    'https://www.sogou.com',
    'http://www.renren.com/',
]
data = {"name":"foo"}
async def get_page(url, data):  # the coroutine can take several parameters
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=data) as res:
            print(res.status)
            # reading the response body is a blocking, time-consuming step,
            # so we await it to let the coroutine switch
            response = await res.text()
            print(response)
            print(res.charset)
tasks=[]
for url in urls:
    c = get_page(url, data)  # arguments are bound, but the coroutine does not run yet
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print('total time:', end_time - start_time)

Note:

Getting the response content with res.text() or res.read() is a blocking, time-consuming operation, so we use await to let the coroutine switch.
Correct usage:
    await res.text()
    await res.read()  # returns bytes
    await res.json()  # an encoding and a custom decoder function can be passed
Note:
    res.json() is the built-in JSON decoder (the counterpart of the one in Requests).
    Use res.json() to print the response content only when the response body is JSON;
    if the response is not JSON, calling res.json() raises an error.
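
A minimal sketch of await res.json() (https://httpbin.org/json is an assumed endpoint that returns JSON, used only for illustration):

import asyncio
import aiohttp

async def get_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            data = await res.json()  # parse the response body as JSON
            print(type(data), data)

loop = asyncio.get_event_loop()
loop.run_until_complete(get_json("https://httpbin.org/json"))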

4. StreamResponse

  • Because text() and read() load the whole response body into memory, consider using a byte stream (StreamResponse) when fetching large amounts of data.
# fetch data as a byte stream
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            # print the first 100 bytes of data
            print(await res.content.read(100))

c = get_page(urls)  # coroutine object
task = asyncio.ensure_future(c)  # wrap the coroutine in a task
loop = asyncio.get_event_loop()  # get the event loop
loop.run_until_complete(task)
# prints the first 100 bytes of the response
  • Read the data as a byte stream and save it to a file
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            with open("cnds.text", "wb") as fp:
                # read 100 bytes at a time in a loop and write them to the file
                while True:
                    chunk = await res.content.read(100)
                    if not chunk:
                        break
                    fp.write(chunk)

c = get_page(urls)
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

Note:

async with session.get(url) as res:  # asynchronous context manager
with open("cnds.text", "wb") as fp:  # ordinary context manager

# An asynchronous context manager can suspend execution in its enter and exit steps.
# To support this, two new methods were added: __aenter__ and __aexit__, both of which must return an awaitable.
See also:
https://www.jb51.net/article/163540.htm
(asynchronous iterators)
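
For illustration, a minimal sketch of a hand-written asynchronous context manager (the AsyncTimer class below is a made-up example, not part of aiohttp):

import asyncio

class AsyncTimer:
    async def __aenter__(self):
        # __aenter__ is a coroutine, so it may await here
        self.start = asyncio.get_event_loop().time()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # __aexit__ is also a coroutine and runs when the block exits
        print("elapsed:", asyncio.get_event_loop().time() - self.start)
        return False  # do not suppress exceptions

async def main():
    async with AsyncTimer():
        await asyncio.sleep(0.1)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())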

5. Custom request headers

# Same as with requests; most often the headers carry a User-Agent.
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type': 'text/html; charset=utf-8'}
        async with session.get(url, headers=headers) as res:
            with open("cnds.text", "wb") as fp:
                # read 100 bytes at a time in a loop and write them to the file
                while True:
                    chunk = await res.content.read(100)
                    if not chunk:
                        break
                    fp.write(chunk)

6. Custom cookies

  • Note: custom cookies must be set in ClientSession(cookies=cookie_dict), not in session.get().

# from the aiohttp source code
class ClientSession:
    """First-class interface for making HTTP requests."""

    ATTRS = frozenset([
        '_source_traceback', '_connector',
        'requote_redirect_url', '_loop', '_cookie_jar',
        '_connector_owner', '_default_auth',
        '_version', '_json_serialize',
        '_requote_redirect_url',
        '_timeout', '_raise_for_status', '_auto_decompress',
        '_trust_env', '_default_headers', '_skip_auto_headers',
        '_request_class', '_response_class',
        '_ws_response_class', '_trace_configs'])

    _source_traceback = None
    _connector = None

    def __init__(self, *, connector: Optional[BaseConnector]=None,
                 loop: Optional[asyncio.AbstractEventLoop]=None,
                 cookies: Optional[LooseCookies]=None,
                 headers: Optional[LooseHeaders]=None,
                 skip_auto_headers: Optional[Iterable[str]]=None,
                 auth: Optional[BasicAuth]=None,
                 json_serialize: JSONEncoder=json.dumps,
                 request_class: Type[ClientRequest]=ClientRequest,
                 response_class: Type[ClientResponse]=ClientResponse,
                 ws_response_class: Type[ClientWebSocketResponse]=ClientWebSocketResponse,  # noqa
                 version: HttpVersion=http.HttpVersion11,
                 cookie_jar: Optional[AbstractCookieJar]=None,
                 connector_owner: bool=True,
                 raise_for_status: bool=False,
                 read_timeout: Union[float, object]=sentinel,
                 conn_timeout: Optional[float]=None,
                 timeout: Union[object, ClientTimeout]=sentinel,
                 auto_decompress: bool=True,
                 trust_env: bool=False,
                 requote_redirect_url: bool=True,
                 trace_configs: Optional[List[TraceConfig]]=None) -> None:
  • Usage
cookies = {"cookies":"xxxxxxxxxx"}
async with ClientSession(cookies=cookies) as session:
    ...

7. Getting the response status code

  • res.status

    async with session.get(url) as res:
        print(res.status)

8. Viewing response headers

  • res.headers: view the response headers; the value is a dict-like object
  • res.raw_headers: view the raw response headers, as bytes
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type': 'text/html; charset=utf-8'}
        async with session.get(url, headers=headers) as res:
            for item, values in res.headers.items():
                print(item, "*******", values)

c = get_page(urls)
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

9. Viewing the redirect history

  • res.history
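
A minimal sketch of inspecting the redirect history (whether a given URL actually redirects depends on the site):

import asyncio
import aiohttp

async def show_history(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            # res.history is a sequence of the intermediate (redirect) responses
            for r in res.history:
                print(r.status, r.url)
            print("final:", res.status, res.url)

loop = asyncio.get_event_loop()
loop.run_until_complete(show_history("http://www.renren.com/"))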

10. Timeout handling

  • By default IO operations have a 5-minute timeout. That is rather long, so we can set our own timeout.

  • If timeout=None or timeout=0, no timeout check is performed, i.e. there is no time limit.

    async with session.get("https://baidu.com", timeout=60) as res:
        pass
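
In newer aiohttp versions the timeout can also be expressed with an aiohttp.ClientTimeout object, either per session or per request; a minimal sketch:

import asyncio
import aiohttp

async def fetch_with_timeout(url):
    timeout = aiohttp.ClientTimeout(total=60)  # overall limit in seconds
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as res:
            return await res.text()

loop = asyncio.get_event_loop()
page = loop.run_until_complete(fetch_with_timeout("https://www.baidu.com"))
print(len(page))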

11. ClientSession shares cookies across multiple connections (to the same site).

import aiohttp
import asyncio


async def request():
    # set a cookie of our own
    cookies = {"my_cookie":"my_set_cookies"}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get("https://www.csdn.net/") as res:
            print(session.cookie_jar.filter_cookies("https://www.csdn.net/nav/python"))
            print("*******************************************")
        async with session.get("https://www.csdn.net/") as res:
            print(session.cookie_jar.filter_cookies("https://www.csdn.net/nav/java"))


c = request()
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)



#Set-Cookie: dc_session_id=10_1562499942692.566280
#Set-Cookie: my_cookie=my_set_cookies
#Set-Cookie: uuid_tt_dd=10_20709428800-1562499942692-906566
#*******************************************
#Set-Cookie: dc_session_id=10_1562499942692.566280
#Set-Cookie: my_cookie=my_set_cookies
#Set-Cookie: uuid_tt_dd=10_20709428800-1562499942692-906566
  • It is best to use session.cookie_jar.filter_cookies() to get a site's cookies. Unlike the requests module, res.cookies may return some cookies, but it does not seem to return all of them.

  • Summary

    1. When we use res.cookies, we only get the cookies set by the current URL; it does not maintain cookies for the whole site.
    2. session.cookie_jar.filter_cookies(url) keeps all cookies set for the site, including the ones we set for the session, and updates them according to the responses. This is usually what we want.
    3. When we set cookies ourselves, we also need to do it in aiohttp.ClientSession(cookies=cookies).
    4. ClientSession also supports custom request headers, keep-alive connections and connection pooling.

12. Cookie safety

  • By default ClientSession uses a strict aiohttp.CookieJar. RFC 2109 explicitly forbids accepting cookies from URLs that use bare IP addresses; cookies are only accepted for hostnames resolved via DNS. This can be changed by setting unsafe=True on aiohttp.CookieJar:

    jar = aiohttp.CookieJar(unsafe=True)
    session = aiohttp.ClientSession(cookie_jar=jar)

13. Controlling the number of connections

  • TCPConnector maintains a connection pool and limits the total number of parallel connections. When the pool is full, a new request is admitted only after an existing one finishes.

    async def request():
        cookies = {"my_cookies": "my_cookies"}
        # limit the number of parallel connections
        conn = aiohttp.TCPConnector(limit=5)
        async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:
            pass

    c = request()

    task = asyncio.ensure_future(c)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
  • To limit the number of simultaneous connections to the same endpoint, set the limit_per_host parameter:

    limit_per_host: maximum number of connections to the same endpoint. The endpoint is the same when (host, port, is_ssl) are all identical.

    conn = aiohttp.TCPConnector(limit_per_host=30)  # the default is 0 (no limit)

14. A small example

import asyncio
import aiohttp


headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36",
}
def callback(task):
    # the callback could parse the page; to keep it simple we just print the length
    print(len(task.result()))


async def res(url):
    async with aiohttp.request('GET', url, headers=headers) as fp:
        response = await fp.read()
        # the three sites use different encodings, so decode them uniformly
        # (ISO-8859-1 can decode any byte sequence)
        response = response.decode('iso-8859-1')
    # return the result to the callback
    return response



urls = [
    'https://www.baidu.com',
    'https://www.sogou.com',
    'https://www.qq.com',
]

#proxy="http://some.proxy.com"

if __name__ == '__main__':
    # build the task list
    stasks = []
    for url in urls:
        # create a coroutine object
        c = res(url)
        # wrap it in a task object
        task = asyncio.ensure_future(c)
        # attach the callback to the task
        task.add_done_callback(callback)
        # add it to the list
        stasks.append(task)
    # create an event loop
    loop = asyncio.get_event_loop()
    # register the task list with the event loop and run it
    loop.run_until_complete(asyncio.wait(stasks))
  • Original article: https://www.jb51.net/article/163537.htm