若是使用非阻塞I/O,它就不會傻傻地等在那裏(好比等鏈接、等讀取),而是會返回一個錯誤信息,雖說是說錯誤信息,它其實就是叫你過一會再來的意思,編程的時候都不把它當錯誤看。html
非阻塞I/O代碼以下:python
sock = socket.socket()
sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass
這裏拋出的異常無視掉就能夠了。sql
有了非阻塞I/O這個特性,咱們就可以實現單線程上多個sockets
的處理了,學過C語言網絡編程的同窗應該都認識select
這個函數吧?不認識也沒關係,select
函數若是你不設置它的超時時間它就是默認一直阻塞的,只有當有I/O事件發生時它纔會被激活,而後告訴你哪一個socket
上發生了什麼事件(讀|寫|異常),在Python
中也有select
,還有跟select
功能相同可是更高效的poll
,它們都是底層C函數的Python
實現。編程
不過這裏咱們不使用select
,而是用更簡單好用的DefaultSelector
,是Python 3.4
後纔出現的一個模塊裏的類,你只須要在非阻塞socket
和事件上綁定回調函數就能夠了。ruby
代碼以下:服務器
from selectors import DefaultSelector, EVENT_WRITE selector = DefaultSelector() sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass def connected(): selector.unregister(sock.fileno()) print('connected!') selector.register(sock.fileno(), EVENT_WRITE, connected)
這裏看一下selector.register
的原型網絡
register(fileobj, events, data=None)
其中fileobj
能夠是文件描述符也能夠是文件對象(經過fileno
獲得),events
是位掩碼,指明發生的是什麼事件,data
則是與指定文件(也就是咱們的socket)與指定事件綁定在一塊兒的數據。併發
如代碼所示,selector.register
在該socket
的寫事件上綁定了回調函數connected
(這裏做爲數據綁定)。在該socket
上第一次發生的寫事件意味着鏈接的創建,connected
函數在鏈接創建成功後再解除了該socket
上全部綁定的數據。異步
看了以上selector
的使用方式,我想你會發現它很適合寫成事件驅動的形式。socket
咱們能夠建立一個事件循環,在循環中不斷得到I/O事件:
def loop(): while True: events = selector.select() #遍歷事件並調用相應的處理 for event_key, event_mask in events: callback = event_key.data callback()
其中events_key
是一個namedtuple
,它的結構大體以下(fileobj,fd,events,data),咱們從data獲得以前綁定的回調函數並調用。 event_mask
則是事件的位掩碼。
關於selectors
的更多內容,可參考官方文檔: https://docs.python.org/3.4/library/selectors.html
如今咱們已經明白了基於回調函數實現事件驅動是怎麼一回事了,接着來完成咱們的爬蟲吧。
首先建立兩個set,一個是待處理url的集合,一個是已抓取url的集合,同時初始化爲根url '/'
urls_todo = set(['/']) seen_urls = set(['/'])
抓取一個頁面會須要許多回調函數。好比connected
,它會在鏈接創建成功後向服務器發送一個GET
請求請求頁面。固然它不會幹等着服務器響應(那就阻塞了),而是再綁定另外一個接收響應的回調函數read_response
。若是read_response
在事件觸發時沒法一次性讀取完整的響應,那麼就會等下次事件觸發時繼續讀取,直到讀取到了完整的響應才解除綁定。
咱們將這些回調函數封裝在 Fetcher
類中。它有三個成員變量:抓取的url
、socket
對象與獲得的服務器響應response
。
class Fetcher: def __init__(self, url): self.response = b'' self.url = url self.sock = None
實現fetch
函數,綁定connected
:
# 在Fetcher類中實現 def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('xkcd.com', 80)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
注意到fetch
函數在內部調用connect
嘗試創建socket
鏈接並綁定回調函數,I/O的處理則都是交給事件循環控制的。Fetcher
與事件循環的關係以下:
# Begin fetching http://xkcd.com/353/ fetcher = Fetcher('/353/') fetcher.fetch() # 事件循環 while True: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask)
connected
的實現:
def connected(self, key, mask): print('connected!') #解除該socket上的全部綁定 selector.unregister(key.fd) request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url) self.sock.send(request.encode('ascii')) # 鏈接創建後綁定讀取響應的回調函數 selector.register(key.fd, EVENT_READ, self.read_response)
read_response
的實現:
def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 每次接收最多4K的信息 if chunk: self.response += chunk else: selector.unregister(key.fd) # 完成接收則解除綁定 links = self.parse_links() # Python set-logic: for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() # 抓取新的url seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True # 當抓取隊列爲空時結束事件循環
parse_links
如上一節課,它的做用是返回抓取到的頁面中的全部發現的url的集合。 parse_links
以後,遍歷了每個沒抓取過的url併爲其建立一個新的Fetcher
對象並調用fetch
函數開始抓取。
parse_links
等其它函數的實現:
def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def parse_links(self): if not self.response: print('error: {}'.format(self.url)) return set() if not self._is_html(): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) links = set() for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html')
將事件循環改成stopped時中止:
start = time.time() stopped = False def loop(): while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds'.format( len(seen_urls), time.time() - start))
這裏先奉上完整代碼:
from selectors import * import socket import re import urllib.parse import time urls_todo = set(['/']) seen_urls = set(['/']) #追加了一個能夠看最高併發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False class Fetcher: def __init__(self, url): self.response = b'' self.url = url self.sock = None def fetch(self): global concurrency_achieved concurrency_achieved = max(concurrency_achieved, len(urls_todo)) self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): selector.unregister(key.fd) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd, EVENT_READ, self.read_response) def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 4k chunk size. if chunk: self.response += chunk else: selector.unregister(key.fd) # Done reading. links = self.parse_links() for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def parse_links(self): if not self.response: print('error: {}'.format(self.url)) return set() if not self._is_html(): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) links = set() for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') fetcher.fetch() while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask) print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(seen_urls), time.time() - start, concurrency_achieved))
輸入python3 callback.py
命令查看效果。不要忘了先開網站的服務器哦。
想一想以前從創建鏈接到讀取響應到解析新的url到工做隊列中,這一切都可以在一個函數中完成,就像下面這樣:
def fetch(url): sock = socket.socket() sock.connect(('localhost', 3000)) request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = parse_links(response) q.add(links)
而用回調函數實現,整個總體就支離破碎了,哪裏阻塞就又不得不在那裏把函數一切爲二,代碼會顯得很是亂,維護也變的很麻煩。更麻煩的是若是在回調函數中拋出了異常,你根本得不到什麼有用的信息:
Traceback (most recent call last): File "loop-with-callbacks.py", line 111, in <module> loop() File "loop-with-callbacks.py", line 106, in loop callback(event_key, event_mask) File "loop-with-callbacks.py", line 51, in read_response links = self.parse_links() File "loop-with-callbacks.py", line 67, in parse_links raise Exception('parse error') Exception: parse error
你看不到這個回調函數的上下文是什麼,你只知道它在事件循環裏。你想在這個函數外抓取它的異常都沒地方下手。可是這又是回調實現沒法避免的缺陷,那咱們想實現併發異步應該怎麼辦咧?