Experiment 2: An Event-Driven Crawler Built on Callbacks

Picking up from the previous lesson: we noted there that the time a socket spends blocked on I/O is completely wasted. How can we reclaim that time?

Non-Blocking I/O

With non-blocking I/O, the program does not sit there waiting (to connect, say, or to read); instead the call returns immediately with an error. Although it is called an error, it really just means "come back a little later", and in practice you do not treat it as a failure.

The non-blocking I/O code looks like this:

import socket

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

The exception raised here can simply be ignored.
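If you would rather avoid the exception entirely, socket objects also offer connect_ex, which returns the underlying error code instead of raising. A minimal sketch (assuming a POSIX system, where the code for a connect still in progress is errno.EINPROGRESS):

import errno
import socket

sock = socket.socket()
sock.setblocking(False)
# connect_ex returns an errno value instead of raising an exception
err = sock.connect_ex(('xkcd.com', 80))
# on a non-blocking socket the connect is merely in progress, not failed
print(err == errno.EINPROGRESS)  # typically True on POSIX systems

Either way, the point is the same: the "error" is just the kernel telling us to check back later.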

Multiple I/O Streams on a Single Thread

With non-blocking I/O we can handle multiple sockets on a single thread. If you have done network programming in C you probably know the select function; if not, no matter. By default, with no timeout set, select blocks until an I/O event occurs, then wakes up and tells you which socket saw which event (read, write, or error). Python has select too, along with poll, which does the same job more efficiently; both are Python wrappers around the underlying C functions.
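For reference, here is a minimal sketch of what watching sockets with Python's select module looks like (the endpoints and the 5-second timeout are arbitrary choices for illustration):

import select
import socket

# a couple of non-blocking sockets to watch
socks = []
for _ in range(2):
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass
    socks.append(s)

# block for at most 5 seconds, then report which sockets are ready
readable, writable, errored = select.select(socks, socks, socks, 5.0)
for s in writable:
    print('socket {} is writable (connection established)'.format(s.fileno()))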

Here, though, we will not use select directly; instead we will use the simpler and friendlier DefaultSelector, a class from the selectors module introduced in Python 3.4. All you have to do is register a callback for an event on a non-blocking socket.

The code:

from selectors import DefaultSelector, EVENT_WRITE
import socket

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('localhost', 3000))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

Let's look at the signature of selector.register:

register(fileobj, events, data=None) 

Here fileobj can be a file object or a file descriptor (the number returned by fileno()); events is a bit mask saying which events to watch for; and data is arbitrary data bound to that file (our socket, in this case) and that event.
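Note that the selector treats data as completely opaque: it simply hands back whatever you stored when the event fires. As a hypothetical variation (not used in the crawler below), you could just as well bind a (callback, argument) pair:

from selectors import DefaultSelector, EVENT_WRITE
import socket

selector = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def on_writable(url):
    print('ready to send a request for', url)

# data can be any object -- here, a (callback, argument) tuple
selector.register(sock.fileno(), EVENT_WRITE, (on_writable, '/353/'))

key, mask = selector.select()[0]
callback, url = key.data  # we get back exactly what we stored
callback(url)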

As the code shows, selector.register binds the callback connected to this socket's write event (passed in as the data argument). The first write event on the socket signals that the connection has been established, and once it fires, connected unregisters the socket, removing everything bound to it.

Event-Driven

Having seen how the selector is used, you can probably tell it lends itself naturally to an event-driven style.

We can create an event loop that keeps pulling I/O events:

def loop():
    while True:
        events = selector.select()
        # iterate over the events and invoke the matching handlers
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

Here event_key is a namedtuple whose structure is roughly (fileobj, fd, events, data); we pull the previously bound callback out of data and invoke it. event_mask is the event bit mask.
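Because event_mask is a bit mask, a handler registered for both read and write events would test it bit by bit. A brief sketch (hypothetical; the crawler below registers one event at a time, so it never needs this):

from selectors import EVENT_READ, EVENT_WRITE

def handle(key, mask):
    # both bits may be set if the socket is ready for reading and writing
    if mask & EVENT_READ:
        print(key.fd, 'is readable')
    if mask & EVENT_WRITE:
        print(key.fd, 'is writable')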

For more on selectors, see the official documentation: https://docs.python.org/3.4/library/selectors.html

Finishing the Crawler

Now that we understand how callback-based event-driven code works, let's finish our crawler.

First, create two sets: one for URLs waiting to be processed and one for URLs already seen, both initialized with the root URL '/':

urls_todo = set(['/'])
seen_urls = set(['/'])

Fetching a page takes several callbacks. connected, for example, fires once the connection is established and sends the server a GET request for the page. Of course it does not then wait idly for the response (that would block); instead it registers another callback, read_response, to receive it. If read_response cannot read the complete response when its event fires, it waits for the next event and keeps reading, unregistering only once the full response has arrived.

We wrap these callbacks in a Fetcher class. It has three member variables: the URL being fetched (url), the socket object (sock), and the server response accumulated so far (response).

class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url
        self.sock = None

Implement the fetch method, which registers connected:

# implemented inside the Fetcher class
def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
        self.sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass
    selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

Notice that fetch merely calls connect to initiate the connection and registers the callback; all the I/O handling is left to the event loop. The relationship between Fetcher and the event loop looks like this:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

# the event loop
while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

The implementation of connected:

def connected(self, key, mask):
    print('connected!')
    # remove everything bound to this socket
    selector.unregister(key.fd)
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
    self.sock.send(request.encode('ascii'))
    # now that we are connected, register the callback that reads the response
    selector.register(key.fd, EVENT_READ, self.read_response)

The implementation of read_response:

def read_response(self, key, mask):
    global stopped
    chunk = self.sock.recv(4096)  # receive at most 4 KB at a time
    if chunk:
        self.response += chunk
    else:
        selector.unregister(key.fd)  # response fully received; unregister
        links = self.parse_links()
        # Python set-logic:
        for link in links.difference(seen_urls):
            urls_todo.add(link)
            Fetcher(link).fetch()  # fetch the newly found url
        seen_urls.update(links)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True  # stop the event loop once the queue is empty

parse_links is the same as in the previous lesson: it returns the set of all URLs discovered in the fetched page. After parse_links, we loop over every URL not yet seen, create a new Fetcher for it, and call fetch to start crawling it.

The implementations of parse_links and the other helpers:

def body(self):
    body = self.response.split(b'\r\n\r\n', 1)[1]
    return body.decode('utf-8')

def parse_links(self):
    if not self.response:
        print('error: {}'.format(self.url))
        return set()
    if not self._is_html():
        return set()
    urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body()))
    links = set()
    for url in urls:
        normalized = urllib.parse.urljoin(self.url, url)
        parts = urllib.parse.urlparse(normalized)
        if parts.scheme not in ('', 'http', 'https'):
            continue
        host, port = urllib.parse.splitport(parts.netloc)
        if host and host.lower() not in ('xkcd.com', 'www.xkcd.com'):
            continue
        defragmented, frag = urllib.parse.urldefrag(parts.path)
        links.add(defragmented)
    return links

def _is_html(self):
    head, body = self.response.split(b'\r\n\r\n', 1)
    headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
    return headers.get('Content-Type', '').startswith('text/html')

Change the event loop to stop when stopped is set:

start = time.time()
stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

print('{} URLs fetched in {:.1f} seconds'.format(
    len(seen_urls), time.time() - start))

Running It

Here is the complete code:

from selectors import *
import socket
import re
import urllib.parse
import time


urls_todo = set(['/'])
seen_urls = set(['/'])
# track the maximum concurrency achieved
concurrency_achieved = 0
selector = DefaultSelector()
stopped = False


class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url
        self.sock = None

    def fetch(self):
        global concurrency_achieved
        concurrency_achieved = max(concurrency_achieved, len(urls_todo))

        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('localhost', 3000))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()
            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
            print(self.url)

    def body(self):
        body = self.response.split(b'\r\n\r\n', 1)[1]
        return body.decode('utf-8')

    def parse_links(self):
        if not self.response:
            print('error: {}'.format(self.url))
            return set()
        if not self._is_html():
            return set()
        urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body()))
        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(self.url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in ('localhost',):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)
        return links

    def _is_html(self):
        head, body = self.response.split(b'\r\n\r\n', 1)
        headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:])
        return headers.get('Content-Type', '').startswith('text/html')


start = time.time()
fetcher = Fetcher('/')
fetcher.fetch()

while not stopped:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format(
    len(seen_urls), time.time() - start, concurrency_achieved))

Run python3 callback.py to see it in action. Don't forget to start the website's server first.
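If you do not have the course's sample site handy, any static HTTP server listening on localhost:3000 will do for experimenting; for instance, Python's built-in server can serve a directory of HTML pages (your URL counts and timings will of course differ):

# serve the current directory over HTTP on port 3000 (for local testing)
python3 -m http.server 3000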


The Drawbacks of the Callback Approach

Think back to how, previously, everything from establishing the connection, to reading the response, to parsing new URLs into the work queue, could all be done in one function, like this:

def fetch(url):
    sock = socket.socket()
    sock.connect(('localhost', 3000))
    request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    links = parse_links(response)
    q.add(links)

With callbacks, that unified whole gets shattered: wherever the code would block, the function has to be cut in two. The code becomes messy and maintenance becomes a chore. Worse still, if an exception is raised inside a callback, the traceback gives you almost nothing useful:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

You cannot see the context in which the callback was invoked; all you know is that it ran inside the event loop, and there is nowhere outside the function to catch its exception. That is an unavoidable flaw of the callback approach. So how should we implement asynchronous concurrency instead?
