爬蟲必備—性能相關(異步非阻塞)

在編寫爬蟲時,性能的消耗主要在IO請求中,當單進程單線程模式下請求URL時必然會引發等待,從而使得請求總體變慢。

1. 同步執行

 1 import requests
 2 
 3 def fetch_async(url):
 4     response = requests.get(url)
 5     return response
 6 
 7 
 8 url_list = ['http://www.github.com', 'http://www.bing.com']
 9 
10 for url in url_list:
11     fetch_async(url)

2. 多線程執行(多個線程併發執行,時間長短取決於最長的URL請求)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8 
 9 
10 url_list = ['http://www.github.com', 'http://www.bing.com']
11 pool = ThreadPoolExecutor(5)
12 for url in url_list:
13     pool.submit(fetch_async, url)
14 pool.shutdown(wait=True)

3. 多進程執行(在CPU核心數足夠的情況下,多個進程並行執行,時間長短取決於最長的URL請求,理論上會快於多線程)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3 
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7 
 8 
 9 url_list = ['http://www.github.com', 'http://www.bing.com']
10 pool = ProcessPoolExecutor(5)
11 for url in url_list:
12     pool.submit(fetch_async, url)
13 pool.shutdown(wait=True)

4. 多線程+回調函數(實現了異步非阻塞,在IO等待的情況下可以做其他事情)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7 
 8 
 9 def callback(future):
10     print(future.result())
11 
12 
13 url_list = ['http://www.github.com', 'http://www.bing.com']
14 pool = ThreadPoolExecutor(5)
15 for url in url_list:
16     v = pool.submit(fetch_async, url)
17     v.add_done_callback(callback)
18 pool.shutdown(wait=True)

5. 多進程+回調函數(實現了異步非阻塞,在IO等待的情況下可以做其他事情)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3 
 4 
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8 
 9 
10 def callback(future):
11     print(future.result())
12 
13 
14 url_list = ['http://www.github.com', 'http://www.bing.com']
15 pool = ProcessPoolExecutor(5)
16 for url in url_list:
17     v = pool.submit(fetch_async, url)
18     v.add_done_callback(callback)
19 pool.shutdown(wait=True)

通過上述代碼都可以提高請求性能,但多線程和多進程的缺點是在IO阻塞時會造成線程和進程的浪費,因此異步IO會是首選:

1. asyncio示例一

import asyncio


async def func1():
    """Print a message, sleep five seconds without blocking, print again.

    Written as a native coroutine: the generator-based form
    (``@asyncio.coroutine`` with ``yield from``) no longer exists in
    Python 3.11 and later.
    """
    print('before...func1......')
    # Control returns to the event loop while this coroutine sleeps.
    await asyncio.sleep(5)
    print('end...func1......')


async def main():
    """Run every prepared coroutine concurrently and wait for all of them."""
    await asyncio.gather(*tasks)


tasks = [func1(), func1()]

# asyncio.run() creates the event loop, runs main() to completion and closes
# the loop again, replacing the manual get_event_loop()/close() sequence.
asyncio.run(main())

2. asyncio示例二

 1 import asyncio
 2 
 3 
 4 @asyncio.coroutine
 5 def fetch_async(host, url='/'):
 6     print(host, url)
 7     reader, writer = yield from asyncio.open_connection(host, 80)
 8 
 9     request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
10     request_header_content = bytes(request_header_content, encoding='utf-8')
11 
12     writer.write(request_header_content)
13     yield from writer.drain()
14     text = yield from reader.read()
15     print(host, url, text)
16     writer.close()
17 
18 tasks = [
19     fetch_async('www.cnblogs.com', '/wupeiqi/'),
20     fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
21 ]
22 
23 loop = asyncio.get_event_loop()
24 results = loop.run_until_complete(asyncio.gather(*tasks))
25 loop.close()

3. asyncio + aiohttp

 1 import aiohttp
 2 import asyncio
 3 
 4 
 5 @asyncio.coroutine
 6 def fetch_async(url):
 7     print(url)
 8     response = yield from aiohttp.request('GET', url)
 9     # data = yield from response.read()
10     # print(url, data)
11     print(url, response)
12     response.close()
13 
14 
15 tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]
16 
17 event_loop = asyncio.get_event_loop()
18 results = event_loop.run_until_complete(asyncio.gather(*tasks))
19 event_loop.close()

4. asyncio + requests

 1 import asyncio
 2 import requests
 3 
 4 
 5 @asyncio.coroutine
 6 def fetch_async(func, *args):
 7     loop = asyncio.get_event_loop()
 8     future = loop.run_in_executor(None, func, *args)
 9     response = yield from future
10     print(response.url, response.content)
11 
12 
13 tasks = [
14     fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
15     fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
16 ]
17 
18 loop = asyncio.get_event_loop()
19 results = loop.run_until_complete(asyncio.gather(*tasks))
20 loop.close()

5. gevent + requests

 1 import gevent
 2 
 3 import requests
 4 from gevent import monkey
 5 
 6 monkey.patch_all()
 7 
 8 
 9 def fetch_async(method, url, req_kwargs):
10     print(method, url, req_kwargs)
11     response = requests.request(method=method, url=url, **req_kwargs)
12     print(response.url, response.content)
13 
14 # ##### 發送請求 #####
15 gevent.joinall([
16     gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
17     gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
18     gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
19 ])
20 
21 # ##### 發送請求(協程池控制最大協程數量) #####
22 # from gevent.pool import Pool
23 # pool = Pool(None)
24 # gevent.joinall([
25 #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
26 #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
27 #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
28 # ])

6. grequests(封裝的 gevent + requests)

 1 import grequests
 2 
 3 
 4 request_list = [
 5     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
 6     grequests.get('http://fakedomain/'),
 7     grequests.get('http://httpbin.org/status/500')
 8 ]
 9 
10 
11 # ##### 執行並獲取響應列表 #####
12 # response_list = grequests.map(request_list)
13 # print(response_list)
14 
15 
16 # ##### 執行並獲取響應列表(處理異常) #####
17 # def exception_handler(request, exception):
18 # print(request,exception)
19 #     print("Request failed")
20 
21 # response_list = grequests.map(request_list, exception_handler=exception_handler)
22 # print(response_list)

7. Twisted示例

 1 from twisted.web.client import getPage, defer
 2 from twisted.internet import reactor
 3 
 4 
 5 def all_done(arg):
 6     reactor.stop()
 7 
 8 
 9 def callback(contents):
10     print(contents)
11 
12 
13 deferred_list = []
14 
15 url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
16 for url in url_list:
17     deferred = getPage(bytes(url, encoding='utf8'))
18     deferred.addCallback(callback)
19     deferred_list.append(deferred)
20 
21 dlist = defer.DeferredList(deferred_list)
22 dlist.addBoth(all_done)
23 
24 reactor.run()

8. Tornado

 1 from tornado.httpclient import AsyncHTTPClient
 2 from tornado.httpclient import HTTPRequest
 3 from tornado import ioloop
 4 
 5 
 6 def handle_response(response):
 7     """
 8     處理返回值內容(須要維護計數器,來中止IO循環),調用 ioloop.IOLoop.current().stop()
 9     :param response: 
10     :return: 
11     """
12     if response.error:
13         print("Error:", response.error)
14     else:
15         print(response.body)
16 
17 
18 def func():
19     url_list = [
20         'http://www.baidu.com',
21         'http://www.bing.com',
22     ]
23     for url in url_list:
24         print(url)
25         http_client = AsyncHTTPClient()
26         http_client.fetch(HTTPRequest(url), handle_response)
27 
28 
29 ioloop.IOLoop.current().add_callback(func)
30 ioloop.IOLoop.current().start()

9. Twisted更多

 1 from twisted.internet import reactor
 2 from twisted.web.client import getPage
 3 import urllib.parse
 4 
 5 
 6 def one_done(arg):
 7     print(arg)
 8     reactor.stop()
 9 
10 post_data = urllib.parse.urlencode({'check_data': 'adf'})
11 post_data = bytes(post_data, encoding='utf8')
12 headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
13 response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
14                    method=bytes('POST', encoding='utf8'),
15                    postdata=post_data,
16                    cookies={},
17                    headers=headers)
18 response.addBoth(one_done)
19 
20 reactor.run()

以上均是Python內置以及第三方模塊提供的異步IO請求模塊,使用簡便,大大提高效率,而對於異步IO請求的本質則是【非阻塞Socket】+【IO多路複用】:

import select
import socket
import time

  6 class AsyncTimeoutException(TimeoutError):
  7     """
  8     請求超時異常類
  9     """
 10 
 11     def __init__(self, msg):
 12         self.msg = msg
 13         super(AsyncTimeoutException, self).__init__(msg)
 14 
 15 
 16 class HttpContext(object):
 17     """封裝請求和相應的基本數據"""
 18 
 19     def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
 20         """
 21         sock: 請求的客戶端socket對象
 22         host: 請求的主機名
 23         port: 請求的端口
 24         port: 請求的端口
 25         method: 請求方式
 26         url: 請求的URL
 27         data: 請求時請求體中的數據
 28         callback: 請求完成後的回調函數
 29         timeout: 請求的超時時間
 30         """
 31         self.sock = sock
 32         self.callback = callback
 33         self.host = host
 34         self.port = port
 35         self.method = method
 36         self.url = url
 37         self.data = data
 38 
 39         self.timeout = timeout
 40 
 41         self.__start_time = time.time()
 42         self.__buffer = []
 43 
 44     def is_timeout(self):
 45         """當前請求是否已經超時"""
 46         current_time = time.time()
 47         if (self.__start_time + self.timeout) < current_time:
 48             return True
 49 
 50     def fileno(self):
 51         """請求sockect對象的文件描述符,用於select監聽"""
 52         return self.sock.fileno()
 53 
 54     def write(self, data):
 55         """在buffer中寫入響應內容"""
 56         self.__buffer.append(data)
 57 
 58     def finish(self, exc=None):
 59         """在buffer中寫入響應內容完成,執行請求的回調函數"""
 60         if not exc:
 61             response = b''.join(self.__buffer)
 62             self.callback(self, response, exc)
 63         else:
 64             self.callback(self, None, exc)
 65 
 66     def send_request_data(self):
 67         content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (
 68             self.method.upper(), self.url, self.host, self.data,)
 69 
 70         return content.encode(encoding='utf8')
 71 
 72 
 73 class AsyncRequest(object):
 74     def __init__(self):
 75         self.fds = []
 76         self.connections = []
 77 
 78     def add_request(self, host, port, method, url, data, callback, timeout):
 79         """建立一個要請求"""
 80         client = socket.socket()
 81         client.setblocking(False)
 82         try:
 83             client.connect((host, port))
 84         except BlockingIOError as e:
 85             pass
 86             # print('已經向遠程發送鏈接的請求')
 87         req = HttpContext(client, host, port, method, url, data, callback, timeout)
 88         self.connections.append(req)
 89         self.fds.append(req)
 90 
 91     def check_conn_timeout(self):
 92         """檢查全部的請求,是否有已經鏈接超時,若是有則終止"""
 93         timeout_list = []
 94         for context in self.connections:
 95             if context.is_timeout():
 96                 timeout_list.append(context)
 97         for context in timeout_list:
 98             context.finish(AsyncTimeoutException('請求超時'))
 99             self.fds.remove(context)
100             self.connections.remove(context)
101 
102     def running(self):
103         """事件循環,用於檢測請求的socket是否已經就緒,從而執行相關操做"""
104         while True:
105             r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)
106 
107             if not self.fds:
108                 return
109 
110             for context in r:
111                 sock = context.sock
112                 while True:
113                     try:
114                         data = sock.recv(8096)
115                         if not data:
116                             self.fds.remove(context)
117                             context.finish()
118                             break
119                         else:
120                             context.write(data)
121                     except BlockingIOError as e:
122                         break
123                     except TimeoutError as e:
124                         self.fds.remove(context)
125                         self.connections.remove(context)
126                         context.finish(e)
127                         break
128 
129             for context in w:
130                 # 已經鏈接成功遠程服務器,開始向遠程發送請求數據
131                 if context in self.fds:
132                     data = context.send_request_data()
133                     context.sock.sendall(data)
134                     self.connections.remove(context)
135 
136             self.check_conn_timeout()
137 
138 
if __name__ == '__main__':
    def callback_func(context, response, ex):
        """
        :param context: HttpContext object holding the request's details
        :param response: content of the response
        :param ex: the exception object if the request failed, otherwise None
        :return:
        """
        print(context, response, ex)

    obj = AsyncRequest()
    url_list = [
        {'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
        {'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
        {'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
    ]
    # Each dict's keys match add_request's parameter names.
    for item in url_list:
        print(item)
        obj.add_request(**item)

    obj.running()
史上最牛逼的異步IO模塊

 

 

本文轉載自: 銀角大王

http://www.cnblogs.com/wupeiqi/articles/6229292.html

相關文章
相關標籤/搜索