本課程核心部分來自《500 lines or less》項目,做者是來自 MongoDB 的工程師 A. Jesse Jiryu Davis 與 Python 之父 Guido van Rossum。項目代碼使用 MIT 協議,項目文檔使用 http://creativecommons.org/licenses/by/3.0/legalcode 協議。html
課程內容在原文檔基礎上作了稍許修改,增長了部分原理介紹,步驟的拆解分析及源代碼註釋。python
傳統計算機科學每每將大量精力放在如何追求更有效率的算法上。但現在大部分涉及網絡的程序,它們的時間開銷主要並非在計算上,而是在維持多個Socket鏈接上。亦或是它們的事件循環處理的不夠高效致使了更多的時間開銷。對於這些程序來講,它們面臨的挑戰是如何更高效地等待大量的網絡事件並進行調度。目前流行的解決方式就是使用異步I/O。nginx
本課程將探討幾種實現爬蟲的方法,從傳統的線程池到使用協程,每節課實現一個小爬蟲。另外學習協程的時候,咱們會從原理入手,以ayncio協程庫爲原型,實現一個簡單的異步編程模型。git
本課程實現的爬蟲爲爬一個整站的爬蟲,不會爬到站點外面去,且功能較簡單,主要目的在於學習原理,提供實現併發與異步的思路,並不適合直接改寫做爲平常工具使用。程序員
本課程項目完成過程當中,咱們將學習:github
本課程使用Python 3.4
,因此本課程內運行py
腳本都是使用python3
命令。算法
打開終端,進入 Code
目錄,建立 crawler
文件夾, 並將其做爲咱們的工做目錄。npm
$ cd Code $ mkdir crawler && cd crawler
環保起見,測試爬蟲的網站在本地搭建。編程
咱們使用 Python 2.7 版本官方文檔做爲測試爬蟲用的網站緩存
wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip unzip python-doc.zip
安裝serve
,一個用起來很方便的靜態文件服務器:
sudo npm install -g serve
啓動服務器:
serve python-doc
若是訪問不了npm
的資源,也能夠用如下方式開啓服務器:
ruby -run -ehttpd python-doc -p 3000
訪問localhost:3000
查看網站:
網絡爬蟲(又被稱爲網頁蜘蛛,網絡機器人,在FOAF社區中間,更常常的稱爲網頁追逐者),是一種按照必定的規則,自動地抓取萬維網信息的程序或者腳本。
網絡爬蟲基本的工做流程是從一個根URL開始,抓取頁面,解析頁面中全部的URL,將尚未抓取過的URL放入工做隊列中,以後繼續抓取工做隊列中的URL,重複抓取、解析,將解析到的url放入工做隊列的步驟,直到工做隊列爲空爲止。
咱們但願經過併發執行來加快爬蟲抓取頁面的速度。通常的實現方式有三種:
Python
的生成器特性,生成器函數可以中途中止並在以後恢復,那麼本來不得不分開寫的回調函數就可以寫在一個生成器函數中了,這也就實現了協程。使用socket
抓取頁面須要先創建鏈接,以後發送GET
類型的HTTP
報文,等待讀入,將讀到的全部內容存入響應緩存。
def fetch(url): sock = socket.socket() sock.connect(('localhost.com', 3000)) request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = parse_links(response) q.add(links)
默認的socket
鏈接與讀寫是阻塞式的,在等待讀入的這段時間的CPU佔用是被徹底浪費的。
默認這部分同窗們都是學過的,因此就粗略記幾個重點,沒學過的同窗能夠直接參考廖雪峯的教程:廖雪峯的官方網站-Python多線程
導入線程庫:
import threading
開啓一個線程的方法:
t = 你新建的線程
t.start() #開始運行線程 t.join() #你的當前函數就阻塞在這一步直到線程運行完
創建線程的兩種方式:
#第一種:經過函數建立線程 def 函數a(): pass t = threading.Thread(target=函數a,name=本身隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步後主程序中斷退出後子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher()
線程同時操做一個全局變量時會產生線程競爭因此須要鎖:
lock = threading.Lock() lock.acquire() #得到鎖 #..操做全局變量.. lock.release() #釋放鎖
默認這部分同窗們都是學過的,因此就粗略記幾個重點,沒學過的同窗能夠直接參考PyMOTW3-queue — Thread-safe FIFO Implementation:中文翻譯版
多線程同步就是多個線程競爭一個全局變量時按順序讀寫,通常狀況下要用鎖,可是使用標準庫裏的Queue
的時候它內部已經實現了鎖,不用程序員本身寫了。
導入隊列類:
from queue import Queue
建立一個隊列:
q = Queue(maxsize=0)
maxsize爲隊列大小,爲0默認隊列大小可無窮大。
隊列是先進先出的數據結構:
q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列獲得一個item,隊列爲空則阻塞
還有相應的不等待的版本,這裏略過。
隊列不爲空,或者爲空可是取得item的線程沒有告知任務完成時都是處於阻塞狀態
q.join() #阻塞直到全部任務完成
線程告知任務完成使用task_done
q.task_done() #在線程內調用
建立thread.py
文件做爲爬蟲程序的文件。
咱們使用seen_urls
來記錄已經解析到的url
地址:
seen_urls = set(['/'])
建立Fetcher
類:
class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) #tasks爲任務隊列 self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) #解析頁面上的全部連接 links = self.parse_links(url, response) lock.acquire() #獲得新連接加入任務隊列與seen_urls中 for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() #通知任務隊列這個線程的任務完成了 self.tasks.task_done()
使用正則庫與url解析庫來解析抓取的頁面,這裏圖方便用了正則,同窗也能夠用Beautifulsoup
等專門用來解析頁面的Python庫:
import urllib.parse import re
在Fetcher
中實現parse_links
解析頁面:
def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() #經過href屬性找到全部連接 urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: #可能找到的url是相對路徑,這時候就須要join一下,絕對路徑的話就仍是會返回url normalized = urllib.parse.urljoin(fetched_url, url) #url的信息會被分段存在parts裏 parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue #有的頁面會經過地址裏的#frag後綴在頁面內跳轉,這裏去掉frag的部分 defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links #獲得報文的html正文 def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html')
實現線程池類與main
的部分:
class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() #開4個線程 pool = ThreadPool(4) #從根地址開始抓取頁面 pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
這裏先貼出完整代碼:
from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
運行python3 thread.py
命令查看效果(記得先開網站服務器):
線程池直接使用multiprocessing.pool
中的ThreadPool
:
代碼更改以下:
from multiprocessing.pool import ThreadPool #...省略中間部分... #...去掉Fetcher初始化中的self.start() #...刪除本身實現的ThreadPool... if __name__ == '__main__': start = time.time() pool = ThreadPool() tasks = Queue() tasks.put("/") Workers = [Fetcher(tasks) for i in range(4)] pool.map_async(lambda w:w.run(), Workers) tasks.join() pool.close() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
使用ThreadPool
時,它處理的對象能夠不是線程對象,實際上Fetcher
的線程部分ThreadPool
根本用不到。由於它本身內部已開了幾個線程在等待任務輸入。這裏偷個懶就只把self.start()
去掉了。能夠把Fetcher
的線程部分全去掉,效果是同樣的。
ThreadPool
活用了map
函數,這裏它將每個Fetcher
對象分配給線程池中的一個線程,線程調用了Fetcher
的run
函數。這裏使用map_async
是由於不但願它在那一步阻塞,咱們但願在任務隊列join
的地方阻塞,那麼到隊列爲空且任務所有處理完時程序就會繼續執行了。
運行python3 thread.py
命令查看效果:
咱們但願爬蟲的性能可以進一步提高,可是咱們沒辦法開太多的線程,由於線程的內存開銷很大,每建立一個線程可能須要佔用50k的內存。以及還有一點,網絡程序的時間開銷每每花在I/O上,socket I/O 阻塞時的那段時間是徹底被浪費了的。那麼要如何解決這個問題呢?
下節課你就知道啦,下節課見~