在編寫爬蟲時,性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引發等待,從而使得請求總體變慢。
1. 單線程串行
# -*- coding: UTF-8 -*-
"""Baseline: fetch every URL one after another in a single thread."""
import time

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    response = requests.get(url)
    print(response)


url_list = ["https://www.baidu.com", "http://github.com/"]

started = time.time()  # start time
for target in url_list:
    be_async(target)
finished = time.time()  # end time
print("cost time: %s s" % (finished - started))
2. 多線程
# -*- coding: UTF-8 -*-
"""Fetch the URLs concurrently on a pool of four worker threads."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    response = requests.get(url)
    print(response)


url_list = ["https://www.baidu.com", "http://github.com/"]

# Leaving the ``with`` block calls ``shutdown(wait=True)``, so the end time
# is only taken after every submitted request has finished.
with ThreadPoolExecutor(4) as pool:
    started = time.time()  # start time
    for target in url_list:
        pool.submit(be_async, target)
finished = time.time()  # end time
print("cost time: %s s" % (finished - started))
注:python2 標準庫裏沒有線程池(concurrent.futures),只有python3裏面有
+回調函數
# -*- coding: UTF-8 -*-
"""Thread-pool fetch with a completion callback attached to each future."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return the response.

    The response is returned (not only printed) so that the future carries
    a value the done-callback can report.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res


url_list = ["https://www.baidu.com", "http://github.com/"]


def callback(future):
    """Print the result of a finished task.

    ``Future.result`` is a method and has to be called; printing the bare
    attribute only shows the bound-method object.
    """
    print(future.result())


pool = ThreadPoolExecutor(4)

a = time.time()  # start time
for url in url_list:
    r = pool.submit(be_async, url)
    r.add_done_callback(callback)

pool.shutdown(wait=True)
b = time.time()  # end time
print("cost time: %s s" % (b - a))
3. 多進程
# -*- coding: UTF-8 -*-
"""Fetch the URLs concurrently on a pool of four worker processes."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    response = requests.get(url)
    print(response)


url_list = ["https://www.baidu.com", "http://github.com/"]


def main():
    """Submit every URL to the process pool and report the elapsed time."""
    # Leaving the ``with`` block calls ``shutdown(wait=True)``.
    with ProcessPoolExecutor(4) as pool:
        started = time.time()  # start time
        for target in url_list:
            pool.submit(be_async, target)
    finished = time.time()  # end time
    print("cost time: %s s" % (finished - started))


if __name__ == '__main__':
    main()
+回調函數
# -*- coding: UTF-8 -*-
"""Process-pool fetch with a completion callback attached to each future."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return its HTTP status code.

    The value travels back from the worker process to the parent, so a
    small plain value (the status code) is returned for the callback.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res.status_code


url_list = ["https://www.baidu.com", "http://github.com/"]


def callback(future):
    """Print the result of a finished task.

    ``Future.result`` is a method and has to be called; printing the bare
    attribute only shows the bound-method object.
    """
    print(future.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)

    a = time.time()  # start time
    for url in url_list:
        r = pool.submit(be_async, url)
        r.add_done_callback(callback)

    pool.shutdown(wait=True)
    b = time.time()  # end time
    print("cost time: %s s" % (b - a))
通過上述代碼都可以完成對請求性能的提升,多線程和多進程的缺點是在IO阻塞時會造成線程和進程的浪費,因此異步會是首選:
1. asyncio 示例,該模塊只能發送tcp協議的請求,python3.3以後纔有
原理:
代碼:
# -*- coding: UTF-8 -*-
"""Send hand-built HTTP/1.0 GET requests concurrently over asyncio streams."""
import asyncio


async def be_async(host, url="/"):
    """Open a TCP connection to *host*:80, GET *url* and print the raw reply.

    Written with ``async``/``await``: the generator-based
    ``@asyncio.coroutine`` / ``yield from`` form was removed in Python 3.11.

    :param host: host name to connect to (also sent as the ``Host`` header)
    :param url: request path
    """
    print(host, url)
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        req_header = '''GET %s HTTP/1.0\r\nHost: %s\r\n\r\n''' % (url, host)
        writer.write(req_header.encode(encoding="utf-8"))
        await writer.drain()
        # read() with no size reads until the server closes the connection.
        text = await reader.read()
        print(host, url, text)
    finally:
        # Close the transport even when the request fails part-way.
        writer.close()


tasks = [
    be_async("www.cnblogs.com", url="/gaosy-math"),
    be_async("github.com", url="/gaoshao52"),
]


async def main():
    """Run every prepared request concurrently and collect the results."""
    # gather() is called inside a running loop, so it does not rely on an
    # implicit "current" event loop.
    return await asyncio.gather(*tasks)


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
知識點:僞造http請求
GET: GET <url地址> HTTP/1.0\r\nHost: <ip或主機名>\r\n\r\n
POST: POST <url地址> HTTP/1.0\r\nHost: <ip或主機名>\r\n\r\n<請求體> 請求頭的數據是用\r\n隔開的,請求頭與請求體是用\r\n\r\n隔開的
格式: 請求方式 請求地址 協議 請求頭數據(用\r\n隔開) 請求體(k1=v1&k2=v2 格式)
例:基於socket實現http請求
# -*- coding: UTF-8 -*-
"""Issue a hand-written HTTP/1.0 GET over a plain blocking socket."""
import socket

# The ``with`` block closes the socket even if connect/send/recv raises.
with socket.socket() as client:
    # connect
    client.connect(("www.51cto.com", 80))

    # send the request
    client.sendall(b'GET / HTTP/1.0\r\nHost: www.51cto.com\r\n\r\n')

    # receive: one recv() returns only what has arrived so far, so keep
    # reading until recv() returns b"" (the peer closed the connection).
    res = b"".join(iter(lambda: client.recv(81960), b""))

# errors="replace" keeps bytes that are not valid gbk from aborting the print.
print(res.decode("gbk", errors="replace"))
2. asyncio+aiohttp 示例
原理: 與上面的一致
代碼:
# -*- coding: UTF-8 -*-
"""Fetch pages concurrently with asyncio + aiohttp."""
import asyncio

import aiohttp


async def be_async(url):
    """GET *url* through aiohttp and print the URL with the response body.

    Uses ``ClientSession`` with ``async with`` instead of
    ``yield from aiohttp.request(...)``, which does not run on current
    aiohttp / Python 3 releases.  The context managers also release the
    response and the session when an error occurs.
    """
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            data = await res.read()
            print(url, data)


tasks = [
    be_async("https://github.com/gaoshao52"),
    be_async("https://www.cnblogs.com/gaosy-math"),
]


async def main():
    """Run every prepared request concurrently and collect the results."""
    return await asyncio.gather(*tasks)


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
注:舊式的 yield from aiohttp.request(...) 寫法在 python3 的新版 aiohttp 下運行有問題,應改用 async with 與 ClientSession
3. asyncio+requests 示例
原理:一致
代碼:
# -*- coding: UTF-8 -*-
"""Run blocking ``requests`` calls concurrently via the loop's executor."""
import asyncio

import requests


async def be_async(func, *args):
    """Run the blocking callable *func(*args)* in the default executor.

    Written with ``async``/``await``: the generator-based
    ``@asyncio.coroutine`` / ``yield from`` form was removed in Python 3.11.

    :param func: blocking callable returning a response with ``url``/``text``
    :param args: positional arguments forwarded to *func*
    """
    # Inside a coroutine this returns the loop that is running it.
    loop = asyncio.get_event_loop()
    res = await loop.run_in_executor(None, func, *args)
    print(res.url, res.text)


tasks = [
    be_async(requests.get, "https://github.com/gaoshao52"),
    be_async(requests.get, "https://www.cnblogs.com/gaosy-math"),
]


async def main():
    """Run every prepared request concurrently and collect the results."""
    return await asyncio.gather(*tasks)


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
4. gevent+requests 示例
原理:一致
代碼:
# -*- coding: UTF-8 -*-
"""Fetch pages concurrently with gevent greenlets and ``requests``."""
import gevent
from gevent import monkey

# Patch the standard library before ``requests`` is imported.
monkey.patch_all()

import requests  # noqa: E402


def be_async(method, url, req_kwargs):
    """Send one request and print the final URL with its status code.

    :param method: HTTP method name
    :param url: target URL
    :param req_kwargs: extra keyword arguments for ``requests.request``
    """
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.status_code)


targets = (
    "https://github.com/gaoshao52",
    "https://www.cnblogs.com/gaosy-math",
)
greenlets = [
    gevent.spawn(be_async, method='GET', url=target, req_kwargs={})
    for target in targets
]
gevent.joinall(greenlets)

# ##### Send requests (a greenlet pool caps the number of greenlets) #####
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(be_async, method='GET', url="https://github.com/gaoshao52", req_kwargs={}),
#     pool.spawn(be_async, method='GET', url="https://www.cnblogs.com/gaosy-math", req_kwargs={})
# ])
monkey.patch_all() 會把原來的socket 設置成非阻塞
5. grequests 示例
原理:gevent+requests的封裝
代碼:
# -*- coding: UTF-8 -*-
"""Send a batch of GET requests with grequests and print the responses."""
import grequests


def exception_handler(request, exception):
    """Report a request that failed; passed to ``grequests.map`` below."""
    print(request, exception)
    print("Request failed")


page_urls = (
    "http://www.runoob.com/python/python-json.html",
    "https://www.git-scm.com/download/win",
)
request_list = [grequests.get(page_url) for page_url in page_urls]

# ##### Execute and collect the response list #####
# response_list = grequests.map(request_list)
# print(response_list)

# ##### Execute and collect the response list (with exception handling) #####
response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
6. twisted 示例
原理:
代碼:
# -*- coding: UTF-8 -*-
"""Fetch several pages with twisted and stop the reactor when all finish."""
from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    """Stop the reactor; attached to the DeferredList with ``addBoth``."""
    reactor.stop()


def callback(contents):
    """Print one downloaded page, decoded as UTF-8."""
    print(contents.decode("utf-8"))


def start_fetch(url):
    """Start downloading *url* and return its deferred with the printer attached."""
    pending = getPage(url.encode(encoding="utf-8"))
    pending.addCallback(callback)
    return pending


url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
]
deferred_list = [start_fetch(url) for url in url_list]

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
# -*- coding: UTF-8 -*-
"""POST a url-encoded form with twisted's ``getPage`` and print the reply."""
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse


def one_done(arg):
    """Print the decoded value handed over by the deferred, then stop the reactor."""
    # arg is bytes
    print(arg.decode("utf-8"))
    reactor.stop()


form_fields = {'check_data': 'adf'}
post_data = urllib.parse.urlencode(form_fields).encode(encoding="utf-8")

headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(
    b'http://dig.chouti.com/login',
    method=b'POST',
    postdata=post_data,
    cookies={},
    headers=headers,
)
response.addBoth(one_done)

reactor.run()
Twisted模塊安裝:
1. https://www.lfd.uci.edu/~gohlke/pythonlibs/ 下載
2. pip install Twisted-18.7.0-cp36-cp36m-win_amd64.whl
7.tornado 示例
原理:與twisted一致
代碼:
# -*- coding: UTF-8 -*-
"""Fetch several pages with tornado and stop the IOLoop when all are done."""
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

# Number of requests still in flight; the IOLoop is stopped when it hits 0.
_pending = 0


def handle_response(response):
    """
    Print the error or the body of one finished response.

    :param response: the response object produced by ``fetch``
    :return: None
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def _on_fetch_done(future):
    """Report one finished fetch and stop the IOLoop after the last one."""
    global _pending
    try:
        response = future.result()
    except Exception as exc:
        # Failures that never produced a response (e.g. connection errors).
        print("Error:", exc)
    else:
        handle_response(response)
    finally:
        _pending -= 1
        if _pending == 0:
            ioloop.IOLoop.current().stop()


def func():
    """Start one fetch per URL; completions are reported by ``_on_fetch_done``."""
    global _pending
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    loop = ioloop.IOLoop.current()
    if not url_list:
        loop.stop()
        return

    _pending = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        # fetch() returns a Future; the ``callback`` argument no longer
        # exists in Tornado 6.  raise_error=False lets HTTP error statuses
        # come back as a response so handle_response can print them.
        future = http_client.fetch(HTTPRequest(url), raise_error=False)
        loop.add_future(future, _on_fetch_done)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
8. 自定義一個異步非阻塞模塊,與twisted tornado原理一致
# -*- coding: UTF-8 -*-
"""A minimal select()-based asynchronous, non-blocking HTTP client."""
import socket, select


class Request(object):
    """Pair a non-blocking socket with the request description it serves."""

    def __init__(self, sock, info):
        self.sock = sock
        self.info = info
        self.chunks = []  # response fragments received so far

    def fileno(self):
        """Return the socket's descriptor so select() accepts this object."""
        return self.sock.fileno()


class MyName(object):
    """Drive many HTTP/1.0 GET requests from one select() loop."""

    def __init__(self):
        self.sock_list = []  # requests still waiting for their response
        self.conns = []      # requests whose connect has not completed yet

    def add_request(self, req_info):
        '''
        Create a non-blocking socket and start connecting it.

        :param req_info: dict with "host", "port", "path" and "callback"
        :return: None
        '''
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect((req_info.get("host"), req_info.get("port")))
        except BlockingIOError:
            # Expected for a non-blocking connect: it completes later and
            # select() then reports the socket as writable.
            pass
        except OSError:
            # Any other failure (e.g. name resolution): do not leak the socket.
            client.close()
            raise
        obj = Request(client, req_info)
        self.sock_list.append(obj)
        self.conns.append(obj)

    def _finish(self, obj):
        """Close *obj*'s socket and hand everything received to its callback."""
        if obj in self.conns:
            self.conns.remove(obj)
        self.sock_list.remove(obj)
        obj.sock.close()
        print(obj.info['host'])
        obj.info['callback'](b"".join(obj.chunks))

    def run(self):
        '''
        Run the event loop until every queued request has completed.

        Each callback receives the whole response (every chunk read until
        the peer closes the connection), or b"" if the connection failed.

        :return: None
        '''
        while self.sock_list:
            # select() accepts any object that has a fileno() method and
            # calls it internally, so Request objects can be passed directly.
            # Wake up at least every 0.05 seconds.
            r, w, e = select.select(self.sock_list, self.conns, [], 0.05)

            # Writable: the pending connect has finished, send the request.
            for obj in w:
                self.conns.remove(obj)
                data = "GET {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(obj.info['path'], obj.info['host'])
                try:
                    obj.sock.sendall(data.encode("utf-8"))
                except OSError:
                    # The connect failed; finish the request with no data.
                    self._finish(obj)

            # Readable: collect data until the peer closes the connection.
            for obj in r:
                if obj not in self.sock_list:
                    continue  # already finished in the loop above
                try:
                    res = obj.sock.recv(8096)
                except BlockingIOError:
                    continue  # nothing to read after all, try again later
                except OSError:
                    res = b""  # treat a broken connection like end-of-stream
                if res:
                    obj.chunks.append(res)
                else:
                    self._finish(obj)


def done01(res):
    """Example callback: print the raw response."""
    print(res)


def done02(res):
    """Example callback: print the raw response."""
    print(res)


url_list = [
    {"host": "www.baidu.com", "port": 80, "path": "/", "callback": done01},
    {"host": "www.51cto.com", "port": 80, "path": "/", "callback": done02},
    {"host": "www.cnblogs.com", "port": 80, "path": "/alex3714/articles/5885096.html", "callback": done01},
]

if __name__ == "__main__":
    my_name = MyName()
    for item in url_list:
        my_name.add_request(item)

    my_name.run()
本文參考這裏