在編寫爬蟲時,性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引發等待,從而使得請求總體變慢
1.利用線程實現併發(io密集型用--http請求是io密集型)
線程開多了性能下降,線程上下文切換耗時多,能夠實現併發,可是,請求發送出去後和返回以前,中間時期線程空閒
(1)編寫方式一(多線程直接返回處理)
from concurrent.futures import ThreadPoolExecutor  # thread pool
import requests


def task(url):
    """Worker function: fetch *url* and print the URL with its response."""
    response = requests.get(url)
    print(url, response)


# Declare a pool with 7 worker threads.
pool = ThreadPoolExecutor(7)

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

# Hand every URL to the pool; each task() call runs on a worker thread.
for url in url_list:
    pool.submit(task, url)

# Block until all submitted downloads have finished.
pool.shutdown(wait=True)
(2)編寫方式二(多線程+回調函數處理)
第一個請求來第一個url過來執行task下載任務,當下載頁面完成以後會有response,返回以後會執行add_done_callback的done方法,done方法裏的參數就有task函數執行後返回過來的值
from concurrent.futures import ThreadPoolExecutor  # thread pool
import requests


def task(url):
    """Worker function: download the page at *url* and return the response."""
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    """Callback fired when a task() future completes.

    future.result() is whatever task() returned for that URL.
    """
    response = future.result()
    print(response.status_code, response.content)


pool = ThreadPoolExecutor(7)  # declare a pool with 7 worker threads

url_list = [
    'http://www.cnblogs.com/',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    # Submit the download; the returned future runs done() once finished.
    v = pool.submit(task, url)
    v.add_done_callback(done)

pool.shutdown(wait=True)
2.利用進程實現併發(計算密集型用進程)
能夠實現併發,可是,請求發送出去後和返回以前,中間時期進程空閒
(1)編寫方式一(多進程直接返回處理)
from concurrent.futures import ProcessPoolExecutor  # process pool
import requests


def task(url):
    """Worker function: fetch *url* in a child process and print the result."""
    response = requests.get(url)
    print(url, response)


# The entry-point guard is required for ProcessPoolExecutor: under the
# "spawn" start method (Windows/macOS) each worker re-imports this module,
# and unguarded pool creation would recurse endlessly.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(2)  # declare a pool with 2 worker processes

    url_list = [
        'http://www.cnblogs.com/',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    # Submit every request; each task() call runs in a worker process.
    for url in url_list:
        pool.submit(task, url)

    pool.shutdown(wait=True)
(2)編寫方式二(多進程+回調函數處理)
# The first URL is handed to task() for download; when the page finishes
# downloading, the future completes and the done() callback registered via
# add_done_callback receives the value task() returned.
from concurrent.futures import ProcessPoolExecutor  # process pool
import requests


def task(url):
    """Worker function: download the page at *url* and return the response."""
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    """Callback fired when a task() future completes.

    future.result() is whatever task() returned for that URL.
    """
    response = future.result()
    print(response.status_code, response.content)


# The entry-point guard is required for ProcessPoolExecutor: under the
# "spawn" start method (Windows/macOS) each worker re-imports this module,
# and unguarded pool creation would recurse endlessly.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(7)  # declare a pool with 7 worker processes

    url_list = [
        'http://www.cnblogs.com/',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    for url in url_list:
        # Submit the download; the returned future runs done() once finished.
        v = pool.submit(task, url)
        v.add_done_callback(done)

    pool.shutdown(wait=True)
經過上述代碼都可以完成對請求性能的提升,對於多線程和多進行的缺點是在IO阻塞時會形成了線程和進程的浪費,因此異步IO會是首選:
(1)asyncio模塊
方式一:asyncio只支持TCP請求,不支持Http請求
import asyncio


# A single thread runs both tasks concurrently on the event loop.
# (async/await replaces @asyncio.coroutine / yield-from, which was
# removed in Python 3.11.)
async def task():
    print('before...task......')  # runs first
    await asyncio.sleep(5)        # yield to the event loop for five seconds
    print('end...task......')


async def main():
    # Define two tasks and run them together on the loop.
    tasks = [task(), task()]
    await asyncio.gather(*tasks)


asyncio.run(main())
方式二:asyncio經過本身封裝http數據包,一個線程完成異步io操做,支持Http請求
import asyncio


async def task(host, url='/'):
    """Fetch http://<host><url> over a raw TCP connection.

    asyncio itself only speaks TCP, so the HTTP request is assembled by
    hand. (async/await replaces the yield-from coroutine style that was
    removed in Python 3.11.)
    """
    print('開始請求', host, url)
    reader, writer = await asyncio.open_connection(host, 80)  # connect

    # Minimal HTTP/1.0 request: request line + Host header + blank line.
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    await writer.drain()

    # HTTP/1.0 servers close the connection when done, so read() gets it all.
    text = await reader.read()
    print('獲取結果', host, url, text)
    writer.close()


async def main():
    tasks = [
        task('www.cnblogs.com', '/xixi/'),
        task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091'),
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())
方式三:asyncio+aiohttp
安裝:pip3 install aiohttp
import asyncio  # async machinery

import aiohttp  # aiohttp module: builds/parses the HTTP messages


async def fetch_async(url):
    """Download *url* with aiohttp and print the response object."""
    print(url)
    # The old `yield from aiohttp.request('GET', url)` API was removed
    # long ago; a ClientSession is the supported entry point and its
    # context managers take care of closing the response.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(url, response)


async def main():
    tasks = [
        fetch_async('http://www.baidu.com/'),
        fetch_async('http://www.sina.com/'),
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())
方式四:asyncio+requests
安裝:pip3 install requests
import asyncio

import requests  # requests module: builds the HTTP messages


async def task(func, *args):
    """Run the blocking *func(*args)* on the default executor.

    (async/await replaces the yield-from coroutine style removed in
    Python 3.11; get_running_loop() is the supported call inside a
    coroutine.)
    """
    print(func, args)
    loop = asyncio.get_running_loop()
    # e.g. executes requests.get('http://www.cnblogs.com/xixi/') in a
    # worker thread so the event loop is not blocked.
    future = loop.run_in_executor(None, func, *args)
    response = await future
    print(response.url, response.content)


async def main():
    # Each entry passes a callable and its argument into task()'s func/args.
    tasks = [
        task(requests.get, 'http://www.cnblogs.com/xixi/'),
        task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())
(2)gevent模塊:
方式一:gevent依賴greenlet協程模塊+異步IO
安裝pip3 install greenlet
安裝:pip3 install gevent
from gevent import monkey

# Patch the standard library FIRST, before anything that imports socket:
# gevent's docs require patch_all() to run as early as possible so every
# blocking socket becomes a cooperative (async-IO) one.
monkey.patch_all()

import gevent
import requests


def task(method, url, req_kwargs):
    """Send one HTTP request; gevent switches greenlets while it waits."""
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)


# Spawn one greenlet per request and wait for all of them to finish.
gevent.joinall([
    gevent.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    gevent.spawn(task, method='get', url='https://github.com/', req_kwargs={}),
])
方式二:gevent(協程池,最多發多少個請求)+requests
安裝pip3 install greenlet
安裝:pip3 install gevent
from gevent import monkey

# Patch the standard library FIRST, before anything that imports socket:
# gevent's docs require patch_all() to run as early as possible so every
# blocking socket becomes a cooperative (async-IO) one.
monkey.patch_all()

import gevent
import requests
from gevent.pool import Pool


def task(method, url, req_kwargs):
    """Send one HTTP request; gevent switches greenlets while it waits."""
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)


# A coroutine pool caps concurrency: at most 5 requests in flight at once.
pool = Pool(5)

gevent.joinall([
    pool.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.github.com/', req_kwargs={}),
])
方式三:
安裝pip3 install grequests
import grequests  # grequests drives the downloads for us

request_list = [
    grequests.get('https://www.python.org', timeout=0.001),
    grequests.get('http://www.baidu.com/'),
    grequests.get('http://httpbin.org/status/500'),
]

# Send them all (at most 5 concurrently) and collect the response list.
response_list = grequests.map(request_list, size=5)
print(response_list)
(3)Twisted
(4)Tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

# Number of requests still in flight; the IOLoop stops when it hits zero.
COUNT = 0


def handle_response(response):
    """Per-request callback: print the outcome, stop the loop when all done.

    Same approach as the twisted example.
    """
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()


def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)  # callback


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()  # loops forever until stop() is called
以上均是Python內置以及第三方模塊提供異步IO請求模塊,使用簡便大大提升效率,而對於異步IO請求的本質則是【非阻塞Socket】+【IO多路複用】
3.自定義異步IO模塊(自定義socket客戶端)
1)標準HTTP請求本質,阻塞
import socket

sk = socket.socket()

# 1. Connect -- blocks until the TCP handshake completes (IO wait).
sk.connect(('www.baidu.com', 80,))
print('鏈接成功了...')

# 2. Once connected, send data in HTTP format
#    (headers and body are separated by a blank line, i.e. \r\n\r\n).
# sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')  # GET request
sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')  # POST request

# 3. Wait for the server's reply -- blocks again (IO wait).
data = sk.recv(8096)
print(data)

# 4. Close the connection.
sk.close()
2)HTTP請求本質,非阻塞
import socket

sk = socket.socket()
sk.setblocking(False)  # switch the socket to non-blocking mode

# 1. Connect: in non-blocking mode this returns immediately and raises
#    BlockingIOError while the handshake is still in progress.
try:
    sk.connect(('www.baidu.com', 80,))
    print('鏈接成功了...')
except BlockingIOError as e:
    print(e)

# 2. Send the request once connected.
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. Wait for the server's reply.
data = sk.recv(8096)
print(data)

# Close the connection.
sk.close()
3)自定義非阻塞select+socket完成異步IO,經過一個線程向不少地方把請求發出去
import socket
import select  # watches many socket objects at once


class HttpRequest:
    """Bundle one outgoing request: its socket, host name and result callback."""

    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        # select() accepts any object that exposes fileno().
        return self.socket.fileno()


class HttpResponse:
    """Split a raw HTTP reply into a header dict and a body."""

    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        # Separate the headers from the body on construction.
        self.initialize()

    def initialize(self):
        # Headers and body are divided by the first blank line.
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                # Lines without a colon (e.g. the status line) are skipped.
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    """One thread, many sockets: a tiny select()-based async HTTP client."""

    def __init__(self):
        # Requests still waiting for response data; empty => all done.
        self.conn = []
        # Requests whose connect has not completed yet; each success removes one.
        self.connection = []

    def add_request(self, host, callback):
        """Start a non-blocking connect to *host*; *callback* gets the response."""
        try:
            sk = socket.socket()
            sk.setblocking(0)          # non-blocking socket
            sk.connect((host, 80,))    # default HTTP port
        except BlockingIOError:
            # Expected: a non-blocking connect is still in progress.
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        """Event loop: send requests as sockets connect, then read replies."""
        while True:
            # select() polls every pending request with a 0.05s timeout.
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)

            # Writable => the connect finished: send the HTTP request.
            for w in wlist:
                print(w.host, '鏈接成功...')
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)  # no longer waiting to connect

            # Readable => response data arrived: drain the socket.
            for r in rlist:
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                    except Exception:
                        # Non-blocking recv raises when nothing more is
                        # buffered -- treat it as "done for now".
                        break
                    if not chunck:
                        # b'' means the peer closed the connection (end of
                        # an HTTP/1.0 response). Without this check the
                        # loop would spin forever on empty reads.
                        break
                    recv_data += chunck
                response = HttpResponse(recv_data)  # parse the raw reply
                r.callback(response)                # hand it to the caller
                r.socket.close()
                self.conn.remove(r)

            if len(self.conn) == 0:
                # Every request has been answered.
                break


def f1(response):
    print('保存到文件', response.header_dict)


def f2(response):
    print('保存到數據庫', response.header_dict)


# Guarded so the classes above can be imported without firing requests.
if __name__ == '__main__':
    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},  # f1 handles this host
        {'host': 'cn.bing.com', 'callback': f2},    # f2 handles this host
        {'host': 'www.sina.com', 'callback': f2},   # f2 handles this host
    ]

    req = AsyncRequest()
    # One socket is created per entry; run() drives them all on one thread.
    for item in url_list:
        req.add_request(item['host'], item['callback'])
    req.run()