Concurrency Options for Crawler Performance

When writing a crawler, the performance cost lies mainly in IO: in single-process, single-thread mode every URL request blocks while waiting for the response, which slows the whole job down.
1. Concurrency with threads (threads suit IO-bound work, and HTTP requests are IO-bound)
Threads do give you concurrency, but opening too many of them degrades performance because thread context switching is costly, and between sending a request and getting the response back each thread just sits idle.
(1) Approach 1: thread pool, handle the result directly in the worker

from concurrent.futures import ThreadPoolExecutor    # thread pool
import requests

# worker function: fetch the url it is given
def task(url):
    response = requests.get(url)
    print(url, response)

pool = ThreadPoolExecutor(7)                          # create a thread pool with 7 workers

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    pool.submit(task, url)                            # hand each url to the worker function

pool.shutdown(wait=True)

(2) Approach 2: thread pool + callback
The first url is handed to task, which downloads the page; once the response is back, add_done_callback fires the done function, and done's future argument carries the value that task returned.

from concurrent.futures import ThreadPoolExecutor     # thread pool
import requests

# worker function: download the page and return the response
def task(url):
    response = requests.get(url)
    return response

# callback: future.result() is whatever task returned
def done(future, *args, **kwargs):
    response = future.result()
    print(response.status_code, response.content)

pool = ThreadPoolExecutor(7)                           # create a thread pool with 7 workers

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    v = pool.submit(task, url)                         # submit the url; v is the Future for this task
    v.add_done_callback(done)                          # run done() once the Future completes

pool.shutdown(wait=True)

2. Concurrency with processes (processes suit CPU-bound work)
Concurrency is achieved, but between sending a request and getting the response back, the process sits idle.
(1) Approach 1: process pool, handle the result directly in the worker

from concurrent.futures import ProcessPoolExecutor    # process pool
import requests

# worker function: fetch the url it is given
def task(url):
    response = requests.get(url)
    print(url, response)

pool = ProcessPoolExecutor(2)                          # create a process pool with 2 workers

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    pool.submit(task, url)                             # hand each url to the worker function

pool.shutdown(wait=True)

(2) Approach 2: process pool + callback
The first url is handed to task, which downloads the page; once the response is back, add_done_callback fires the done function, and done's future argument carries the value that task returned.

from concurrent.futures import ProcessPoolExecutor    # process pool
import requests

# worker function: download the page and return the response
def task(url):
    response = requests.get(url)
    return response

# callback: future.result() is whatever task returned
def done(future, *args, **kwargs):
    response = future.result()
    print(response.status_code, response.content)

pool = ProcessPoolExecutor(7)                          # create a process pool with 7 workers

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    v = pool.submit(task, url)                         # submit the url; v is the Future for this task
    v.add_done_callback(done)                          # run done() once the Future completes

pool.shutdown(wait=True)

The code above does improve request throughput, but the drawback of both multithreading and multiprocessing is that threads and processes are wasted while blocked on IO, so asynchronous IO is the better choice:
(1) The asyncio module
Approach 1: asyncio on its own only supports TCP; it does not build HTTP requests for you

import asyncio

# a single thread runs two tasks
@asyncio.coroutine
def task():
    print('before...task......')               # runs first
    yield from asyncio.sleep(5)                # wait five seconds without blocking the loop
    print('end...task......')

tasks = [task(), task()]                       # define the two tasks in a list

# hand both tasks to the event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
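
The @asyncio.coroutine / yield from style shown above is legacy syntax: the decorator was deprecated in Python 3.8 and removed in 3.11. A rough async/await equivalent of the same two-task demo (a sketch added here for reference, not part of the original code):

import asyncio

async def task():
    print('before...task......')
    await asyncio.sleep(5)                     # yield to the event loop for five seconds
    print('end...task......')

async def main():
    await asyncio.gather(task(), task())       # run both coroutines concurrently in one thread

asyncio.run(main())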

Approach 2: asyncio with a hand-built HTTP packet; a single thread performs the asynchronous IO, so plain HTTP requests are supported

import asyncio

@asyncio.coroutine
def task(host, url='/'):
    print('start request', host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)                    # open the TCP connection

    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)    # raw HTTP request
    request_header_content = bytes(request_header_content, encoding='utf-8')
    writer.write(request_header_content)
    yield from writer.drain()

    text = yield from reader.read()
    print('got result', host, url, text)
    writer.close()

tasks = [
    task('www.cnblogs.com', '/xixi/'),
    task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091'),
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Approach 3: asyncio + aiohttp
Install: pip3 install aiohttp

import aiohttp                      # aiohttp builds the HTTP packets
import asyncio                      # asyncio supplies the event loop

@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)    # let aiohttp issue the request
    print(url, response)
    response.close()

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.sina.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
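
Note that this call style comes from early aiohttp releases; on aiohttp 3.x the yield from form and the bare aiohttp.request call shown above no longer work this way. A rough modern equivalent (a sketch assuming aiohttp 3 and Python 3.7+, not part of the original code):

import asyncio
import aiohttp

async def fetch_async(url):
    async with aiohttp.ClientSession() as session:    # one session per task, to keep the sketch short
        async with session.get(url) as response:      # issue the GET request
            body = await response.read()               # read the body while the connection is open
            print(url, response.status, len(body))

async def main():
    await asyncio.gather(
        fetch_async('http://www.baidu.com/'),
        fetch_async('http://www.sina.com/'),
    )

asyncio.run(main())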

Approach 4: asyncio + requests
Install: pip3 install requests

import asyncio
import requests                            # requests builds the HTTP packets

@asyncio.coroutine
def task(func, *args):
    print(func, args)
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)    # runs e.g. requests.get('http://www.cnblogs.com/xixi/') in a thread pool
    response = yield from future
    print(response.url, response.content)

# each entry of tasks passes a function and its arguments into task() as func and args
tasks = [
    task(requests.get, 'http://www.cnblogs.com/xixi/'),
    task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

(2) The gevent module:
Approach 1: gevent (which relies on the greenlet coroutine module) + async IO
Install: pip3 install greenlet
Install: pip3 install gevent

import gevent
import requests
from gevent import monkey

monkey.patch_all()    # patch the standard library so every blocking socket becomes an async-IO socket

def task(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)    # build and send the request
    print(response.url, response.content)

# send the requests
gevent.joinall([
    gevent.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    gevent.spawn(task, method='get', url='https://github.com/', req_kwargs={}),
])

Approach 2: gevent with a coroutine pool (caps how many requests go out at once) + requests
Install: pip3 install greenlet
Install: pip3 install gevent

import gevent
import requests
from gevent import monkey
from gevent.pool import Pool

monkey.patch_all()    # patch the standard library so every blocking socket becomes an async-IO socket

def task(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)    # build and send the request
    print(response.url, response.content)

# send the requests through a coroutine pool that limits concurrency
pool = Pool(5)                                                           # at most 5 requests in flight at a time
gevent.joinall([
    pool.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.github.com/', req_kwargs={}),
])

Approach 3: grequests
Install: pip3 install grequests

import grequests    # grequests wraps gevent + requests and drives the downloads for you

request_list = [
    grequests.get('https://www.python.org', timeout=0.001),
    grequests.get('http://www.baidu.com/'),
    grequests.get('http://httpbin.org/status/500'),
]

# run them and collect the list of responses
response_list = grequests.map(request_list, size=5)
print(response_list)

(3) Twisted
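The post lists Twisted but gives no sample, so here is a minimal sketch of the same fan-out-and-callback pattern using twisted.web.client.Agent; the URLs and the print-only callbacks are placeholders for illustration:

from twisted.internet import reactor, defer
from twisted.web.client import Agent, readBody

def handle_body(body, url):
    print(url, body[:100])                       # the response body arrives as bytes

def handle_response(response, url):
    d = readBody(response)                       # readBody returns a Deferred that fires with the body
    d.addCallback(handle_body, url)
    return d

def all_done(results):
    reactor.stop()                               # stop the event loop once every request has finished

agent = Agent(reactor)
url_list = ['http://www.baidu.com', 'http://www.bing.com']

deferred_list = []
for url in url_list:
    d = agent.request(b'GET', bytes(url, encoding='utf-8'))
    d.addCallback(handle_response, url)
    deferred_list.append(d)

defer.DeferredList(deferred_list).addBoth(all_done)
reactor.run()
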
(4) Tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

COUNT = 0

def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    # same idea as with twisted: stop the loop once every request has come back
    # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)    # handle_response is the callback

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()    # runs the event loop until stop() is called

All of the above are asynchronous IO request modules, either built into Python or provided by third parties; they are simple to use and raise efficiency a great deal. Underneath, an asynchronous IO request is just a non-blocking socket plus IO multiplexing.
3. A custom asynchronous IO module (a hand-written socket client)
1) What a standard HTTP request really is: blocking sockets

import socket

sk = socket.socket()

# 1. connect
sk.connect(('www.baidu.com', 80,))                                    # blocks on IO
print('connected...')

# 2. once connected, send data in HTTP format (headers and body separated by a blank line)
# sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')            # GET request
sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')  # POST request

# 3. wait for the server's response
data = sk.recv(8096)                                                  # blocks on IO
print(data)

# 4. close the connection
sk.close()

2) The same HTTP request with a non-blocking socket

import socket

sk = socket.socket()
sk.setblocking(False)    # make the socket non-blocking

# 1. connect
try:
    sk.connect(('www.baidu.com', 80,))    # no longer blocks; raises BlockingIOError immediately
    print('connected...')
except BlockingIOError as e:
    print(e)

# 2. send the request once the connection is up
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. wait for the server's response
data = sk.recv(8096)                      # no longer blocks; raises if nothing has arrived yet
print(data)

# close the connection
sk.close()

3) A custom select + non-blocking socket client that performs asynchronous IO: one thread sends requests to many hosts

import socket
import select                                     # select can monitor many socket objects at once

# HttpRequest wraps the socket, the host name and the callback for one request
class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()               # lets select() treat this object like a socket

# HttpResponse splits the raw response into headers and body
class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()                         # split headers and body right away

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]

class AsyncRequest:
    def __init__(self):
        self.conn = []          # sockets still waiting for data; once empty, everything has been received
        self.connection = []    # sockets not yet connected; each successful connection removes one entry

    # issue a connection request
    def add_request(self, host, callback):        # callback is the per-host callback function
        try:
            sk = socket.socket()                  # create the socket for this host
            sk.setblocking(0)                     # make it non-blocking
            sk.connect((host, 80,))               # connect to the host on the default HTTP port
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk, host, callback) # bundle socket, host and callback together
        self.conn.append(request)                 # watch it for incoming data
        self.connection.append(request)           # watch it until the connection completes

    # by the time run() starts, conn and connection already hold the HttpRequest objects
    def run(self):
        while True:
            # poll every pending socket: writable ones are connected, readable ones have data
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)

            # each w is an HttpRequest whose socket has finished connecting
            for w in wlist:
                print(w.host, 'connected...')
                # send the request data
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)         # no longer waiting for this connection

            # each r is an HttpRequest whose socket has response data waiting
            for r in rlist:
                recv_data = bytes()               # start with an empty byte string
                # keep reading until the server stops sending
                while True:
                    try:
                        chunk = r.socket.recv(8096)   # read the next chunk
                        if not chunk:                 # empty bytes: the server closed the connection
                            break
                        recv_data += chunk            # append it to what has arrived so far
                    except Exception as e:            # nothing buffered right now: treat it as end of response
                        break
                # print(r.host, recv_data)            # recv_data is the raw response
                response = HttpResponse(recv_data)    # split it into headers and body
                r.callback(response)                  # hand the response to the per-host callback
                r.socket.close()                      # this HTTP request is finished
                self.conn.remove(r)                   # select no longer needs to watch it

            if len(self.conn) == 0:                   # every request has been answered
                break

def f1(response):
    print('save to a file', response.header_dict)

def f2(response):
    print('save to a database', response.header_dict)

# list of hosts, each paired with the callback that should handle it
url_list = [
    {'host': 'www.baidu.com', 'callback': f1},    # handle this host with f1
    {'host': 'cn.bing.com', 'callback': f2},      # handle this host with f2
    {'host': 'www.sina.com', 'callback': f2},     # handle this host with f2
]

req = AsyncRequest()                                  # create the AsyncRequest instance
for item in url_list:                                 # walk the url_list
    req.add_request(item['host'], item['callback'])   # one non-blocking socket per host is created and connected

req.run()