from multiprocessing.dummy import Pool
from multiprocessing.dummy import Pool

# Usage sketch of the thread pool. `callback` and `items` are placeholders:
# supply your own one-argument function and the list of values to feed it.
pool = Pool(3)  # create the thread pool; 3 is the maximum number of threads

# Argument 1: the callback (the bare function name, without parentheses).
# Argument 2: a list. Each element of the list is handed to the callback,
# which can then perform some operation on that element.
pool.map(callback, items)
server.py
from flask import Flask, render_template
import time

app = Flask(__name__)


def _slow_page():
    """Sleep for 2 seconds to simulate a slow endpoint, then render test.html."""
    time.sleep(2)
    return render_template('test.html')


@app.route('/xx')
def index_1():
    """Slow test endpoint served at /xx."""
    return _slow_page()


@app.route('/yy')
def index_2():
    """Slow test endpoint served at /yy."""
    return _slow_page()


@app.route('/oo')
def index_3():
    """Slow test endpoint served at /oo."""
    return _slow_page()


if __name__ == '__main__':
    # debug=True is meant for this local test server only.
    app.run(debug=True)
templates 文件夾:在該文件夾下建立一個HTML文件(我寫的是 test.html),隨便寫點數據:
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>測試</title>
</head>
<body>
<div>
    <p>百里守約</p>
</div>
<div class="song">
    <p>李清照</p>
    <p>王安石</p>
    <p>蘇軾</p>
    <p>柳宗元</p>
    <a href="http://www.song.com/" title="趙匡胤" target="_self">
        <span>this is span</span>
        宋朝是最強大的王朝,不是軍隊的強大,而是經濟很強大,國民都頗有錢</a>
    <a href="" class="du">總爲浮雲能蔽日,長安不見令人愁</a>
    <img src="http://www.baidu.com/meinv.jpg" alt=""/>
</div>
<div class="tang">
    <ul>
        <li><a href="http://www.baidu.com" title="qing">清明時節雨紛紛,路上行人慾斷魂,借問酒家何處有,牧童遙指杏花村</a></li>
        <li><a href="http://www.163.com" title="qin">秦時明月漢時關,萬里長征人未還,但使龍城飛將在,不教胡馬度陰山</a></li>
        <li><a href="http://www.126.com" id="qi">岐王宅裏尋常見,崔九堂前幾度聞,正是江南好風景,落花時節又逢君</a></li>
        <li><a href="http://www.sina.com" class="du">杜甫</a></li>
        <li><a href="http://www.dudu.com" class="du">杜牧</a></li>
        <li><b>杜小月</b></li>
        <li><i>度蜜月</i></li>
        <li><a href="http://www.haha.com" id="feng">鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘</a></li>
    </ul>
</div>
</body>
</html>
import requests
from bs4 import BeautifulSoup
import time
# Thread-pool module
from multiprocessing.dummy import Pool

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]


def get_request(url):
    """Fetch ``url`` and return the page source as text."""
    # A timeout keeps an unresponsive server from blocking a worker forever.
    page_text = requests.get(url=url, timeout=10).text
    return page_text


def parse(page_text):
    """Parse ``page_text`` and return the text of the element with id "feng".

    Raises IndexError when the page contains no such element.
    """
    soup = BeautifulSoup(page_text, 'lxml')
    return soup.select('#feng')[0].text


def run_sync():
    """Fetch and parse every URL one after another, then print the elapsed time."""
    start = time.time()
    for url in urls:
        page_text = get_request(url)
        text_data = parse(page_text)
        print(text_data)
    print(time.time() - start)
    # Output recorded by the author:
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   6.056272029876709


def run_thread_pool():
    """Fetch and parse every URL through a 3-thread pool, then print the elapsed time."""
    start = time.time()
    # The context manager releases the worker threads once the work is done.
    with Pool(3) as pool:
        # Argument 1: the callback (bare function name, no parentheses).
        # Argument 2: a list; each element is handed to the callback.
        page_text_list = pool.map(get_request, urls)
        text_data = pool.map(parse, page_text_list)
    for i in text_data:
        print(i)
    print(time.time() - start)
    # Output recorded by the author:
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   鳳凰臺上鳳凰遊,鳳去臺空江自流,吳宮花草埋幽徑,晉代衣冠成古丘
    #   2.0537397861480713
    # Author's note: dropping the final for loop was about 0.01 s faster.


if __name__ == '__main__':
    run_sync()         # synchronous version
    run_thread_pool()  # thread-pool ("asynchronous") version
綜上所述:異步代碼執行效率顯著提升
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import re
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

# URL of the Pear Video category page that is crawled
main_url = 'https://www.pearvideo.com/category_3'

# Folder the downloaded videos are stored in
dir_name = 'video'


def _safe_file_name(name):
    """Replace characters that are not allowed in file names with '_'."""
    return re.sub(r'[\\/:*?"<>|\r\n]', '_', name).strip()


def collect_video_urls():
    """Return a list of ``{'url': video_url, 'name': title}`` dicts.

    Requests the category page, follows each entry to its detail page and
    extracts the video address from the page source.
    """
    main_page_text = requests.get(url=main_url, headers=headers).text
    tree = etree.HTML(main_page_text)
    li_list = tree.xpath('//*[@id="listvideoListUl"]/li')

    video_urls = []
    for li in li_list:
        # Detail-page address and title of the video
        detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
        name = li.xpath('./div/a/div[2]/text()')[0]
        # Request the detail page
        page_text = requests.get(url=detail_url, headers=headers).text
        # The <video> tag on the detail page is generated by JavaScript, so
        # the address is pulled out of the page source with a regex.
        ex = 'srcUrl="(.*?)",vdoUrl='
        matches = re.findall(ex, page_text, re.S)  # findall returns a list
        if not matches:
            # One page without a match must not abort the whole crawl.
            print(name, "未找到視頻地址,跳過")
            continue
        video_urls.append({
            'url': matches[0],
            'name': name,
        })
    return video_urls


# Callback executed by the thread pool
def get_video(url):
    """Download one video and store it as ``<dir_name>/<name>.mp4``.

    ``url`` is a dict with the keys 'url' (video address) and 'name' (title).
    """
    # Request the video address and persist the binary response
    video_data = requests.get(url=url['url'], headers=headers).content
    file_name = os.path.join(dir_name, _safe_file_name(url['name']) + ".mp4")
    with open(file_name, 'wb') as f:
        f.write(video_data)
    print(url['name'], "下載完畢!")


def main():
    """Collect the video addresses and download them with a 4-thread pool."""
    video_urls = collect_video_urls()
    # Create the folder for the videos; exist_ok avoids the exists/mkdir race.
    os.makedirs(dir_name, exist_ok=True)
    # The context manager releases the worker threads after the downloads.
    with Pool(4) as pool:
        pool.map(get_video, video_urls)


if __name__ == '__main__':
    main()
asyncio(重點)
協程就是一個對象。當特殊函數被調用後,該函數就會返回一個協程對象。
協程對象 == 特殊函數
import asyncio
from time import sleep


async def get_request(url):
    """Special function (coroutine function) that simulates a 2-second request.

    Prints a start and a finish message for ``url`` and returns '666'.
    """
    print('正在請求:', url)
    # time.sleep is a blocking call; it is used here only to stand in for a
    # slow request.
    sleep(2)
    print('請求成功:', url)
    return '666'


# Calling the special function does not execute its body;
# it returns a coroutine object.
g = get_request("https://www.qq.com")
任務對象:就是對協程對象的進一步封裝(就是一個高級的協程對象)
任務對象 == 協程對象 == 特殊函數(表示某個固定形式的任務)
# General form: asyncio.ensure_future(<coroutine object>)
task = asyncio.ensure_future(g)  # g: the coroutine object
綁定回調:
# Define a callback function for a task
def callback(task):
    """Done-callback; ``task`` is the task object that has just finished."""
    task.result()  # the return value of the special function behind this task
    print("I'm callback:", task)


# General form: task.add_done_callback(funcName)
#   task:     the task object
#   funcName: the name of the callback function
task.add_done_callback(callback)
funcName:這個回調函數必需要帶一個參數,這個參數表示的就是當前的任務對象
參數.result():表示的就是當前任務對象對應的特殊函數的返回值
建立事件循環對象
需要將任務對象註冊到該事件循環對象中
# Create the event loop object
loop = asyncio.get_event_loop()
# Register/load the task object into the event loop, then start the loop.
# run_until_complete both loads the task and starts the event loop.
loop.run_until_complete(task)  # task: the task object
await:當阻塞操作結束後讓loop回頭執行阻塞之後的代碼。
asyncio.wait():將當前的任務對象交出cpu的使用權。
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# asyncio.wait: the "suspend" operation
# tasks:        the list of task objects
aiohttp(重點)
requests:不支持異步,不可以出現在特殊函數內部。
aiohttp:支持異步的網絡請求模塊,和 asyncio 一起使用
pip install aiohttp
代碼的編寫
import asyncio
import aiohttp


# Network request based on aiohttp.
# NOTE: this is only the first-draft skeleton. As written it is not yet valid
# asynchronous code: the `with` statements still lack the `async` keyword,
# which the complete version that follows adds.
async def get_requests(url):
    # Instantiate a request (session) object
    with aiohttp.ClientSession() as aio:
        # General form:
        # aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port')
        with aio.get(url=url) as response:
            # text() returns the response data as a string
            # read() returns the response data as bytes
            page_text = await response.text()
            return page_text
在每個 with 前加上 async 關鍵字;在阻塞操作(例如 response.text())前加上 await 關鍵字。
完整代碼:
import asyncio
import aiohttp


# Asynchronous network request based on aiohttp
async def get_requests(url):
    """Request ``url`` with aiohttp and return the response body as text."""
    # Instantiate a request (session) object
    async with aiohttp.ClientSession() as aio:
        # General form:
        # aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port')
        # The object returned by aio.get() is itself an async context manager,
        # so no extra `await` is needed in front of it.
        async with aio.get(url=url) as response:
            # text() returns the response data as a string
            # read() returns the response data as bytes
            page_text = await response.text()
            return page_text
import asyncio
from time import sleep


async def get_request(url):
    """Special function that simulates a 2-second request and returns '666'."""
    print('正在請求:', url)
    # Blocking sleep, used only to stand in for a slow request.
    sleep(2)
    print('請求成功:', url)
    return '666'


# Define a callback function for the task
def callback(task):
    """Done-callback; prints the finished task object."""
    print("I'm callback:", task)


if __name__ == '__main__':
    # Create the event loop object first so that the task can be bound to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Calling the special function returns a coroutine object
        g = get_request("https://www.qq.com")
        # Create a task object from the coroutine object
        task = asyncio.ensure_future(g, loop=loop)
        # Bind the callback function to the task object
        task.add_done_callback(callback)
        # Register/load the task object into the event loop and start the loop
        loop.run_until_complete(task)
    finally:
        loop.close()
    # Output of this script:
    #   正在請求: https://www.qq.com
    #   請求成功: https://www.qq.com
    #   I'm callback: <Task finished ... result='666'>
import asyncio
import time

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]


async def get_request(url, delay=2):
    """Simulate a request to ``url`` that takes ``delay`` seconds; return '666'."""
    print('正在請求:', url)
    # await: hand control back to the loop while this operation is pending;
    # the loop resumes the code after it once the wait is over.
    await asyncio.sleep(delay)
    print('請求成功:', url)
    return '666'


def main():
    """Run one task per URL on a single event loop and print the total time."""
    start = time.time()
    # Create the loop before the tasks so that every task is bound to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = []
        for url in urls:
            c = get_request(url)
            task = asyncio.ensure_future(c, loop=loop)
            tasks.append(task)
        # A task list must be wrapped in asyncio.wait() when it is registered
        # with the event loop.
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print('總耗時:', time.time() - start)


if __name__ == '__main__':
    main()
測試:同步&異步效率
# Start the test server by following the steps above, then run the code below.
import asyncio
import time
import aiohttp
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]

# A version built on requests cannot be asynchronous, because requests does
# not support async:
#
#     async def get_requests(url):
#         page_text = requests.get(url).text
#         return page_text


async def get_requests(url):
    """Request ``url`` asynchronously with aiohttp and return the page text."""
    # Instantiate a request (session) object
    async with aiohttp.ClientSession() as aio:
        # General form:
        # aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port')
        async with aio.get(url=url, headers=headers) as response:
            # text() returns the response data as a string
            # read() returns the response data as bytes
            page_text = await response.text()
            return page_text


def parse(task):
    """Done-callback: print the text of the element with id "feng".

    ``task`` is the finished task object; ``task.result()`` is the page source
    returned by ``get_requests``.
    """
    page_text = task.result()
    tree = etree.HTML(page_text)
    content = tree.xpath('//*[@id="feng"]/text()')[0]
    print(content)


def main():
    """Fetch all URLs concurrently on one event loop and print the total time."""
    start = time.time()
    # Create the loop before the tasks so that every task is bound to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = []
        for url in urls:
            c = get_requests(url)
            task = asyncio.ensure_future(c, loop=loop)
            task.add_done_callback(parse)
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print('總耗時:', time.time() - start)


if __name__ == '__main__':
    main()
案例:基於協程(asyncio + aiohttp)爬取梨視頻
import asyncio
import time
import aiohttp
from lxml import etree
import re
import os
import requests

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

# URL of the Pear Video category page that is crawled
main_url = 'https://www.pearvideo.com/category_3'

# Folder the downloaded videos are stored in
dir_name = 'video'


def _safe_file_name(name):
    """Replace characters that are not allowed in file names with '_'."""
    return re.sub(r'[\\/:*?"<>|\r\n]', '_', name).strip()


def collect_video_urls():
    """Return a list of ``{'url': video_url, 'name': title}`` dicts.

    This step uses the synchronous requests module; only the video downloads
    themselves run on the event loop.
    """
    main_page_text = requests.get(url=main_url, headers=headers).text
    tree = etree.HTML(main_page_text)
    li_list = tree.xpath('//*[@id="listvideoListUl"]/li')

    urls = []  # [{'url': video_url, 'name': name}, ...]
    for li in li_list:
        detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
        name = li.xpath('./div/a/div[2]/text()')[0]
        page_text = requests.get(url=detail_url, headers=headers).text
        # The <video> tag on the detail page is generated by JavaScript, so
        # the address is pulled out of the page source with a regex.
        ex = 'srcUrl="(.*?)",vdoUrl='
        matches = re.findall(ex, page_text, re.S)  # findall returns a list
        if not matches:
            # One page without a match must not abort the whole crawl.
            print(name, "未找到視頻地址,跳過")
            continue
        urls.append({
            'url': matches[0],
            'name': name,
        })
    return urls


# Asynchronous network request based on aiohttp
async def get_requests(url):
    """Download one video and return ``{'page_read': bytes, 'name': title}``.

    ``url`` is a dict with the keys 'url' (video address) and 'name' (title).
    """
    # Instantiate a request (session) object
    async with aiohttp.ClientSession() as aio:
        # General form:
        # aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port')
        async with aio.get(url=url['url'], headers=headers) as response:
            # text() returns the response data as a string
            # read() returns the response data as bytes
            page_read = await response.read()
            dic = {
                "page_read": page_read,
                "name": url['name']
            }
            return dic


def parse(task):
    """Done-callback: write the downloaded bytes to ``<dir_name>/<name>.mp4``.

    ``task.result()`` is the dict returned by ``get_requests``.
    """
    dic_info = task.result()
    file_name = os.path.join(dir_name, _safe_file_name(dic_info["name"]) + ".mp4")
    with open(file_name, 'wb') as f:
        f.write(dic_info['page_read'])
    print(dic_info["name"], "下載完畢!")


def main():
    """Collect the video addresses, download them concurrently, print the time."""
    # The timer measures the whole crawl, including the synchronous part.
    start = time.time()
    urls = collect_video_urls()
    # Create the folder for the videos; exist_ok avoids the exists/mkdir race.
    os.makedirs(dir_name, exist_ok=True)

    # Create the loop before the tasks so that every task is bound to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = []
        for url in urls:
            c = get_requests(url)
            task = asyncio.ensure_future(c, loop=loop)
            task.add_done_callback(parse)
            tasks.append(task)
        # asyncio.wait() raises ValueError when given an empty collection.
        if tasks:
            loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print('總耗時:', time.time() - start)


if __name__ == '__main__':
    main()