分佈式進程是指將Process進程分佈到多臺機器上,充分利用多臺機器的性能完成複雜的任務。在Python的經過multiprocessing庫來完成,該模塊不只支持多進程且還支持將多進程分佈到多臺機器上。分佈式進程就是將把Queue暴露到網絡中讓其餘機器進程能夠訪問的過程進行了封裝,這個過程也稱爲本地隊列的網絡化。
html
分佈式爬蟲服務主要爲 6個步驟:node
1. 創建隊列Queue, 用來進行進程間的通訊;
python
2. 把第一步中創建的隊列在網絡上註冊, 暴露給其餘主機的進程, 註冊後得到網絡隊列, 至關於本地隊列的映像;linux
3. 創建一個對象實例manager, 綁定端口和驗證口令;緩存
4. 啓動第三步中創建的實例, 就是管理manager, 監管信息通道;服務器
5. 經過管理實例的方法得到經過網絡訪問Queue對象, 把網絡隊列實體化成可使用的隊列;網絡
6. 建立任務到"本地" 隊列中, 自動上傳任務到網絡隊列中, 分配給任務進程進行處理.app
#coding=utf-8 import Queue from multiprocessing.managers import BaseManager # 第一步,創建task_queue和result_queue來存聽任務和結果 task_queue = Queue.Queue() result_queue = Queue.Queue() # 第二步,把建立的兩個隊列註冊在網絡上,利用register方法,callable參數關聯了Queue對象,將Queue對象在網絡中暴露 BaseManager.register('get_task_queue', callable=lambda:task_queue) BaseManager.register('get_result_queue', callable=lambda:result_queue) # 第三步,綁定端口8001,設置驗證口令,至關於對象初始化 manager = BaseManager(address=('', 8001), authkey='ski12') # 第四步,啓動管理,監聽信息通道 manager.start() # 第五步,經過管理實例的方法得到經過網絡訪問的Queue對象 task = manager.get_task_queue() result = manager.get_result_queue() # 第六步,添加任務 for url in ["ImageUrl_" + str(i) for i in range(10)]: print "put task %s ..." % url task.put(url) # 獲取返回結果 print "try get result..." for i in range(10): print "result is %s" % result.get(timeout=10) # 關閉管理 manager.shutdown()
#coding=utf-8 import Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任務個數 task_number = 10 # 第一步,創建task_queue和result_queue來存聽任務和結果 task_queue = Queue.Queue(task_number) result_queue = Queue.Queue(task_number) def get_task(): return task_queue def get_result(): return result_queue def win_run(): # 第二步,把建立的兩個隊列註冊在網絡上,利用register方法,callable參數關聯了Queue對象,將Queue對象在網絡中暴露 # Windows下綁定調用接口不能使用lambda,因此只能先定義函數再綁定 BaseManager.register('get_task_queue', callable=get_task) BaseManager.register('get_result_queue', callable=get_result) # 第三步,綁定端口8001,設置驗證口令,至關於對象初始化,Windows下須要填寫IP地址 manager = BaseManager(address=('127.0.0.1', 8001), authkey='ski12') # 第四步,啓動管理,監聽信息通道 manager.start() try: # 第五步,經過管理實例的方法得到經過網絡訪問的Queue對象 task = manager.get_task_queue() result = manager.get_result_queue() # 第六步,添加任務 for url in ["ImageUrl_" + str(i) for i in range(10)]: print "put task %s ..." % url task.put(url) # 獲取返回結果 print "try get result..." for i in range(10): print "result is %s" % result.get(timeout=10) except: print "Manager error" finally: # 必定要關閉管理,不然會報管道未關閉錯誤 manager.shutdown() if __name__ == '__main__': # Windows下多進程可能會有問題,添加這句能夠緩解 freeze_support() win_run()
一、使用QueueManager註冊用於獲取Queue的方法名稱,任務進程只能經過名稱來在網絡上獲取Queue;框架
二、鏈接服務器,端口和驗證口令注意保持與服務進程中徹底一致;分佈式
三、從網絡上獲取Queue,進行本地化;
四、從task隊列獲取任務,並把結果寫入result隊列。
#coding=utf-8 import time from multiprocessing.managers import BaseManager # 第一步,使用QueueManager註冊用於獲取Queue的方法名稱 BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') # 第二步,鏈接到服務器 server_addr = '127.0.0.1' print "Connect to server %s..." % server_addr # 端口和驗證口令注意保持與服務進程徹底一致 m = BaseManager(address=(server_addr, 8001), authkey='ski12') # 從網絡鏈接 m.connect() # 第三步,獲取Queue對象 task = m.get_task_queue() result = m.get_result_queue() # 第四步,從task隊列獲取任務,並把結果寫入result隊列 while(not task.empty()): image_url = task.get(True, timeout=5) print "run task download %s..." % image_url time.sleep(1) result.put("%s--->success" % image_url) # 處理結束 print "worker exit."
採用主從模式,由一臺主機做爲控制節點來負責管理全部運行爬蟲的主機,爬蟲只需從控制節點中接收任務並把新生成的任務提交給控制節點便可。缺點是容易致使整個分佈式爬蟲系統的性能降低。
控制節點分爲URL管理器、數據存儲器和控制調度器。
爬蟲節點分爲HTML下載器、HTML解析器和爬蟲調度器。
其實和以前的基礎爬蟲框架相似,只不過添加了控制調度器來實現分佈式的控制管理而已。
NodeManager.py:
#coding=utf-8 import time import sys from multiprocessing import Queue, Process from multiprocessing.managers import BaseManager from DataOutput import DataOutput from URLManager import UrlManager class NodeManager(object): # 建立一個分佈式管理器 def start_Manager(self, url_q, result_q): # 把建立的兩個隊列註冊在網絡上,利用register方法,callable參數關聯了Queue對象,將Queue對象在網絡中暴露 BaseManager.register('get_task_queue', callable=lambda:url_q) BaseManager.register('get_result_queue', callable=lambda:result_q) # 綁定端口8001,設置驗證口令,至關於對象初始化 manager = BaseManager(address=('', 8001), authkey='ski12') # 返回manager對象 return manager def url_manager_proc(self, url_q, conn_q, root_url): url_manager = UrlManager() url_manager.add_new_url(root_url) while True: while(url_manager.has_new_url()): # 從URL管理器獲取新的url new_url = url_manager.get_new_url() # 將新的URL發給工做節點 url_q.put(new_url) # print "[*]The number of crawled url is: ", url_manager.old_url_size() # 顯示進度條 percentage = u"[*]已爬取的URL數量爲:%s" % url_manager.old_url_size() sys.stdout.write('\r' + percentage) # 加一個判斷條件,當爬去2000個連接後就關閉,並保存進度 if(url_manager.old_url_size()>20): # 通知爬行節點工做結束,添加標識符end url_q.put('end') print u"\n[*]控制節點通知爬行結點結束工做..." # 關閉管理節點,同時存儲set狀態 url_manager.save_progress('new_urls.txt', url_manager.new_urls) url_manager.save_progress('old_urls.txt', url_manager.old_urls) return # 將從result_solve_proc獲取到的urls添加到URL管理器 try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException, e: # 延時休息 time.sleep(0.1) def result_solve_proc(self, result_q, conn_q, store_q): while True: try: if not result_q.empty(): content = result_q.get(True) if content['new_urls'] == 'end': # 結果分析進程接受通知而後結束 print u"[*]關閉數據提取進程" store_q.put('end') return # url爲set類型 conn_q.put(content['new_urls']) # 解析出來的數據爲dict類型 store_q.put(content['data']) else: # 延時休息 time.sleep(0.1) except BaseException, e: # 延時休息 time.sleep(0.1) def store_proc(self, store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data == 'end': print u"[*]關閉數據存儲進程" output.output_end(output.filepath) return output.store_data(data) else: time.sleep(0.1) if __name__ == '__main__': if len(sys.argv) == 2: url = 'https://baike.baidu.com/item/' + sys.argv[1] # 初始化4個隊列 url_q = Queue() result_q = Queue() conn_q = Queue() store_q = Queue() # 建立分佈式管理器 node = NodeManager() manager = node.start_Manager(url_q, result_q) # 建立URL管理進程、 數據提取進程和數據存儲進程 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, url)) result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q)) store_proc = Process(target=node.store_proc, args=(store_q, )) # 啓動3個進程和分佈式管理器 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever() else: print "[*]Usage: python NodeManager.py [Crawl Keyword]"
UrlManager.py:
#coding=utf-8 import cPickle import hashlib class UrlManager(object): def __init__(self): # 未爬取的URL集合 self.new_urls = self.load_progress('new_urls.txt') # 已爬取的URL集合 self.old_urls = self.load_progress('old_urls.txt') def has_new_url(self): return self.new_url_size() != 0 def get_new_url(self): new_url = self.new_urls.pop() # 對爬取過的URL進行MD5處理,對獲取的信息摘要取中間的128位保存到set(),以減小內存消耗 m = hashlib.md5() m.update(new_url) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self, url): if url is None: return m = hashlib.md5() m.update(url) url_md5 = m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self, urls): if urls is None or len(urls) == 0: return for url in urls: self.add_new_url(url) def new_url_size(self): return len(self.new_urls) def old_url_size(self): return len(self.old_urls) # 將未爬取的URL集合和已爬取的URL集合序列化到本地,保存當前進度,以便下次恢復狀態 def save_progress(self, path, data): with open(path, 'wb') as f: cPickle.dump(data, f) # 加載進度 def load_progress(self, path): print "[+]從文件加載進度:%s" % path try: with open(path, 'rb') as f: tmp = cPickle.load(f) return tmp except: print "[!]無進度文件,建立:%s" % path return set()
DataOutput.py:
#coding=utf-8 import codecs import time class DataOutput(object): def __init__(self): # 生成的文件按照當前時間來命名以區分文件 self.filepath = 'baike_%s.html' % (time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())) self.output_head(self.filepath) self.datas = [] def store_data(self, data): if data is None: return self.datas.append(data) # 對文件進行緩存寫入,當大於10條數據時就保存 if len(self.datas) > 10: self.output_html(self.filepath) def output_head(self, path): fout = codecs.open(path, 'w', encoding='utf-8') fout.write("<html>") fout.write("<body>") fout.write("<table>") fout.close() def output_html(self, path): fout = codecs.open(path, 'a', encoding='utf-8') for data in self.datas: fout.write("<tr>") fout.write("<td>%s</td>" % data['url']) fout.write("<td>%s</td>" % data['title']) fout.write("<td>%s</td>" % data['summary']) fout.write("</tr>") self.datas.remove(data) fout.close() def output_end(self, path): fout = codecs.open(path, 'a', encoding='utf-8') fout.write("</table>") fout.write("</body>") fout.write("</html>") fout.close()
SpiderWorker.py:
#coding=utf-8 from multiprocessing.managers import BaseManager from HtmlDownloader import HtmlDownloader from HtmlParser import HtmlParser class SpiderWorker(object): def __init__(self): # 初始化分佈式進程中的工做節點的鏈接工做 # 實現第一步:使用BaseManager註冊獲取Queue的方法名稱 BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') # 實現第二步:鏈接到服務器: server_addr = '127.0.0.1' print "[*]Connect to server %s..." % server_addr # 端口和驗證口令注意保持與服務進程設置的徹底一致: self.m = BaseManager(address=(server_addr, 8001), authkey='ski12') # 從網絡鏈接: self.m.connect() # 實現第三步:獲取Queue的對象 self.task = self.m.get_task_queue() self.result = self.m.get_result_queue() # 初始化網頁下載器和解析器 self.downloader = HtmlDownloader() self.parser = HtmlParser() print "[*]Init finished." def crawl(self): while True: try: if not self.task.empty(): url = self.task.get() if url == 'end': print "[*]Control Node informs all the Spider Nodes stop working." # 接着通知其它節點中止工做 self.result.put({'new_urls':'end', 'data':'end'}) return print "[*]The Spider Node is parsing: %s" % url.encode('utf-8') content = self.downloader.download(url) new_urls, data = self.parser.parser(url, content) self.result.put({'new_urls':new_urls, 'data':data}) except EOFError, e: print "[-]Fail to connect to the Worker Node." return except Exception, e: print e print "[-]Crawl failed." if __name__ == '__main__': spider = SpiderWorker() spider.crawl()
HtmlDownloader.py:
#coding=utf-8 import requests class HtmlDownloader(object): def download(self, url): if url is None: return None user_agent = "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)" headers = {'User-Agent':user_agent} r = requests.get(url, headers=headers) if r.status_code == 200: r.encoding = 'utf-8' return r.text return None
HtmlParser.py:
#coding=utf-8 import re import urlparse from bs4 import BeautifulSoup as BS class HtmlParser(object): def parser(self, page_url, html_cont): if page_url is None or html_cont is None: return soup = BS(html_cont, 'html.parser', from_encoding='utf-8') new_urls = self._get_new_urls(page_url, soup) new_data = self._get_new_data(page_url, soup) return new_urls, new_data def _get_new_urls(self, page_url, soup): new_urls = set() # 抽取符合要求的a標籤 links = soup.find_all('a', href=re.compile(r'/item/.*')) for link in links: # 提取href屬性 new_url = link['href'] # 拼接成完整網址 new_full_url = urlparse.urljoin(page_url, new_url) new_urls.add(new_full_url) return new_urls def _get_new_data(self, page_url, soup): data = {} data['url'] = page_url title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1') data['title'] = title.get_text() summary = soup.find('div', class_='lemma-summary') # 獲取到tag中包含的全部文版內容包括子孫tag中的內容,並將結果做爲Unicode字符串返回 data['summary'] = summary.get_text() return data