Python實現基於協程的異步爬蟲

Python實現基於協程的異步爬蟲

1、課程介紹

1. 課程來源

本課程核心部分來自《500 lines or less》項目,做者是來自 MongoDB 的工程師 A. Jesse Jiryu Davis 與 Python 之父 Guido van Rossum。項目代碼使用 MIT 協議,項目文檔使用 http://creativecommons.org/licenses/by/3.0/legalcode 協議。html

課程內容在原文檔基礎上作了稍許修改,增長了部分原理介紹,步驟的拆解分析及源代碼註釋。python

2. 內容簡介

傳統計算機科學每每將大量精力放在如何追求更有效率的算法上。但現在大部分涉及網絡的程序,它們的時間開銷主要並非在計算上,而是在維持多個Socket鏈接上。亦或是它們的事件循環處理的不夠高效致使了更多的時間開銷。對於這些程序來講,它們面臨的挑戰是如何更高效地等待大量的網絡事件並進行調度。目前流行的解決方式就是使用異步I/O。nginx

本課程將探討幾種實現爬蟲的方法,從傳統的線程池到使用協程,每節課實現一個小爬蟲。另外學習協程的時候,咱們會從原理入手,以ayncio協程庫爲原型,實現一個簡單的異步編程模型。git

本課程實現的爬蟲爲爬一個整站的爬蟲,不會爬到站點外面去,且功能較簡單,主要目的在於學習原理,提供實現併發與異步的思路,並不適合直接改寫做爲平常工具使用。程序員

3. 課程知識點

本課程項目完成過程當中,咱們將學習:github

  1. 線程池實現併發爬蟲
  2. 回調方法實現異步爬蟲
  3. 協程技術的介紹
  4. 一個基於協程的異步編程模型
  5. 協程實現異步爬蟲

2、實驗環境

本課程使用Python 3.4,因此本課程內運行py腳本都是使用python3命令。算法

打開終端,進入 Code 目錄,建立 crawler 文件夾, 並將其做爲咱們的工做目錄。npm

$ cd Code $ mkdir crawler && cd crawler 

環保起見,測試爬蟲的網站在本地搭建。編程

咱們使用 Python 2.7 版本官方文檔做爲測試爬蟲用的網站緩存

wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip unzip python-doc.zip 

安裝serve,一個用起來很方便的靜態文件服務器:

sudo npm install -g serve 

啓動服務器:

serve python-doc 

若是訪問不了npm的資源,也能夠用如下方式開啓服務器:

ruby -run -ehttpd python-doc -p 3000 

訪問localhost:3000查看網站:

此處輸入圖片的描述

3、實驗原理

什麼是爬蟲?

網絡爬蟲(又被稱爲網頁蜘蛛,網絡機器人,在FOAF社區中間,更常常的稱爲網頁追逐者),是一種按照必定的規則,自動地抓取萬維網信息的程序或者腳本。

爬蟲的工做流程

網絡爬蟲基本的工做流程是從一個根URL開始,抓取頁面,解析頁面中全部的URL,將尚未抓取過的URL放入工做隊列中,以後繼續抓取工做隊列中的URL,重複抓取、解析,將解析到的url放入工做隊列的步驟,直到工做隊列爲空爲止。

線程池、回調、協程

咱們但願經過併發執行來加快爬蟲抓取頁面的速度。通常的實現方式有三種:

  1. 線程池方式:開一個線程池,每當爬蟲發現一個新連接,就將連接放入任務隊列中,線程池中的線程從任務隊列獲取一個連接,以後創建socket,完成抓取頁面、解析、將新鏈接放入工做隊列的步驟。
  2. 回調方式:程序會有一個主循環叫作事件循環,在事件循環中會不斷得到事件,經過在事件上註冊解除回調函數來達到多任務併發執行的效果。缺點是一旦須要的回調操做變多,代碼就會很是散,變得難以維護。
  3. 協程方式:一樣經過事件循環執行程序,利用了Python 的生成器特性,生成器函數可以中途中止並在以後恢復,那麼本來不得不分開寫的回調函數就可以寫在一個生成器函數中了,這也就實現了協程。

4、實驗一:線程池實現爬蟲

使用socket抓取頁面須要先創建鏈接,以後發送GET類型的HTTP報文,等待讀入,將讀到的全部內容存入響應緩存。

def fetch(url): sock = socket.socket() sock.connect(('localhost.com', 3000)) request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = parse_links(response) q.add(links) 

默認的socket鏈接與讀寫是阻塞式的,在等待讀入的這段時間的CPU佔用是被徹底浪費的。

多線程

默認這部分同窗們都是學過的,因此就粗略記幾個重點,沒學過的同窗能夠直接參考廖雪峯的教程:廖雪峯的官方網站-Python多線程

導入線程庫:

import threading 

開啓一個線程的方法:

t = 你新建的線程
t.start()   #開始運行線程 t.join() #你的當前函數就阻塞在這一步直到線程運行完 

創建線程的兩種方式:

#第一種:經過函數建立線程 def 函數a(): pass t = threading.Thread(target=函數a,name=本身隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步後主程序中斷退出後子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher() 

線程同時操做一個全局變量時會產生線程競爭因此須要鎖:

lock = threading.Lock() lock.acquire() #得到鎖 #..操做全局變量.. lock.release() #釋放鎖 

多線程同步-隊列

默認這部分同窗們都是學過的,因此就粗略記幾個重點,沒學過的同窗能夠直接參考PyMOTW3-queue — Thread-safe FIFO Implementation中文翻譯版

多線程同步就是多個線程競爭一個全局變量時按順序讀寫,通常狀況下要用鎖,可是使用標準庫裏的Queue的時候它內部已經實現了鎖,不用程序員本身寫了。

導入隊列類:

from queue import Queue 

建立一個隊列:

q = Queue(maxsize=0) 

maxsize爲隊列大小,爲0默認隊列大小可無窮大。

隊列是先進先出的數據結構:

q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列獲得一個item,隊列爲空則阻塞 

還有相應的不等待的版本,這裏略過。

隊列不爲空,或者爲空可是取得item的線程沒有告知任務完成時都是處於阻塞狀態

q.join() #阻塞直到全部任務完成 

線程告知任務完成使用task_done

q.task_done() #在線程內調用 

實現線程池

建立thread.py文件做爲爬蟲程序的文件。

咱們使用seen_urls來記錄已經解析到的url地址:

seen_urls = set(['/']) 

建立Fetcher類:

class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) #tasks爲任務隊列 self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) #解析頁面上的全部連接 links = self.parse_links(url, response) lock.acquire() #獲得新連接加入任務隊列與seen_urls中 for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() #通知任務隊列這個線程的任務完成了 self.tasks.task_done() 

使用正則庫與url解析庫來解析抓取的頁面,這裏圖方便用了正則,同窗也能夠用Beautifulsoup等專門用來解析頁面的Python庫:

import urllib.parse import re 

Fetcher中實現parse_links解析頁面:

def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() #經過href屬性找到全部連接 urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: #可能找到的url是相對路徑,這時候就須要join一下,絕對路徑的話就仍是會返回url normalized = urllib.parse.urljoin(fetched_url, url) #url的信息會被分段存在parts裏 parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue #有的頁面會經過地址裏的#frag後綴在頁面內跳轉,這裏去掉frag的部分 defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links #獲得報文的html正文 def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') 

實現線程池類與main的部分:

class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() #開4個線程 pool = ThreadPool(4) #從根地址開始抓取頁面 pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

運行效果

這裏先貼出完整代碼:

from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

運行python3 thread.py命令查看效果(記得先開網站服務器):

此處輸入圖片的描述

使用標準庫中的線程池

線程池直接使用multiprocessing.pool中的ThreadPool

代碼更改以下:

from multiprocessing.pool import ThreadPool #...省略中間部分... #...去掉Fetcher初始化中的self.start() #...刪除本身實現的ThreadPool... if __name__ == '__main__': start = time.time() pool = ThreadPool() tasks = Queue() tasks.put("/") Workers = [Fetcher(tasks) for i in range(4)] pool.map_async(lambda w:w.run(), Workers) tasks.join() pool.close() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

使用ThreadPool時,它處理的對象能夠不是線程對象,實際上Fetcher的線程部分ThreadPool根本用不到。由於它本身內部已開了幾個線程在等待任務輸入。這裏偷個懶就只把self.start()去掉了。能夠把Fetcher的線程部分全去掉,效果是同樣的。

ThreadPool活用了map函數,這裏它將每個Fetcher對象分配給線程池中的一個線程,線程調用了Fetcherrun函數。這裏使用map_async是由於不但願它在那一步阻塞,咱們但願在任務隊列join的地方阻塞,那麼到隊列爲空且任務所有處理完時程序就會繼續執行了。

運行python3 thread.py命令查看效果:

此處輸入圖片的描述

線程池實現的缺陷

咱們但願爬蟲的性能可以進一步提高,可是咱們沒辦法開太多的線程,由於線程的內存開銷很大,每建立一個線程可能須要佔用50k的內存。以及還有一點,網絡程序的時間開銷每每花在I/O上,socket I/O 阻塞時的那段時間是徹底被浪費了的。那麼要如何解決這個問題呢?

下節課你就知道啦,下節課見~

相關文章
相關標籤/搜索