在執行 IO 密集型任務的時候,程序會由於等待 IO 而阻塞。好比咱們使用 requests 庫來進行網絡爬蟲請求的話,若是網站響應速度過慢,程序會一直等待網站響應,最終致使其爬取效率十分低下。本文以爬取 IP 代理池爲例,演示 Python 中如何利用異步協程來加速網絡爬蟲。html
注:本文示例代碼,須要 Python 3.7 及以上版本。python
協程(Coroutine),又稱微線程,纖程,協程是一種用戶態的輕量級線程。編程
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存,在調度回來的時候,恢復先前保存的寄存器上下文和棧。所以協程能保留上一次調用時的狀態,即全部局部狀態的一個特定組合。bash
協程本質上是個單進程,協程相對於多進程來講,無需進程間上下文切換的開銷,無需原子操做鎖定及同步的開銷,編程模型也很是簡單。網絡
咱們可使用協程來實現異步操做,好比在網絡爬蟲場景下,在發出一個請求以後,須要等待必定的時間才能獲得響應。其實在這個等待過程當中,程序能夠幹許多其餘的事情,等到響應返回以後再切換回來繼續處理,這樣能夠充分利用 CPU 和其餘資源,這就是異步協程的優點。session
從 Python 3.4 開始,Python 中加入了協程的概念,這個版本的協程是經過生成器對象來實現的,在 Python 3.5 中增長了 asyncio 庫和 async、await 關鍵字,使得協程的實現更加方便。多線程
首先咱們先來看一個不使用協程的程序,代碼以下:app
import time
def job(t):
print('Start job {}'.format(t))
time.sleep(t) # 等待 t 秒
print('Job {0} takes {0}s'.format(t))
def main():
[job(t) for t in range(1, 3)]
start = time.time()
main()
print("total time: {}".format(time.time() - start))複製代碼
運行結果:框架
Start job 1
Job 1 takes 1s
Start job 2
Job 2 takes 2s
total time: 3.001577138900757複製代碼
從運行結果能夠看出,咱們的 job 是按順序執行的。必須執行完 job 1 才能開始執行 job 2, job 1 須要 1 秒的執行時間,job 2 須要 2 秒的執行時間,因此總時間是 3 秒多。異步
若是咱們使用協程的方式,job 1 在等待 time.sleep(t)
執行結束的時候(能夠看作是等待一個網頁的下載成功),是能夠切換到 job 2 執行的。
咱們再來看一下使用協程改造後的代碼:
import time
import asyncio
async def job(t): # 使用 async 關鍵字將一個函數定義爲協程
print('Start job {}'.format(t))
await asyncio.sleep(t) # 等待 t 秒, 期間切換執行其餘任務
print('Job {0} takes {0}s'.format(t))
async def main(loop): # 使用 async 關鍵字將一個函數定義爲協程
tasks = [loop.create_task(job(t)) for t in range(1, 3)] # 建立任務, 不當即執行
await asyncio.wait(tasks) # 執行並等待全部任務完成
start = time.time()
loop = asyncio.get_event_loop() # 創建 loop
loop.run_until_complete(main(loop)) # 執行 loop
loop.close() # 關閉 loop
print("total time: {}".format(time.time() - start))複製代碼
運行結果:
Start job 1
Start job 2
Job 1 takes 1s
Job 2 takes 2s
total time: 2.0033459663391113複製代碼
從運行結果能夠看出,咱們沒有等待 job 1 執行結束再開始執行 job 2,而是 job 1 觸發 await 的時候切換到了 job 2 。 這時 job 1 和 job 2 同時在執行 await asyncio.sleep(t)
,因此最終程序的執行時間取決於執行時間最長的那個 job,也就是 job 2 的執行時間:2 秒。
在對 asyncio 庫作了簡單瞭解以後,咱們來看一下如何經過協程來改造咱們的爬蟲程序。
安裝 aiohttp 庫:
pip install aiohttp複製代碼
咱們先來看一下使用 reqeusts 庫實現一個網頁的爬取:
import time
import requests
def fetch(url):
r = requests.get(url)
return r.url
def main():
results = [fetch('http://www.baidu.com') for _ in range(2)]
print(results)
start = time.time()
main()
print("total time: {}".format(time.time() - start))複製代碼
運行結果:
['http://www.baidu.com/', 'http://www.baidu.com/']
total time: 1.5445010662078857複製代碼
使用 requests 庫,訪問兩次 www.baidu.com,共耗時 1.5 秒
咱們用 aiohttp 庫來改造上面的代碼:
import time
import asyncio
import aiohttp
async def fetch(session, url):
response = await session.get(url) # await 等待網絡 IO 並切換協程
return str(response.url)
async def main(loop):
async with aiohttp.ClientSession() as session:
tasks = [
loop.create_task(fetch(session, 'http://www.baidu.com'))
for _ in range(2)
]
done, pending = await asyncio.wait(tasks) # 執行並等待全部任務完成
results = [r.result() for r in done] # 獲取全部返回結果
print(results)
start = time.time()
loop = asyncio.get_event_loop() # 創建 事件循環
loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程
loop.close() # 關閉 事件循環
print("total time: {}".format(time.time() - start))複製代碼
運行結果:
['http://www.baidu.com', 'http://www.baidu.com']
total time: 0.10848307609558105複製代碼
使用 aiohttp 的代碼執行時間較使用 reqeusts 的代碼有大幅的提高。
上例中,咱們使用官方推薦的方式建立 session,並經過 session 執行 get 操做。aiohttp 官方建議一個 application 中共享使用一個 session,不要爲每一個請求都建立 session。
經過爬蟲解析免費的代理髮佈網站頁面,來生成代理池。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
""" @author: qfedu.com """
import os
import re
import time
import asyncio
import aiohttp
HEADERS = {
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15'
}
OUTPUT_FILE = 'proxies.txt' # 代理池輸出文件
SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top'] # 代理髮佈網站
CHECK_URL = 'http://www.baidu.com'
LOCAL_PROXY = None # 在本地發起請求時的代理
# http get 協程
async def fetch(session, url, proxy=None):
proxy_headers = HEADERS if proxy else None
try:
async with session.get(
url, headers=HEADERS, proxy=proxy,
proxy_headers=proxy_headers,
timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
return await response.text()
else:
return ''
except:
return ''
# 從代理髮佈網站獲取代理髮布頁面連接
async def get_page_links(loop, session):
tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))
for url in SITES] # 建立協程任務
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
htmls = [f.result() for f in done] # 獲取全部返回結果
# 解析出 html 頁面中的代理髮布連接
def parse(html):
return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html)
results = map(parse, htmls) # 逐個解析 html 頁面
return [y for x in results for y in x]
# 從代理髮布頁面獲取代理 IP
async def get_proxies(loop, session, page_links):
tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))
for url in page_links] # 建立協程任務
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
htmls = [f.result() for f in done] # 獲取全部返回結果
# 解析出 html 頁面中的代理 IP
def parse(html):
return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html)
results = map(parse, htmls) # 逐個解析 html 頁面
return list(set([y for x in results for y in x]))
# 驗證代理 IP
async def check_proxy(session, proxy):
html = await fetch(session, CHECK_URL, proxy=proxy)
# 若是返回經過代理 IP 訪問的頁面,則說明代理 IP 有效
return proxy if html else ''
# 經過協程批量驗證代理 IP,每次同時發起 200 個驗證請求
async def check_proxies(loop, session, proxies):
checked_proxies = []
for i in range(0, len(proxies), 200):
_proxies = [proxy.strip() if proxy.strip().startswith('http://')
else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]
tasks = [loop.create_task(check_proxy(session, proxy))
for proxy in _proxies]
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
checked = [f.result() for f in done] # 獲取全部返回結果
checked_proxies += [p for p in checked if p] # 獲取不爲空的返回值,即驗證成功的代理 IP
return checked_proxies
# 將代理 IP 逐行保存到文件
def save_proxies(proxies):
# 建立新文件,若是文件已存在,則清空文件內容
with open(OUTPUT_FILE, 'w') as f:
f.write('')
# 經過追加寫模式,逐行寫入文件
with open(OUTPUT_FILE, 'a') as f:
for proxy in proxies:
f.write(proxy + '\n')
async def main(loop):
async with aiohttp.ClientSession() as session:
page_links = await get_page_links(loop, session) # 得到代理髮布頁面連接
# 從代理髮布頁面得到代理 IP
proxies = await get_proxies(loop, session, page_links)
print('total proxy: {}'.format(len(proxies))) # 解析出的代理 IP 總量
proxies = await check_proxies(loop, session, proxies) # 驗證代理 IP
print('total checked proxy: {}'.format(len(proxies))) # 驗證後的代理 IP 總量
save_proxies(proxies) # 保存代理 IP 到文件
start = time.time()
loop = asyncio.get_event_loop() # 創建 事件循環
loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程
loop.close() # 關閉 事件循環
total_time = time.time() - start
print(f'total time: {total_time}')複製代碼
運行結果:
total proxy: 15675
total checked proxy: 4503
total time: 487.2807550430298複製代碼
在爬蟲程序中,一般有網絡請求任務、頁面解析任務、數據清洗任務和數據入庫任務。
網絡請求任務、數據入庫任務屬於 IO 密集型任務,在 Python 中一般使用多線程模型來提升這類任務的性能,如今還能夠經過 aiohttp,Motor(MongoDB 的異步 Python 驅動)等異步框架將性能進一步提高。
頁面解析任務、數據清洗任務這類 CPU 密集型的任務咱們該如何來提升性能?在 Python 中針對 CPU 密集型任務能夠經過 multiprocessing 模塊來提高性能,經過 multiprocessing 模塊可使程序運行在多核 CPU 中,增長 CPU 的利用率以提高計算性能。
給代理池爬蟲示例增長多核計算支持:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
""" @author: qfedu.com """
import os
import re
import time
import asyncio
from multiprocessing import Pool
import aiohttp
HEADERS = {
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15'
}
OUTPUT_FILE = 'proxies.txt' # 代理池輸出文件
SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top'] # 代理髮佈網站
CHECK_URL = 'http://www.baidu.com'
LOCAL_PROXY = 'http://127.0.0.1:1087' # •在本地發起請求時的代理
# http get 協程
async def fetch(session, url, proxy=None):
proxy_headers = HEADERS if proxy else None
try:
async with session.get(
url, headers=HEADERS, proxy=proxy,
proxy_headers=proxy_headers,
timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
return await response.text()
else:
return ''
except:
return ''
# 解析出 html 頁面中的代理髮布連接
def parse_page_link(html):
return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html)
# 從代理髮佈網站獲取代理髮布頁面連接
async def get_page_links(loop, session):
tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))
for url in SITES] # 建立協程任務
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
htmls = [f.result() for f in done] # 獲取全部返回結果
# 利用多核 CPU 的計算能力提高頁面解析性能
with Pool(processes=os.cpu_count() * 2) as pool:
results = pool.map(parse_page_link, htmls)
return [y for x in results for y in x]
# 解析出 html 頁面中的代理 IP
def parse_proxy(html):
return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html)
# 從代理髮布頁面獲取代理 IP
async def get_proxies(loop, session, page_links):
tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))
for url in page_links] # 建立協程任務
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
htmls = [f.result() for f in done] # 獲取全部返回結果
# 利用多核 CPU 的計算能力提高頁面解析性能
with Pool(processes=os.cpu_count() * 2) as pool:
results = pool.map(parse_proxy, htmls)
return list(set([y for x in results for y in x]))
# 驗證代理 IP
async def check_proxy(session, proxy):
html = await fetch(session, CHECK_URL, proxy=proxy)
# 若是返回經過代理 IP 訪問的頁面,則說明代理 IP 有效
return proxy if html else ''
# 經過協程批量驗證代理 IP,每次同時發起 200 個驗證請求
async def check_proxies(loop, session, proxies):
checked_proxies = []
for i in range(0, len(proxies), 200):
_proxies = [proxy.strip() if proxy.strip().startswith('http://')
else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]
tasks = [loop.create_task(check_proxy(session, proxy))
for proxy in _proxies]
done, _ = await asyncio.wait(tasks) # 執行並等待全部任務完成
checked = [f.result() for f in done] # 獲取全部返回結果
checked_proxies += [p for p in checked if p] # 獲取不爲空的返回值,即驗證成功的代理 IP
return checked_proxies
# 將代理 IP 逐行保存到文件
def save_proxies(proxies):
# 建立新文件,若是文件已存在,則清空文件內容
with open(OUTPUT_FILE, 'w') as f:
f.write('')
# 經過追加寫模式,逐行寫入文件
with open(OUTPUT_FILE, 'a') as f:
for proxy in proxies:
f.write(proxy + '\n')
async def main(loop):
async with aiohttp.ClientSession() as session:
page_links = await get_page_links(loop, session) # 得到代理髮布頁面連接
# 從代理髮布頁面得到代理 IP
proxies = await get_proxies(loop, session, page_links)
print('total proxy: {}'.format(len(proxies))) # 解析出的代理 IP 總量
proxies = await check_proxies(loop, session, proxies) # 驗證代理 IP
print('total checked proxy: {}'.format(len(proxies))) # 驗證後的代理 IP 總量
save_proxies(proxies) # 保存代理 IP 到文件
start = time.time()
loop = asyncio.get_event_loop() # 創建 事件循環
loop.run_until_complete(main(loop)) # 在 事件循環 中執行協程
loop.close() # 關閉 事件循環
total_time = time.time() - start
print(f'total time: {total_time}')複製代碼
進程間的調度及上下文切換是很是消耗資源的。上面例子中解析任務比較簡單,解析量也很是少,增長多核計算支持後,性能幾乎沒有提高還有可能下降。在實際爬蟲項目中須要根據實際狀況來衡量和選擇。