爬蟲應該可以快速高效的完成數據爬取和分析任務。使用多個進程協同完成一個任務,提升了數據爬取的效率。html
以百度百科的一條爲起點,抓取百度百科2000左右詞條數據。node
參閱模仿了:https://book.douban.com/subject/27061630/。python
做者說是簡單的分佈式爬蟲(hh),在書中有詳細的說明和註解。安全
這裏只是補漏和梳理。bash
由於進程傳遞參數的問題,搞了幾天仍是放棄了在WIndows上跑,換用了Linux。網絡
又由於各類各樣的問題,棄用CentOS(它確實是安全可靠的,可是...我不會裝QQ,輸入法等),換用了軟件容易安裝的Ubuntu。而後才裝了Eclipse等各類軟件後,纔開始多進程的調試。app
主節點和從節點的方案實現信息爬取。結構應該讓各個節點高效工做。分佈式
爬蟲爬取速度受到網絡延時的影響和網頁信息解析的影響比較嚴重,因此使用多個從節點用來專門負責下載網頁信息,解析網頁信息。ide
則分爲三個文件,爬取文件,下載網頁文件,解析網頁文件。函數
爬取文件接收來自主節點發送來的網頁地址。而後調用下載網頁文件並完成解析,將處理好的數據發送給主節點。
主節點負責發送給從節點網頁地址,並接收來自從節點的解析後的網頁信息,將網頁信息存儲下來。
主節點任務分爲分發網址,接收從節點的信息,存儲網頁三部分。在代碼裏,他創建了三個進程,來分別實現。
主節點任務中,存儲信息,定義一套存儲信息的方法。分發網址,定義一套分發網址過程當中可能用到的方法。主文件中,設立三個函數,創建三個進程。
主節點的三個任務,分紅三個進程,三個進程(分發網址,數據接收,數據存儲),作一個類。
數據接收與分發網址,須要分佈式進程。分佈式進程須要使用隊列Queue。這裏必定是multiprocessing中的導入的隊列。網址分發、數據接收分別使用一個隊列。
註冊,設定地址,祕鑰,完成初始化過程,將url_q,result_q分別註冊到網絡中。
而後設立分發任務,傳遞隊列給分發任務函數。分發任務使用url_q隊列完成數據的發送。使用conn_q接收了新的網址,並進行存儲,再次分發到url_q上。
數據接收任務,完成了數據的接收過程,接收之後須要及時將數據存儲,在這裏使用了兩個隊列conn_q,放置接收數據中的地址信息,store_q,放置接收數據中的網頁信息。
數據存儲任務,接收數據接收任務中的store_q隊列信息,及時寫入到磁盤中。
全部涉及到的文件以下:
NodeManager.py
import time #import sys #sys.path.append('/home')#if needed ,add path as package from UrlManager import UrlManager from multiprocessing import Process,Queue from multiprocessing.managers import BaseManager from DataOutput import DataOutput class NodeManager(): def start_manager(self,url_q,result_q): BaseManager.register('get_task_queue', callable=lambda:url_q) BaseManager.register('get_result_queue',callable=lambda:result_q) manager=BaseManager(address=('127.0.0.1',8001),authkey='baike'.encode('utf-8')) return manager def url_manager_proc(self,url_q,conn_q,root_url): #send url to queue and receive new urls for storing to object url_manager=UrlManager() url_manager.add_new_url(root_url) while True: while(url_manager.has_new_url()): new_url=url_manager.get_new_url() url_q.put(new_url) print('old url size:'+str(url_manager.old_url_size())) if(url_manager.old_url_size()>2000): url_q.put('end') url_manager.save_process('new_urls.txt',url_manager.new_urls) url_manager.save_process('old_urls.txt',url_manager.old_urls) print('finish url_manager_proc') return try: urls=conn_q.get() url_manager.add_new_urls(urls) print('get:'+urls) except Exception: time.sleep(0.1) def result_solve_proc(self,result_q,conn_q,store_q): while True: if not result_q.empty(): content=result_q.get(True) if content['new_urls']=='end': print('finish result_solve_proc') store_q.put('end') return conn_q.put(content["new_urls"]) store_q.put(content["data"]) else: time.sleep(0.1) def store_proc(self,store_q): output=DataOutput() while True: if not store_q.empty(): data=store_q.get() if data =='end': print('finish store_proc') output.output_end(output.path) return output.store_data(data) if __name__=='__main__': url_q=Queue()#send url to workers result_q=Queue()#receive url's analytical data from works store_q=Queue()#analytical data which is fresh is used for storing to disk for further extract conn_q=Queue()#urls which is fresh are used for storing to object for further extract nodeObject=NodeManager() manager=nodeObject.start_manager(url_q,result_q) root_url='https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin' url_manager=Process(target=nodeObject.url_manager_proc,args=(url_q,conn_q,root_url,)) result_solve=Process(target=nodeObject.result_solve_proc,args=(result_q,conn_q,store_q,)) store=Process(target=nodeObject.store_proc,args=(store_q,)) url_manager.start() result_solve.start() store.start() manager.get_server().serve_forever()
UrlManager.py
import hashlib import pickle class UrlManager(): def __init__(self): self.old_urls=self.load_process('new_urls.txt') self.new_urls=self.load_process('old_urls.txt') pass def has_new_url(self): return self.new_url_size()!=0 def new_url_size(self): return len(self.new_urls) def old_url_size(self): return len(self.old_urls) def get_new_url(self): new_url=self.new_urls.pop() m=hashlib.md5() m.update(new_url.encode("utf8")) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self,url): if url is None: return m=hashlib.md5() m.update(url.encode('utf-8')) url_md5=m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self,urls): if urls is None or len(urls) == 0: return for url in urls: self.add_new_url(url) pass def save_process(self,path,data): with open(path,'wb') as f: pickle.dump(data,f) def load_process(self,path): print('loading..') try: with open(path,'rb') as f: tmp=pickle.load(f) return tmp except: print('loading error maybe loading file not exist and will create it:'+path) newSet=set() self.save_process(path, newSet) return newSet
DataOutput.py
import codecs from os.path import os class DataOutput(object): def __init__(self): self.path='baike.html' self.output_head(self.path) self.datas=[] def store_data(self,data): if data is None: return self.datas.append(data) self.output_html(self.path,data) def output_head(self,path): if os.path.exists(path): return fout=codecs.open('baike.html', 'w', encoding='utf-8') fout.write("<html>") fout.write("<head><meta charset='urf-8'></head>") fout.write("<body>") fout.write("<table border='1' width=1800 style='word-break:break-all;word-wrap:break-word;'>") fout.write("<tr>") fout.write("<td width='20'>序號</td>") fout.write("<td width='300'>URL</td>") fout.write("<td width='100'>標題</td>") fout.write("<td width='1200'>釋義</td>") fout.write("</tr>") fout.close() def output_end(self,path): fout=codecs.open(path, 'a', encoding='utf-8') fout.write("</table>") fout.write("</body>") fout.write("</html>") fout.close() def output_html(self,path,data): fout=codecs.open(path, 'a', encoding='utf-8') fout.write("<tr>") fout.write("<td>%s</td>"%str(len(self.datas))) fout.write("<td><a href=%s>%s</a></td>"%(data['url'],data['url'])) fout.write("<td>%s</td>"%data['title']) fout.write("<td>%s</td>"%data['summary']) fout.write("</tr>") fout.close()
從節點首先是鏈接到指定地址並驗證祕鑰。鏈接後獲取url_q、result_q。
從url_q中獲取發來的地址,調用HTML下載器下載數據,調動HTML解析器解析數據,而後把結果放到result_q隊列上。
代碼以下
SpiderWork.py
from multiprocessing.managers import BaseManager from HtmlDownloader import HtmlDownloader from HtmlParser import HtmlParser class SpiderWork(): def __init__(self): BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') server_addr='127.0.0.1' print('connect'+server_addr) self.m=BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8')) self.m.connect() self.task=self.m.get_task_queue() self.result=self.m.get_result_queue() print(self.task) self.downloader=HtmlDownloader() self.parser=HtmlParser() print('initial finish') def crawl(self): while (True): try: if not self.task.empty(): url=self.task.get() if url == 'end': print('stop spider1') self.result.put({'new_urls':'end','data':'end'}) return print('working:'+url)#url content=self.downloader.download(url) new_urls,data=self.parser.parser(url,content) self.result.put({"new_urls":new_urls,"data":data}) except Exception as e: print(e,url) if __name__=="__main__": spider=SpiderWork() spider.crawl()
HtmlDownloader.py
import requests import chardet class HtmlDownloader(object): def download(self,url): if url is None: return None user_agent='Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0' headers={'User-Agent':user_agent} r=requests.get(url,headers=headers) if r.status_code is 200: r.encoding=chardet.detect(r.content)['encoding'] return r.text return None
HtmlParser.py
import re from urllib import parse from bs4 import BeautifulSoup class HtmlParser(object): def parser(self,page_url,html_cont): if page_url is None or html_cont is None: return soup=BeautifulSoup(html_cont,'lxml') new_urls=self.getNewUrls(page_url,soup) new_data=self.getNewData(page_url,soup) return new_urls,new_data def getNewUrls(self,page_url,soup): new_urls=set() links=soup.find_all('a',href=re.compile(r'/item/.*')) for link in links: new_url=link['href'] new_full_url=parse.urljoin(page_url,new_url) new_urls.add(new_full_url) return new_urls def getNewData(self,page_url,soup): data={} data['url']=page_url title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1') data['title']=title.get_text() summary = soup.find('div',class_='lemma-summary') #獲取到tag中包含的全部文版內容包括子孫tag中的內容,並將結果做爲Unicode字符串返回 data['summary']=summary.get_text() return data
創建.sh文件以下:
#!/bin/bash rm -rf log/* rm -rf baike.html rm -rf new_urls.txt rm -rf old_urls.txt
python3 control/NodeManager.py &> log/control.log & for ((i=1;i<=10;i++)) do python3 spider/SpiderWork.py &>log/spider$i.log & done
啓動主節點,而後啓動10個從節點。將它們所產生的日誌信息記錄到log/下,並都是在後臺運行的進程。
兩分鐘左右,完成約1900條的數據獲取。
kill -9 $(ps aux | grep python | awk '{print $2}')
!kill
等
Eclipse的pydev進程調試。
這代碼裏面真的有好多的細節文件,序列化操做與存儲,md5的壓縮方案等,都是值得思考的。