python異步爬蟲

本文主要包括如下內容           html

  • 線程池實現併發爬蟲
  • 回調方法實現異步爬蟲
  • 協程技術的介紹
  • 一個基於協程的異步編程模型
  • 協程實現異步爬蟲

線程池、回調、協程python

咱們但願經過併發執行來加快爬蟲抓取頁面的速度。通常的實現方式有三種:程序員

  • 線程池方式:開一個線程池,每當爬蟲發現一個新連接,就將連接放入任務隊列中,線程池中的線程從任務隊列獲取一個連接,以後創建socket,完成抓取頁面、解析、將新鏈接放入工做隊列的步驟。
  • 回調方式:程序會有一個主循環叫作事件循環,在事件循環中會不斷得到事件,經過在事件上註冊解除回調函數來達到多任務併發執行的效果。缺點是一旦須要的回調操做變多,代碼就會很是散,變得難以維護。
  • 協程方式:一樣經過事件循環執行程序,利用了Python 的生成器特性,生成器函數可以中途中止並在以後恢復,那麼本來不得不分開寫的回調函數就可以寫在一個生成器函數中了,這也就實現了協程。

線程池實現爬蟲

Python多線程創建線程的兩種方式編程

#第一種:經過函數建立線程 def 函數a(): pass t = threading.Thread(target=函數a,name=本身隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步後主程序中斷退出後子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher()

 

參見:python 多線程就這麼簡單 - 蟲師 - 博客園服務器

多線程同步-隊列網絡

多線程同步就是多個線程競爭一個全局變量時按順序讀寫,通常狀況下要用鎖,可是使用標準庫裏的Queue的時候它內部已經實現了鎖,不用程序員本身寫了。數據結構

導入隊列類:多線程

from queue import Queue 建立一個隊列: q = Queue(maxsize=0) maxsize爲隊列大小,爲0默認隊列大小可無窮大。 隊列是先進先出的數據結構: q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列獲得一個item,隊列爲空則阻塞 還有相應的不等待的版本,這裏略過。 隊列不爲空,或者爲空可是取得item的線程沒有告知任務完成時都是處於阻塞狀態 q.join() #阻塞直到全部任務完成 線程告知任務完成使用task_done q.task_done() #在線程內調用

 

完整代碼併發

from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))

 

事件驅動-回調函數實現爬蟲

非阻塞I/Oapp

若是使用非阻塞I/O,程序就不會傻傻地等在那裏(好比等鏈接、等讀取),而是會返回一個錯誤信息,雖說是說錯誤信息,它其實就是叫你過一會再來的意思,編程的時候都不把它當錯誤看。

非阻塞I/O代碼以下:

sock = socket.socket()
sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass

 

單線程上的多I/O

有了非阻塞I/O這個特性,咱們就可以實現單線程上多個sockets的處理了,學過C語言網絡編程的同窗應該都認識select這個函數吧?不認識也沒關係,select函數若是你不設置它的超時時間它就是默認一直阻塞的,只有當有I/O事件發生時它纔會被激活,而後告訴你哪一個socket上發生了什麼事件(讀|寫|異常),在python中也有select,還有跟select功能相同可是更高效的poll,它們都是底層C函數的Python實現。

不過這裏咱們不使用select,而是用更簡單好用的DefaultSelector,是Python 3.4後纔出現的一個模塊裏的類,你只須要在非阻塞socket和事件上綁定回調函數就能夠了。

代碼以下:

from selectors import DefaultSelector, EVENT_WRITE selector = DefaultSelector() sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass def connected(): selector.unregister(sock.fileno()) print('connected!') selector.register(sock.fileno(), EVENT_WRITE, connected)

 

這裏看一下selector.register的原型

register(fileobj, events, data=None) 
其中fileobj能夠是文件描述符也能夠是文件對象(經過fileno獲得),events是位掩碼,指明發生的是什麼事件,data 則是與指定文件(也就是咱們的socket)與指定事件綁定在一塊兒的數據。

如代碼所示,selector.register 在該socket的寫事件上綁定了回調函數connected(這裏做爲數據綁定)。在該socket上第一次發生的寫事件意味着鏈接的創建,connected函數在鏈接創建成功後再解除了該socket上全部綁定的數據。

事件驅動

看了以上selector的使用方式,我想你會發現它很適合寫成事件驅動的形式。

咱們能夠建立一個事件循環,在循環中不斷得到I/O事件:

def loop(): while True: events = selector.select() #遍歷事件並調用相應的處理 for event_key, event_mask in events: callback = event_key.data callback()

 

完整代碼

from selectors import * import socket import re import urllib.parse import time urls_todo = set(['/']) seen_urls = set(['/']) #追加了一個能夠看最高併發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False class Fetcher: def __init__(self, url): self.response = b'' self.url = url self.sock = None def fetch(self): global concurrency_achieved concurrency_achieved = max(concurrency_achieved, len(urls_todo)) self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): selector.unregister(key.fd) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd, EVENT_READ, self.read_response) def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 4k chunk size. if chunk: self.response += chunk else: selector.unregister(key.fd) # Done reading. links = self.parse_links() for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def parse_links(self): if not self.response: print('error: {}'.format(self.url)) return set() if not self._is_html(): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) links = set() for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') fetcher.fetch() while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask) print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(seen_urls), time.time() - start, concurrency_achieved))

 

事件驅動-協程實現爬蟲

什麼是協程?

協程實際上是比起通常的子例程而言更寬泛的存在,子例程是協程的一種特例。

子例程的起始處是唯一的入口點,一旦退出即完成了子例程的執行,子例程的一個實例只會返回一次。

協程能夠經過yield來調用其它協程。經過yield方式轉移執行權的協程之間不是調用者與被調用者的關係,而是彼此對稱、平等的。

協程的起始處是第一個入口點,在協程裏,返回點以後是接下來的入口點。子例程的生命期遵循後進先出(最後一個被調用的子例程最早返回);相反,協程的生命期徹底由他們的使用的須要決定。

還記得咱們何時會用到yield嗎,就是在生成器(generator)裏,在迭代的時候每次執行next(generator)生成器都會執行到下一次yield的位置並返回,能夠說生成器就是例程。

生成器實現協程模型

雖然生成器擁有一個協程該有的特性,但光這樣是不夠的,作異步編程還是困難的,咱們須要先用生成器實現一個協程異步編程的簡單模型,它同時也是Python標準庫asyncio的簡化版,正如asyncio的實現,咱們會用到生成器,Future類,以及yield from語句。

首先實現Future類, Future類能夠認爲是專門用來存儲將要發送給協程的信息的類。

class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self)

Future對象最開始處在掛起狀態,當調用set_result時被激活,並運行註冊的回調函數,該回調函數多半是對協程發送信息讓協程繼續運行下去的函數。

咱們改造一下以前從fetch到connected的函數,加入Future與yield。

這是以前回調實現的fetch:

class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...後面省略...

 

改造後,咱們將鏈接創建後的部分也放到了fetch中。

class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #鏈接創建後經過set_result協程繼續從yield的地方往下運行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!')

fetcher是一個生成器函數,咱們建立一個Future實例,yield它來暫停fetch的運行直到鏈接創建f.set_result(None)的時候,生成器才繼續運行。那set_result時運行的回調函數是哪來的呢?這裏引入Task類:

class Task: def __init__(self, coro): #協程 self.coro = coro #建立並初始化一個爲None的Future對象 f = Future() f.set_result(None) #步進一次(發送一次信息) #在初始化的時候發送是爲了協程到達第一個yield的位置,也是爲了註冊下一次的步進 self.step(f) def step(self, future): try: #向協程發送消息並獲得下一個從協程那yield到的Future對象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop()

流程大體是這樣的,首先Task初始化,向fetch生成器發送None信息(也能夠想象成step調用了fetch,參數是None),fetch得以從開頭運行到第一個yield的地方並返回了一個Future對象給step的next_future,而後step就在這個獲得的Future對象註冊了step。當鏈接創建時on_connected就會被調用,再一次向協程發送信息,協程就會繼續往下執行了。

使用yield from分解協程

一旦socket鏈接創建成功,咱們發送HTTP GET請求到服務器並在以後讀取服務器響應。如今這些步驟不用再分散在不一樣的回調函數裏了,咱們能夠將其放在同一個生成器函數中:

def fetch(self): # ... 省略鏈接的代碼 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成讀取 break

可是這樣代碼也會越積越多,可不能夠分解生成器函數的代碼呢,從協程中提取出子協程?Python 3 的yield from能幫助咱們完成這部分工做。:

>>> def gen_fn():
... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ...

yield from獲得的子協程最後return的返回值

完整代碼

from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一個能夠看最高併發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved))

 

 

總結

至此,咱們在學習的過程當中掌握了:

  • 線程池實現併發爬蟲
  • 回調方法實現異步爬蟲
  • 協程技術的介紹
  • 一個基於協程的異步編程模型
  • 協程實現異步爬蟲

三種爬蟲的實現方式中線程池是最壞的選擇,由於它既佔用內存,又有線程競爭的危險須要程序員本身編程解決,並且產生的I/O阻塞也浪費了CPU佔用時間。再來看看回調方式,它是一種異步方法,因此I/O阻塞的問題解決了,並且它是單線程的不會產生競爭,問題好像都解決了。然而它引入了新的問題,它的問題在於以這種方式編寫的代碼很差維護,也不容易debug。看來協程纔是最好的選擇,咱們實現的協程異步編程模型使得一個單線程可以很容易地改寫爲協程。那是否是每一次作異步編程都要實現Task、Future呢?不是的,你能夠直接使用asyncio官方標準協程庫,它已經幫你把Task、Future封裝好了,你根本不會感覺到它們的存在,是否是很棒呢?若是你使用Python 3.5那更好,已經能夠用原生的協程了,Python 3.5追加了async def,await等協程相關的關鍵詞。

參考連接:Python - Python實現基於協程的異步爬蟲 - 實驗樓

原文地址:http://blog.csdn.net/whuhan2013/article/details/52529477

相關文章
相關標籤/搜索