A Simple Distributed Crawler in Python

The distributed crawler uses a master/slave architecture. One machine acts as the control node and manages all of the machines that run crawlers: it hosts the URL manager, the data store and the control scheduler, while each crawler node only needs to receive tasks from the control node and submit the newly generated tasks back to it. This walkthrough uses three machines for the distributed crawl: one machine as the control node and the other two as crawler nodes.

The control node consists of the URL manager, the data store and the control scheduler. The control scheduler coordinates the URL manager and the data store through three processes: a URL-manager process, which manages the URLs and hands them out to the crawler nodes; a data-extraction process, which reads the data returned by the crawler nodes and passes the URLs contained in it to the URL-manager process; and a data-storage process, which persists the data submitted by the data-extraction process to local storage. The crawler nodes, shown after the control-node code, are made up of an HTML downloader, an HTML parser and a crawler scheduler.
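Before diving into the code, it may help to see the shape of what moves through the queues. The sketch below is illustrative only and is not part of the original modules; the dict keys and the 'end' sentinel match the code that follows, while the concrete URLs and text are made up:

# Illustrative sketch of the queue traffic between the control node and the crawler nodes.
# url_q: control node -> crawler nodes; plain URL strings, terminated by the string 'end'.
task = 'https://baike.baidu.com/item/Python'

# result_q: crawler nodes -> control node; dicts that bundle the newly found URLs
# (a set) with the parsed page data (a dict), also terminated by 'end' markers.
result = {
    'new_urls': set(['https://baike.baidu.com/item/Java']),
    'data': {'url': task, 'title': u'Python', 'summary': u'...'},
}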

URL manager (UrlManager.py)

# coding:utf-8
try:
    import cPickle as pickle  # cPickle is the faster C implementation of pickle
except ImportError:
    import pickle
import hashlib


class UrlManager(object):

    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')  # set of URLs not yet crawled
        self.old_urls = self.load_progress('old_urls.txt')  # set of URLs already crawled

    def has_new_url(self):
        # Are there any URLs left to crawl?
        return self.new_url_size() != 0

    def get_new_url(self):
        # Fetch one URL that has not been crawled yet
        new_url = self.new_urls.pop()
        m = hashlib.md5()  # store an MD5 digest of the URL to save memory
        m.update(new_url)
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self, url):
        # Add a single new URL to the set of URLs to crawl
        if url is None:
            return
        m = hashlib.md5()
        m.update(url)
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        # Add a batch of new URLs to the set of URLs to crawl
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        # Size of the set of URLs not yet crawled
        return len(self.new_urls)

    def old_url_size(self):
        # Size of the set of URLs already crawled
        return len(self.old_urls)

    def save_progress(self, path, data):
        # Persist crawl progress
        # param path: file path
        # param data: the set to save
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        # Load crawl progress from a local file
        # param path: file path
        # return: a set
        print '[+]Loading progress from file: %s' % path
        try:
            with open(path, 'rb') as f:
                tmp = pickle.load(f)
                return tmp
        except:
            print '[!]No progress file, creating: %s' % path
        return set()
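A minimal usage sketch for UrlManager on its own, assuming the class above is saved as UrlManager.py (the progress files may or may not exist yet; the URL is only illustrative):

from UrlManager import UrlManager

url_manager = UrlManager()
url_manager.add_new_url('https://baike.baidu.com/item/Python')  # illustrative URL
while url_manager.has_new_url():
    url = url_manager.get_new_url()  # also records the URL's MD5 digest as crawled
    print 'would crawl:', url
# persist both sets so a later run can pick up where this one stopped
url_manager.save_progress('new_urls.txt', url_manager.new_urls)
url_manager.save_progress('old_urls.txt', url_manager.old_urls)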

Data store (DataOutput.py)

# coding:utf-8
import codecs
import time


class DataOutput(object):

    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        # Buffer a parsed item and flush to disk once more than 10 items are waiting
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas) > 10:
            self.output_html(self.filepath)

    def output_head(self, path):
        # Write the HTML header
        # param path: output file path
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write("<html>")
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    def output_html(self, path):
        # Append the buffered data to the HTML file
        # param path: output file path
        fout = codecs.open(path, 'a', encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>" % data['url'])
            fout.write("<td>%s</td>" % data['title'])
            fout.write("<td>%s</td>" % data['summary'])
            fout.write("</tr>")
        self.datas = []  # clear the buffer instead of removing items while iterating
        fout.close()

    def output_end(self, path):
        # Flush whatever is still buffered and close the HTML document
        # param path: output file path
        self.output_html(path)
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()
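A minimal, illustrative usage sketch for DataOutput; the dict layout matches what the HTML parser further down produces:

from DataOutput import DataOutput

output = DataOutput()                      # writes the HTML header immediately
output.store_data({'url': 'https://baike.baidu.com/item/Python',   # illustrative values
                   'title': u'Python',
                   'summary': u'...'})
output.output_end(output.filepath)         # flush the buffer and close the table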


Control scheduler

# coding:utf-8
import time
import multiprocessing
from multiprocessing.managers import BaseManager
from UrlManager import UrlManager
from DataOutput import DataOutput


class QueueManager(BaseManager):
    pass


class NodeManager(object):

    def start_Manager(self, url_q, result_q):
        # Create the distributed manager
        # param url_q: queue of URLs to crawl
        # param result_q: queue of crawl results
        # Expose the two queues on the network with register();
        # the callable parameter binds each name to a Queue object
        QueueManager.register('get_task_queue', callable=lambda: url_q)
        QueueManager.register('get_result_queue', callable=lambda: result_q)
        # Bind port 8001 and set the auth key to "baike"
        manager = QueueManager(address=('', 8001), authkey='baike')
        # Return the manager object
        return manager

    def url_manager_proc(self, url_q, conn_q, root_url):
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            # Hand new URLs from the URL manager to the crawler nodes
            while url_manager.has_new_url():
                new_url = url_manager.get_new_url()
                url_q.put(new_url)
                print 'old_url=', url_manager.old_url_size()
                if url_manager.old_url_size() > 2000:
                    # Tell the crawler nodes to stop working
                    url_q.put('end')
                    print 'Control node sent the stop notification!'
                    # Shut down this process and persist both URL sets
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            try:
                if not conn_q.empty():
                    urls = conn_q.get()
                    url_manager.add_new_urls(urls)
            except BaseException, e:
                time.sleep(0.1)  # back off briefly

    def result_solve_proc(self, result_q, conn_q, store_q):
        while True:
            try:
                if not result_q.empty():
                    content = result_q.get(True)
                    if content['new_urls'] == 'end':
                        print 'Result-analysis process received the stop notification!'
                        store_q.put('end')
                        return
                    conn_q.put(content['new_urls'])  # the URLs are a set
                    store_q.put(content['data'])     # the parsed data is a dict
                else:
                    time.sleep(0.1)  # back off briefly
            except BaseException, e:
                time.sleep(0.1)  # back off briefly

    def store_proc(self, store_q):
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data == 'end':
                    print 'Storage process received the stop notification'
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                time.sleep(0.1)


if __name__ == '__main__':
    # Initialise the four queues; multiprocessing.Queue is used so the
    # child processes share them with the manager running in this process
    url_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    store_q = multiprocessing.Queue()
    conn_q = multiprocessing.Queue()
    # Create the distributed manager
    node = NodeManager()
    manager = node.start_Manager(url_q, result_q)
    # Create the URL-manager, data-extraction and data-storage processes
    url_manager_proc = multiprocessing.Process(target=node.url_manager_proc, args=(url_q, conn_q, 'https://baike.baidu.com/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711?fr=aladdin&fromid=22046949&fromtitle=%E7%88%AC%E8%99%AB'))
    result_solve_proc = multiprocessing.Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q))
    store_proc = multiprocessing.Process(target=node.store_proc, args=(store_q,))
    # Start the three processes and the distributed manager
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
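The control node has to be up and listening on port 8001 before any crawler node starts, because the worker code below connects to that port (to 127.0.0.1 when both run on the same machine).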


HTML downloader (HtmlDownloader.py)

# coding:utf-8
import urllib2


class HtmlDownloader(object):

    def download(self, url):
        # Download a page and return its contents as unicode, or None on failure
        if url is None:
            return None
        user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'
        headers = {'User-Agent': user_agent}
        req = urllib2.Request(url, headers=headers)
        response = urllib2.urlopen(req)
        if response.getcode() == 200:
            # decode the UTF-8 page and hand unicode to the parser
            html = response.read().decode('utf-8')
            return html
        return None

HTML parser (HtmlParser.py)

# coding:utf-8
import re
import urlparse
from bs4 import BeautifulSoup


class HtmlParser(object):

    def parser(self, page_url, html_cont):
        # param page_url: URL of the downloaded page
        # param html_cont: contents of the downloaded page
        # return: the new URLs and the extracted data
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)
        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        # param page_url: URL of the downloaded page
        # param soup: the parsed soup
        # return: a set of new URLs
        new_urls = set()
        # Extract the <a> tags we are interested in
        links = soup.find_all('a', href=re.compile(r'/item/.*'))
        for link in links:
            # Take the href attribute
            new_url = link['href']
            # Join it with the page URL to form an absolute URL
            new_full_url = urlparse.urljoin(page_url, new_url)
            new_urls.add(new_full_url)
        return new_urls

    def _get_new_data(self, page_url, soup):
        # param page_url: URL of the downloaded page
        # param soup: the parsed soup
        # return: a dict with the page URL, title and summary
        data = {}
        data['url'] = page_url
        title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title'] = title.get_text()
        summary = soup.find('div', class_='lemma-summary')
        # get_text() returns all text in the tag and its descendants as a unicode string
        data['summary'] = summary.get_text()
        return data
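The downloader and parser can also be exercised on their own, outside the distributed setup. The sketch below assumes the two modules above are saved as HtmlDownloader.py and HtmlParser.py; note that the CSS classes targeted by the parser reflect the Baidu Baike markup the original code was written against, which may have changed since:

from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser

downloader = HtmlDownloader()
parser = HtmlParser()

url = 'https://baike.baidu.com/item/Python'   # illustrative entry page
content = downloader.download(url)
if content is not None:
    new_urls, data = parser.parser(url, content)
    print 'found %d candidate links' % len(new_urls)
    print data['title']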

Crawler scheduler (worker node)

# coding:utf-8
from multiprocessing.managers import BaseManager
from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser


class SpoderWork(object):

    def __init__(self):
        # Set up the worker node's connection to the control node
        # Step 1: register the names used to fetch the queues from BaseManager
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        # Step 2: connect to the server (the control node)
        server_addr = '127.0.0.1'
        print 'Connect to server %s....' % server_addr
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike')
        # Connect over the network
        self.m.connect()
        # Step 3: obtain the Queue proxies
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        # Initialise the page downloader and the parser
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print 'init finish'

    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
                    if url == 'end':
                        print 'The control node told this crawler node to stop'
                        # tell the result-analysis process to stop as well
                        self.result.put({'new_urls': 'end', 'data': 'end'})
                        return
                    print 'Crawler node is parsing: %s' % url.encode('utf-8')
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError, e:
                print 'Lost the connection to the control node'
                return
            except Exception, e:
                print e
                print 'Crawl fail'


if __name__ == '__main__':
    spider = SpoderWork()
    spider.crawl()
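When a crawler node runs on a different machine, as in the three-machine setup described at the top, server_addr needs to be changed from 127.0.0.1 to the address of the control node; the port (8001) and the auth key ('baike') must match the values used in the control scheduler.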
