import re import requests from lxml import etree from multiprocessing.dummy import Pool import random headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36' } def request_video(url): return requests.get(url=url,headers=headers).content def saveVideo(data): name = str(random.randint(0,9999))+'.mp4' with open(name,'wb') as fp: fp.write(data) print(name,'下載存儲成功!!!') url = 'https://www.pearvideo.com/category_1' page_text = requests.get(url=url,headers=headers).text tree = etree.HTML(page_text) li_list = tree.xpath('//ul[@id="listvideoListUl"]/li') #實例化一個線程池對象 pool = Pool(4) video_url_list = [] #全部的視頻鏈接 for li in li_list: detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0] detail_page_text = requests.get(url=detail_url,headers=headers).text ex = 'srcUrl="(.*?)",vdoUrl=' video_url = re.findall(ex,detail_page_text,re.S)[0] video_url_list.append(video_url) #異步的獲取4個視頻的二進制數據 video_data_list = pool.map(request_video,video_url_list) #進行視頻的持久化存儲 pool.map(saveVideo,video_data_list)
event_loop:事件循環,至關於一個無限循環,咱們能夠把一些函數註冊到這個事件循環上,當知足某些條件的時候,函數就會被循環執行。程序是按照設定的順序從頭執行到尾,運行的次數也是徹底按照設定。當在編寫異步程序時,必然其中有部分程序的運行耗時是比較久的,須要先讓出當前程序的控制權,讓其在背後運行,讓另外一部分的程序先運行起來。當背後運行的程序完成後,也須要及時通知主程序已經完成任務能夠進行下一步操做,但這個過程所需的時間是不肯定的,須要主程序不斷的監聽狀態,一旦收到了任務完成的消息,就開始進行下一步。loop就是這個持續不斷的監視器。json
coroutine:中文翻譯叫協程,在 Python 中常指代爲協程對象類型,咱們能夠將協程對象註冊到事件循環中,它會被事件循環調用。咱們可使用 async 關鍵字來定義一個方法,這個方法在調用時不會當即被執行,而是返回一個協程對象。網絡
task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。session
future:表明未來執行或尚未執行的任務,實際上和 task 沒有本質區別。app
另外咱們還須要瞭解 async/await 關鍵字,它是從 Python 3.5 纔出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。dom
#基本使用 import asyncio async def hello(name): print('hello to :',name) #獲取了一個協程對象 c = hello('bobo') #建立一個事件循環對象 loop = asyncio.get_event_loop() #將協程對象註冊到事件循環中,而後啓動事件循環對象 loop.run_until_complete(c)
#task的使用 import asyncio async def hello(name): print('hello to :',name) #獲取了一個協程對象 c = hello('bobo') #建立一個事件循環對象 loop = asyncio.get_event_loop() #就協程進行進一步的封裝,封裝到了task對象中 task = loop.create_task(c) print(task) loop.run_until_complete(task) print(task)
#future import asyncio async def hello(name): print('hello to :',name)
c = hello('bobo') task = asyncio.ensure_future(c) loop.run_until_complete(task)
import asyncio def callback(task): print('i am callback:',task.result()) async def hello(name): print('hello to :',name) return name c = hello('bobo') task = asyncio.ensure_future(c) #給任務對象綁定一個回調函數 task.add_done_callback(callback) loop.run_until_complete(task)
import requests async def get_page(url): print('正在下載:',url) #之因此沒有實現異步操做,緣由是由於requests模塊是一個非異步的模塊 response = requests.get(url=url) print('響應數據:',response.text) print('下載成功:',url) start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
asyncio.sleep()異步
import asyncio async def request(url): print('正在下載:',url) # sleep(2) #非異步模塊的代碼:在此處若是存在非異步操做代碼,則會完全讓asyncio失去異步的效果 await asyncio.sleep(2) print('下載成功:',url) urls = [ 'www.baidu.com', 'www.taobao.com', 'www.sogou.com' ] start = time.time() loop = asyncio.get_event_loop() tasks = [] #任務列表,放置多個任務對象 for url in urls: c = request(url) task = asyncio.ensure_future(c) tasks.append(task) #將多個任務對象對應的列表註冊到事件循環中 loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
import aiohttp import asyncio async def get_page(url): async with aiohttp.ClientSession() as session: async with await session.get(url=url) as response: page_text = await response.text() #read() json() print(page_text) start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
import aiohttp import asyncio #回調函數:解析響應數據 def callback(task): print('this is callback()') #獲取響應數據 page_text = task.result() print('在回調函數中,實現數據解析') async def get_page(url): async with aiohttp.ClientSession() as session: async with await session.get(url=url) as response: page_text = await response.text() #read() json() # print(page_text) return page_text start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) #給任務對象綁定回調函數用於解析響應數據 task.add_done_callback(callback) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)