此次分享的文章是我《Python爬蟲開發與項目實戰》基礎篇 第七章的內容,關於如何手工打造簡單分佈式爬蟲 (若是你們對這本書感興趣的話,能夠看一下 試讀樣章),下面是文章的具體內容。html
本章講的依舊是實戰項目,實戰內容是打造分佈式爬蟲,這對初學者來講,是一個不小的挑戰,也是一次有意義的嘗試。此次打造的分佈式爬蟲採用比較簡單的主從模式,徹底手工打造,不使用成熟框架,基本上涵蓋了前六章的主要知識點,其中涉及分佈式的知識點是分佈式進程和進程間通訊的內容,算是對Python爬蟲基礎篇的總結。node
如今大型的爬蟲系統都是採起分佈式爬取結構,經過這次實戰項目,讓你們對分佈式爬蟲有一個比較清晰地瞭解,爲以後系統的講解分佈式爬蟲打下基礎,其實它並無多麼困難。實戰目標:爬取2000個百度百科網絡爬蟲詞條以及相關詞條的標題、摘要和連接等信息,採用分佈式結構改寫第六章的基礎爬蟲,使功能更增強大。爬取頁面請看圖6.1。python
本次分佈式爬蟲採用主從模式。主從模式是指由一臺主機做爲控制節點負責全部運行網絡爬蟲的主機進行管理,爬蟲只須要從控制節點那裏接收任務,並把新生成任務提交給控制節點就能夠了,在這個過程當中沒必要與其餘爬蟲通訊,這種方式實現簡單利於管理。而控制節點則須要與全部爬蟲進行通訊,所以能夠看到主從模式是有缺陷的,控制節點會成爲整個系統的瓶頸,容易致使整個分佈式網絡爬蟲系統性能降低。linux
這次使用三臺主機進行分佈式爬取,一臺主機做爲控制節點,另外兩臺主機做爲爬蟲節點。爬蟲結構如圖7.1所示:算法
圖7.1 主從爬蟲結構緩存
控制節點主要分爲URL管理器、數據存儲器和控制調度器。控制調度器經過三個進程來協調URL管理器和數據存儲器的工做,一個是URL管理進程,負責URL的管理和將URL傳遞給爬蟲節點,一個是數據提取進程,負責讀取爬蟲節點返回的數據,將返回數據中的URL交給URL管理進程,將標題和摘要等數據交給數據存儲進程,最後一個是數據存儲進程,負責將數據提取進程中提交的數據進行本地存儲。執行流程如圖7.2所示:服務器
圖7.2 控制節點執行流程網絡
URL管理器查考第六章的代碼,作了一些優化修改。因爲咱們採用set內存去重的方式,若是直接存儲大量的URL連接,尤爲是URL連接很長的時候,很容易形成內存溢出,因此咱們採用將爬取過的URL進行MD5處理,因爲字符串通過MD5處理後的信息摘要長度能夠128bit,將生成的MD5摘要存儲到set後,能夠減小好幾倍的內存消耗,Python中的MD5算法生成的是32位的字符串,因爲咱們爬取的url較少,md5衝突不大,徹底能夠取中間的16位字符串,即16位MD5加密。同時添加了save_progress和load_progress方法進行序列化的操做,將未爬取URL集合和已爬取的URL集合序列化到本地,保存當前的進度,以便下次恢復狀態。URL管理器URLManager.py代碼以下:app
#coding:utf-8 import cPickle import hashlib class UrlManager(object): def __init__(self): self.new_urls = self.load_progress('new_urls.txt')#未爬取URL集合 self.old_urls = self.load_progress('old_urls.txt')#已爬取URL集合 def has_new_url(self): ''' 判斷是否有未爬取的URL :return: ''' return self.new_url_size()!=0 def get_new_url(self): ''' 獲取一個未爬取的URL :return: ''' new_url = self.new_urls.pop() m = hashlib.md5() m.update(new_url) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self,url): ''' 將新的URL添加到未爬取的URL集合中 :param url:單個URL :return: ''' if url is None: return m = hashlib.md5() m.update(url) url_md5 = m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self,urls): ''' 將新的URLS添加到未爬取的URL集合中 :param urls:url集合 :return: ''' if urls is None or len(urls)==0: return for url in urls: self.add_new_url(url) def new_url_size(self): ''' 獲取未爬取URL集合的s大小 :return: ''' return len(self.new_urls) def old_url_size(self): ''' 獲取已經爬取URL集合的大小 :return: ''' return len(self.old_urls) def save_progress(self,path,data): ''' 保存進度 :param path:文件路徑 :param data:數據 :return: ''' with open(path, 'wb') as f: cPickle.dump(data, f) def load_progress(self,path): ''' 從本地文件加載進度 :param path:文件路徑 :return:返回set集合 ''' print '[+] 從文件加載進度: %s' % path try: with open(path, 'rb') as f: tmp = cPickle.load(f) return tmp except: print '[!] 無進度文件, 建立: %s' % path return set()
數據存儲器的內容基本上和第六章的同樣,不過生成文件按照當前時間進行命名避免重複,同時對文件進行緩存寫入。代碼以下:框架
#coding:utf-8 import codecs import time class DataOutput(object): def __init__(self): self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) ) self.output_head(self.filepath) self.datas=[] def store_data(self,data): if data is None: return self.datas.append(data) if len(self.datas)>10: self.output_html(self.filepath) def output_head(self,path): ''' 將HTML頭寫進去 :return: ''' fout=codecs.open(path,'w',encoding='utf-8') fout.write("<html>") fout.write("<body>") fout.write("<table>") fout.close() def output_html(self,path): ''' 將數據寫入HTML文件中 :param path: 文件路徑 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') for data in self.datas: fout.write("<tr>") fout.write("<td>%s</td>"%data['url']) fout.write("<td>%s</td>"%data['title']) fout.write("<td>%s</td>"%data['summary']) fout.write("</tr>") self.datas.remove(data) fout.close() def ouput_end(self,path): ''' 輸出HTML結束 :param path: 文件存儲路徑 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') fout.write("</table>") fout.write("</body>") fout.write("</html>") fout.close()
控制調度器主要是產生並啓動URL管理進程、數據提取進程和數據存儲進程,同時維護4個隊列保持進程間的通訊,分別爲url_queue,result_queue, conn_q,store_q。4個隊列說明以下:
由於要和工做節點進行通訊,因此分佈式進程必不可少。參考1.4.4小節分佈式進程中服務進程中的代碼(linux版),建立一個分佈式管理器,定義爲start_manager方法。方法代碼以下:
def start_Manager(self,url_q,result_q): ''' 建立一個分佈式管理器 :param url_q: url隊列 :param result_q: 結果隊列 :return: ''' #把建立的兩個隊列註冊在網絡上,利用register方法,callable參數關聯了Queue對象, # 將Queue對象在網絡中暴露 BaseManager.register('get_task_queue',callable=lambda:url_q) BaseManager.register('get_result_queue',callable=lambda:result_q) #綁定端口8001,設置驗證口令‘baike’。這個至關於對象的初始化 manager=BaseManager(address=('',8001),authkey='baike') #返回manager對象 return manager
URL管理進程將從conn_q隊列獲取到的新URL提交給URL管理器,通過去重以後,取出URL放入url_queue隊列中傳遞給爬蟲節點,代碼以下:
def url_manager_proc(self,url_q,conn_q,root_url): url_manager = UrlManager() url_manager.add_new_url(root_url) while True: while(url_manager.has_new_url()): #從URL管理器獲取新的url new_url = url_manager.get_new_url() #將新的URL發給工做節點 url_q.put(new_url) print 'old_url=',url_manager.old_url_size() #加一個判斷條件,當爬去2000個連接後就關閉,並保存進度 if(url_manager.old_url_size()>2000): #通知爬行節點工做結束 url_q.put('end') print '控制節點發起結束通知!' #關閉管理節點,同時存儲set狀態 url_manager.save_progress('new_urls.txt',url_manager.new_urls) url_manager.save_progress('old_urls.txt',url_manager.old_urls) return #將從result_solve_proc獲取到的urls添加到URL管理器之間 try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException,e: time.sleep(0.1)#延時休息
數據提取進程從result_queue隊列讀取返回的數據,並將數據中的URL添加到conn_q隊列交給URL管理進程,將數據中的文章標題和摘要添加到store_q隊列交給數據存儲進程。代碼以下:
def result_solve_proc(self,result_q,conn_q,store_q): while(True): try: if not result_q.empty(): content = result_q.get(True) if content['new_urls']=='end': #結果分析進程接受通知而後結束 print '結果分析進程接受通知而後結束!' store_q.put('end') return conn_q.put(content['new_urls'])#url爲set類型 store_q.put(content['data'])#解析出來的數據爲dict類型 else: time.sleep(0.1)#延時休息 except BaseException,e: time.sleep(0.1)#延時休息
數據存儲進程從store_q隊列中讀取數據,並調用數據存儲器進行數據存儲。代碼以下:
def store_proc(self,store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data=='end': print '存儲進程接受通知而後結束!' output.ouput_end(output.filepath) return output.store_data(data) else: time.sleep(0.1)
最後將分佈式管理器、URL管理進程、 數據提取進程和數據存儲進程進行啓動,並初始化4個隊列。代碼以下:
if __name__=='__main__': #初始化4個隊列 url_q = Queue() result_q = Queue() store_q = Queue() conn_q = Queue() #建立分佈式管理器 node = NodeManager() manager = node.start_Manager(url_q,result_q) #建立URL管理進程、 數據提取進程和數據存儲進程 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm',)) result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,)) store_proc = Process(target=node.store_proc, args=(store_q,)) #啓動3個進程和分佈式管理器 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever()
爬蟲節點相對簡單,主要包含HTML下載器、HTML解析器和爬蟲調度器。執行流程以下:
HTML下載器的代碼和第六章的一致,只要注意網頁編碼便可。代碼以下:
#coding:utf-8 import requests class HtmlDownloader(object): def download(self,url): if url is None: return None user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)' headers={'User-Agent':user_agent} r = requests.get(url,headers=headers) if r.status_code==200: r.encoding='utf-8' return r.text return None
HTML解析器的代碼和第六章的一致,詳細的網頁分析過程能夠回顧第六章。代碼以下:
#coding:utf-8 import re import urlparse from bs4 import BeautifulSoup class HtmlParser(object): def parser(self,page_url,html_cont): ''' 用於解析網頁內容抽取URL和數據 :param page_url: 下載頁面的URL :param html_cont: 下載的網頁內容 :return:返回URL和數據 ''' if page_url is None or html_cont is None: return soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8') new_urls = self._get_new_urls(page_url,soup) new_data = self._get_new_data(page_url,soup) return new_urls,new_data def _get_new_urls(self,page_url,soup): ''' 抽取新的URL集合 :param page_url: 下載頁面的URL :param soup:soup :return: 返回新的URL集合 ''' new_urls = set() #抽取符合要求的a標籤 links = soup.find_all('a',href=re.compile(r'/view/\d+\.htm')) for link in links: #提取href屬性 new_url = link['href'] #拼接成完整網址 new_full_url = urlparse.urljoin(page_url,new_url) new_urls.add(new_full_url) return new_urls def _get_new_data(self,page_url,soup): ''' 抽取有效數據 :param page_url:下載頁面的URL :param soup: :return:返回有效數據 ''' data={} data['url']=page_url title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1') data['title']=title.get_text() summary = soup.find('div',class_='lemma-summary') #獲取tag中包含的全部文版內容包括子孫tag中的內容,並將結果做爲Unicode字符串返回 data['summary']=summary.get_text() return data
爬蟲調度器須要用到分佈式進程中工做進程的代碼,具體內容能夠參考第一章的分佈式進程章節。爬蟲調度器須要先鏈接上控制節點,而後依次完成從url_q隊列中獲取URL,下載並解析網頁,將獲取的數據交給result_q隊列,返回給控制節點等各項任務,代碼以下:
class SpiderWork(object): def __init__(self): #初始化分佈式進程中的工做節點的鏈接工做 # 實現第一步:使用BaseManager註冊獲取Queue的方法名稱 BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') # 實現第二步:鏈接到服務器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證口令注意保持與服務進程設置的徹底一致: self.m = BaseManager(address=(server_addr, 8001), authkey='baike') # 從網絡鏈接: self.m.connect() # 實現第三步:獲取Queue的對象: self.task = self.m.get_task_queue() self.result = self.m.get_result_queue() #初始化網頁下載器和解析器 self.downloader = HtmlDownloader() self.parser = HtmlParser() print 'init finish' def crawl(self): while(True): try: if not self.task.empty(): url = self.task.get() if url =='end': print '控制節點通知爬蟲節點中止工做...' #接着通知其它節點中止工做 self.result.put({'new_urls':'end','data':'end'}) return print '爬蟲節點正在解析:%s'%url.encode('utf-8') content = self.downloader.download(url) new_urls,data = self.parser.parser(url,content) self.result.put({"new_urls":new_urls,"data":data}) except EOFError,e: print "鏈接工做節點失敗" return except Exception,e: print e print 'Crawl fali ' if __name__=="__main__": spider = SpiderWork() spider.crawl()
在爬蟲調度器設置了一個本地IP:127.0.0.1,你們能夠將在一臺機器上測試代碼的正確性。固然也可使用三臺VPS服務器,兩臺運行爬蟲節點程序,將IP改成控制節點主機的公網IP,一臺運行控制節點程序,進行分佈式爬取,這樣更貼近真實的爬取環境。下面圖7.3爲最終爬取的數據,圖7.4爲new_urls.txt內容,圖7.5爲old_urls.txt內容,你們能夠進行對比測試,這個簡單的分佈式爬蟲還有很大發揮的空間,但願你們發揮本身的聰明才智進一步完善。
圖7.3 最終爬取的數據
圖7.4 new_urls.txt
圖7.5 old_urls.txt
本章講解了一個簡單的分佈式爬蟲結構,主要目的是幫助你們將Python爬蟲基礎篇的知識進行總結和強化,開拓你們的思惟,同時也讓你們知道分佈式爬蟲並非這麼遙不可及。不過當你親手打造一個分佈式爬蟲後,就會知道分佈式爬蟲的難點在於節點的調度,什麼樣的結構能讓各個節點穩定高效的運做纔是分佈式爬蟲要考慮的核心內容。到本章爲止,Python爬蟲基礎篇已經結束,這個時候你們基本上能夠編寫簡單的爬蟲,爬取一些靜態網站的內容,可是Python爬蟲開發不只如此,你們接着往下學習吧。
打個廣告,若是你們對這本書感興趣的話,能夠看一下 試讀樣章 或者直接購買。
歡迎你們支持我公衆號:
