爬蟲提升性能:串行、線程進程、異步非阻塞

一 背景知識

    爬蟲的本質就是一個socket客戶端與服務端的通訊過程,若是咱們有多個url待爬取,只用一個線程且採用串行的方式執行,那隻能等待爬取一個結束後才能繼續下一個,效率會很是低。python

須要強調的是:對於單線程下串行N個任務,並不徹底等同於低效,若是這N個任務都是純計算的任務,那麼該線程對cpu的利用率仍然會很高,之因此單線程下串行多個爬蟲任務低效,是由於爬蟲任務是明顯的IO密集型程序。react

關於IO模型詳見連接:http://www.cnblogs.com/linhaifeng/articles/7454717.html程序員

    那麼該如何提升爬取性能呢?且看下述概念web

二 同步、異步、回調機制

一、同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低下數據庫

import requests

def parse_page(res):
    print('解析 %s' %(len(res)))

def get_page(url):
    print('下載 %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
    res=get_page(url) #調用一個任務,就在原地等待任務結束拿到結果後才繼續日後執行
    parse_page(res)
同步調用

二、一個簡單的解決方案:多線程或多進程api

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。
#IO密集型程序應該用多線程
import requests
from threading import Thread,current_thread

def parse_page(res):
    print('%s 解析 %s' %(current_thread().getName(),len(res)))

def get_page(url,callback=parse_page):
    print('%s 下載 %s' %(current_thread().getName(),url))
    response=requests.get(url)
    if response.status_code == 200:
        callback(response.text)

if __name__ == '__main__':
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        t=Thread(target=get_page,args=(url,))
        t.start()
多進程或多線程  

    該方案的問題是:緩存

#開啓多進程或都線程的方式,咱們是沒法無限制地開啓多進程或多線程的:在遇到要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進程自己也更容易進入假死狀態。

三、改進方案: 線程池或進程池+異步調用:提交一個任務後並不會等待任務結束,而是繼續下一行代碼tomcat

#不少程序員可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,如websphere、tomcat和各類數據庫等。
#IO密集型程序應該用多線程,因此此時咱們使用線程池
import requests
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def parse_page(res):
    res=res.result()
    print('%s 解析 %s' %(current_thread().getName(),len(res)))

def get_page(url):
    print('%s 下載 %s' %(current_thread().getName(),url))
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

if __name__ == '__main__':
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']

    pool=ThreadPoolExecutor(50)
    # pool=ProcessPoolExecutor(50)
    for url in urls:
        pool.submit(get_page,url).add_done_callback(parse_page)

    pool.shutdown(wait=True)
進程池或線程池:異步調用+回調機制 

    改進後方案其實也存在着問題:安全

#「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。

    對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。

三 高性能

    上述不管哪一種解決方案其實沒有解決一個性能相關的問題:IO阻塞,不管是多進程仍是多線程,在遇到IO阻塞時都會被操做系統強行剝奪走CPU的執行權限,程序的執行效率所以就下降了下來。

    解決這一問題的關鍵在於,咱們本身從應用程序級別檢測IO阻塞而後切換到咱們本身程序的其餘任務執行,這樣把咱們程序的IO降到最低,咱們的程序處於就緒態就會增多,以此來迷惑操做系統,操做系統便覺得咱們的程序是IO比較少的程序,從而會盡量多的分配CPU給咱們,這樣也就達到了提高程序執行效率的目的

    一、在python3.3以後新增了asyncio模塊,能夠幫咱們檢測IO(只能是網絡IO),實現應用程序級別的切換

import asyncio

@asyncio.coroutine
def task(task_id,senconds):
    print('%s is start' %task_id)
    yield from asyncio.sleep(senconds) #只能檢測網絡IO,檢測到IO後切換到其餘任務執行
    print('%s is end' %task_id)

tasks=[task(task_id="任務1",senconds=3),task("任務2",2),task(task_id="任務3",senconds=1)]

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
基本使用

    二、但asyncio模塊只能發tcp級別的請求,不能發http協議,所以,在咱們須要發送http請求的時候,須要咱們自定義http報頭

import asyncio
import requests
import uuid
user_agent='Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'

def parse_page(host,res):
    print('%s 解析結果 %s' %(host,len(res)))
    with open('%s.html' %(uuid.uuid1()),'wb') as f:
        f.write(res)

@asyncio.coroutine
def get_page(host,port=80,url='/',callback=parse_page,ssl=False):
    print('下載 http://%s:%s%s' %(host,port,url))

    #步驟一(IO阻塞):發起tcp連接,是阻塞操做,所以須要yield from
    if ssl:
        port=443
    recv,send=yield from asyncio.open_connection(host=host,port=443,ssl=ssl)

    # 步驟二:封裝http協議的報頭,由於asyncio模塊只能封裝併發送tcp包,所以這一步須要咱們本身封裝http協議的包
    request_headers="""GET %s HTTP/1.0\r\nHost: %s\r\nUser-agent: %s\r\n\r\n""" %(url,host,user_agent)
    # requset_headers="""POST %s HTTP/1.0\r\nHost: %s\r\n\r\nname=egon&password=123""" % (url, host,)
    request_headers=request_headers.encode('utf-8')

    # 步驟三(IO阻塞):發送http請求包
    send.write(request_headers)
    yield from send.drain()

    # 步驟四(IO阻塞):接收響應頭
    while True:
        line=yield from recv.readline()
        if line == b'\r\n':
            break
        print('%s Response headers:%s' %(host,line))

    # 步驟五(IO阻塞):接收響應體
    text=yield from recv.read()

    # 步驟六:執行回調函數
    callback(host,text)

    # 步驟七:關閉套接字
    send.close() #沒有recv.close()方法,由於是四次揮手斷連接,雙向連接的兩端,一端發完數據後執行send.close()另一端就被動地斷開


if __name__ == '__main__':
    tasks=[
        get_page('www.baidu.com',url='/s?wd=美女',ssl=True),
        get_page('www.cnblogs.com',url='/',ssl=True),
    ]

    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
asyncio+自定義http協議報頭

    三、自定義http報頭多少有點麻煩,因而有了aiohttp模塊,專門幫咱們封裝http報頭,而後咱們還須要用asyncio檢測IO實現切換

import aiohttp
import asyncio

@asyncio.coroutine
def get_page(url):
    print('GET:%s' %url)
    response=yield from aiohttp.request('GET',url)

    data=yield from response.read()

    print(url,data)
    response.close()
    return 1

tasks=[
    get_page('https://www.python.org/doc'),
    get_page('https://www.cnblogs.com/linhaifeng'),
    get_page('https://www.openstack.org')
]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print('=====>',results) #[1, 1, 1]
asyncio+aiohttp

    四、此外,還能夠將requests.get函數傳給asyncio,就可以被檢測了

import requests
import asyncio

@asyncio.coroutine
def get_page(func,*args):
    print('GET:%s' %args[0])
    loog=asyncio.get_event_loop()
    furture=loop.run_in_executor(None,func,*args)
    response=yield from furture

    print(response.url,len(response.text))
    return 1

tasks=[
    get_page(requests.get,'https://www.python.org/doc'),
    get_page(requests.get,'https://www.cnblogs.com/linhaifeng'),
    get_page(requests.get,'https://www.openstack.org')
]

loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print('=====>',results) #[1, 1, 1]
asyncio+requests模塊的方法

   

  五、還有以前在協程時介紹的gevent模塊

from gevent import monkey;monkey.patch_all()
import gevent
import requests

def get_page(url):
    print('GET:%s' %url)
    response=requests.get(url)
    print(url,len(response.text))
    return 1

# g1=gevent.spawn(get_page,'https://www.python.org/doc')
# g2=gevent.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
# g3=gevent.spawn(get_page,'https://www.openstack.org')
# gevent.joinall([g1,g2,g3,])
# print(g1.value,g2.value,g3.value) #拿到返回值


#協程池
from gevent.pool import Pool
pool=Pool(2)
g1=pool.spawn(get_page,'https://www.python.org/doc')
g2=pool.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
g3=pool.spawn(get_page,'https://www.openstack.org')
gevent.joinall([g1,g2,g3,])
print(g1.value,g2.value,g3.value) #拿到返回值
gevent+requests

    六、封裝了gevent+requests模塊的grequests模塊

#pip3 install grequests

import grequests

request_list=[
    grequests.get('https://wwww.xxxx.org/doc1'),
    grequests.get('https://www.cnblogs.com/linhaifeng'),
    grequests.get('https://www.openstack.org')
]


##### 執行並獲取響應列表 #####
# response_list = grequests.map(request_list)
# print(response_list)

##### 執行並獲取響應列表(處理異常) #####
def exception_handler(request, exception):
    # print(request,exception)
    print("%s Request failed" %request.url)

response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
grequests

    七、twisted:是一個網絡框架,其中一個功能是發送異步請求,檢測IO並自動切換

'''
#問題一:error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted
pip3 install C:\Users\Administrator\Downloads\Twisted-17.9.0-cp36-cp36m-win_amd64.whl
pip3 install twisted

#問題二:ModuleNotFoundError: No module named 'win32api'
https://sourceforge.net/projects/pywin32/files/pywin32/

#問題三:openssl
pip3 install pyopenssl
'''

#twisted基本用法
from twisted.web.client import getPage,defer
from twisted.internet import reactor

def all_done(arg):
    # print(arg)
    reactor.stop()

def callback(res):
    print(res)
    return 1

defer_list=[]
urls=[
    'http://www.baidu.com',
    'http://www.bing.com',
    'https://www.python.org',
]
for url in urls:
    obj=getPage(url.encode('utf=-8'),)
    obj.addCallback(callback)
    defer_list.append(obj)

defer.DeferredList(defer_list).addBoth(all_done)

reactor.run()




#twisted的getPage的詳細用法
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse


def one_done(arg):
    print(arg)
    reactor.stop()

post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
                   method=bytes('POST', encoding='utf8'),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()
twisted的用法

    八、tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop


def handle_response(response):
    """
    處理返回值內容(須要維護計數器,來中止IO循環),調用 ioloop.IOLoop.current().stop()
    :param response:
    :return:
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()




#發現上例在全部任務都完畢後也不能正常結束,爲了解決該問題,讓咱們來加上計數器
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

count=0

def handle_response(response):
    """
    處理返回值內容(須要維護計數器,來中止IO循環),調用 ioloop.IOLoop.current().stop()
    :param response:
    :return:
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(len(response.body))

    global count
    count-=1 #完成一次回調,計數減1
    if count == 0:
        ioloop.IOLoop.current().stop() 

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]

    global count
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)
        count+=1 #計數加1

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
Tornado

 

總結:

- 同步、異步調用:提交任務的方式

- 同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低下

- 異步調用:即提交一個任務後不等待任務的執行 直接進行下一個任務的提交 (數據返回方式爲與回調函數一同使用)

- 阻塞:一個進程運行的三個狀態之一 ,遇到IO 中間什麼活都沒幹 

- 進程有兩種狀況會中止:一種是運行時間過長或遇到優先級更高的,一種是遇到IO阻塞 

- 回調機制:一個任務執行完畢後會自動觸發他身上綁定的回調函數

- cpython中有一個叫全局解釋器鎖,做用是保證python垃圾回收機制線程的安全

- 協程:單線程下實現併發(一個任務運行完就切換)

- 併發:看起來同時運行 (切換加保存狀態)

- 多進程會消耗過多的系統資源
 
- 在單線程下 能夠監測IO 遇到IO切到下一個任務 以便提升單線程的效率

函數中實現協程的方案:定義多個函數 裏邊用yield(保存狀態)能夠切換會任務
知識點整理
相關文章
相關標籤/搜索