高性能的異步爬蟲

線程池(適當使用)

import os
import random
import re

import requests
from lxml import etree
from multiprocessing.dummy import Pool
# Shared request headers: a desktop-Chrome User-Agent so the target site
# does not reject the crawler as an obvious bot.
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
}

def request_video(url):
    """Fetch *url* and return the raw response body (the video's bytes)."""
    response = requests.get(url=url, headers=headers)
    return response.content


def saveVideo(data):
    """Write one video's binary *data* to a randomly named .mp4 file.

    Returns the file name that was written (pool.map callers ignore the
    return value, so this addition is backward compatible).
    """
    name = str(random.randint(0, 9999)) + '.mp4'
    # BUG FIX: a random 0-9999 name can collide with an already-downloaded
    # video and silently overwrite it; retry until the name is unused.
    while os.path.exists(name):
        name = str(random.randint(0, 9999)) + '.mp4'
    with open(name, 'wb') as fp:
        fp.write(data)
        print(name, '下載存儲成功!!!')
    return name

# ---- Crawl the pearvideo list page and download videos with a thread pool ----
url = 'https://www.pearvideo.com/category_1'
page_text = requests.get(url=url, headers=headers).text

tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
# Instantiate a thread pool (4 worker threads)
pool = Pool(4)
video_url_list = []  # all of the video URLs to download
for li in li_list:
    detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
    detail_page_text = requests.get(url=detail_url, headers=headers).text
    # The real video URL only appears inside an inline <script>, hence the regex.
    ex = 'srcUrl="(.*?)",vdoUrl='
    matches = re.findall(ex, detail_page_text, re.S)
    # BUG FIX: re.findall(...)[0] raised IndexError on pages without a match;
    # skip such pages instead of crashing the whole crawl.
    if not matches:
        continue
    video_url_list.append(matches[0])
# Fetch the binary data of the videos concurrently (one URL per worker thread)
video_data_list = pool.map(request_video, video_url_list)

# Persist the videos to disk
pool.map(saveVideo, video_data_list)
# BUG FIX: the pool was never shut down, leaking its worker threads.
pool.close()
pool.join()

單線程+異步協程(推薦)

  • event_loop:事件循環,相當於一個無限循環,我們可以把一些函數註冊到這個事件循環上,當滿足某些條件的時候,函數就會被循環執行。程序是按照設定的順序從頭執行到尾,運行的次數也是完全按照設定。當在編寫異步程序時,必然其中有部分程序的運行耗時是比較久的,需要先讓出當前程序的控制權,讓其在背後運行,讓另一部分的程序先運行起來。當背後運行的程序完成後,也需要及時通知主程序已經完成任務、可以進行下一步操作,但這個過程所需的時間是不確定的,需要主程序不斷地監聽狀態,一旦收到了任務完成的消息,就開始進行下一步。loop 就是這個持續不斷的監視器。

  • coroutine:中文翻譯叫協程,在 Python 中常指代協程對象類型,我們可以將協程對象註冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。

  • task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。

  • future:代表將來執行或尚未執行的任務,實際上和 task 沒有本質區別。

  • 另外我們還需要瞭解 async/await 關鍵字,它是從 Python 3.5 才出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。

基本使用

# Basic usage of a coroutine
import asyncio

async def hello(name):
    """Coroutine that greets *name*; calling it only builds a coroutine object."""
    print('hello to :', name)

# Calling an async def function does NOT run it; we get a coroutine object.
c = hello('bobo')

# BUG FIX: asyncio.get_event_loop() is deprecated when no loop is running
# (Python 3.10+). asyncio.run() creates an event loop, drives the coroutine
# to completion, and closes the loop.
asyncio.run(c)

task的使用

# task usage: a Task wraps a coroutine and tracks its state
import asyncio

async def hello(name):
    print('hello to :', name)

# Build a coroutine object (nothing runs yet)
c = hello('bobo')
# BUG FIX: asyncio.get_event_loop() is deprecated without a running loop;
# create a fresh event loop explicitly instead.
loop = asyncio.new_event_loop()
# Wrap the coroutine in a Task object, which records its state
task = loop.create_task(c)
print(task)  # pending at this point
loop.run_until_complete(task)
print(task)  # finished now
loop.close()

future

# future: asyncio.ensure_future() wraps a coroutine into a Task/Future.
# BUG FIX: the original snippet was mangled across broken lines and used an
# undefined `loop`; create and register the event loop explicitly.
import asyncio

async def hello(name):
    print('hello to :', name)

c = hello('bobo')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # ensure_future() resolves the current loop
task = asyncio.ensure_future(c)
loop.run_until_complete(task)
loop.close()

綁定回調(task)

import asyncio

def callback(task):
    """Done-callback: fires after the task completes; task.result() is its return value."""
    print('i am callback:', task.result())

async def hello(name):
    print('hello to :', name)
    return name

c = hello('bobo')

# BUG FIX: `loop` was used below but never created in this snippet.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncio.ensure_future(c)
# Bind a callback to the task; it runs once the task finishes
task.add_done_callback(callback)
loop.run_until_complete(task)
loop.close()

多任務異步協程

# Multi-task "async" attempt using requests
import asyncio  # BUG FIX: asyncio was used below but never imported
import time     # BUG FIX: time was used below but never imported

import requests

async def get_page(url):
    print('正在下載:', url)
    # No concurrency is achieved here: requests is a blocking, non-async
    # module, so the event loop cannot interleave these downloads.
    response = requests.get(url=url)
    print('響應數據:', response.text)
    print('下載成功:', url)

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
# get_event_loop() is deprecated outside a running loop; create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('總耗時:', time.time() - start)

asyncio.sleep()異步

import asyncio
import time  # BUG FIX: time was used below but never imported

async def request(url):
    print('正在下載:', url)
    # A blocking call (e.g. time.sleep(2)) here would defeat asyncio entirely;
    # asyncio.sleep() yields control back to the event loop instead.
    await asyncio.sleep(2)
    print('下載成功:', url)

urls = [
    'www.baidu.com',
    'www.taobao.com',
    'www.sogou.com'
]
start = time.time()
# get_event_loop() is deprecated outside a running loop; create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = []  # task list holding the Task objects
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

# Register the whole list of tasks with the event loop; the sleeps overlap,
# so the total time is ~2s rather than ~6s.
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('總耗時:', time.time() - start)

多任務異步操做應用到爬蟲中

  • 環境安裝:pip install aiohttp  支持異步的網絡請求的模塊
import aiohttp
import asyncio
import time  # BUG FIX: time was used below but never imported

async def get_page(url):
    """Fetch *url* with aiohttp (a truly async HTTP client) and print the body."""
    async with aiohttp.ClientSession() as session:
        # FIX: session.get() returns an async context manager; the extra
        # `await` before it was redundant.
        async with session.get(url=url) as response:
            page_text = await response.text()  # .read() for bytes, .json() for JSON
            print(page_text)

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
# get_event_loop() is deprecated outside a running loop; create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('總耗時:', time.time() - start)

實現數據解析---任務的綁定回調機制

import aiohttp
import asyncio
import time  # BUG FIX: time was used below but never imported

# Callback: parses the response data after a task completes
def callback(task):
    print('this is callback()')
    # Retrieve the response data returned by the coroutine
    page_text = task.result()
    print('在回調函數中,實現數據解析')

async def get_page(url):
    """Fetch *url* asynchronously and return the response body text."""
    async with aiohttp.ClientSession() as session:
        # FIX: session.get() returns an async context manager; the extra
        # `await` before it was redundant.
        async with session.get(url=url) as response:
            page_text = await response.text()  # .read() for bytes, .json() for JSON
            return page_text

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
# get_event_loop() is deprecated outside a running loop; create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    # Bind a callback to each task to parse its response data
    task.add_done_callback(callback)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('總耗時:', time.time() - start)
相關文章
相關標籤/搜索