Distributed crawling of Zhihu Q&A with scrapy-redis, deployed across multiple machines with docker.

Results first:

Questions:

Answers:

You can see the answers collection already holds over a hundred thousand documents, a hundred thousand whys~

Now for the main part:

A distributed crawler sets up the crawling environment on several servers (servers A, B, C) and has them crawl the same target cooperatively, interleaving their work, which calls for a state manager.

The state manager is mainly responsible for managing the URL crawl queue, and it can also double as a crawler server; it just needs redis and a scrapy-redis environment set up~

The crawler servers are responsible for fetching and processing the data; they just need scrapy-redis installed~

As shown in the diagram below:

 

Multiple machines crawl the target URLs and extract data from them at the same time. N machines do exactly the same job, with redis doing the scheduling and relaying; in other words there is no master/slave distinction.

Keep in mind that scrapy by itself does not support distributed crawling:

  1. In scrapy, requests live in memory; with two or more servers the request queue has to be managed centrally, so requests are put into redis.
  2. Deduplication also has to be centralized, again using redis (a minimal sketch of the idea follows this list).
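
Conceptually, the centralized deduplication is just a shared set in redis: every crawler asks redis whether a request fingerprint has already been seen before scheduling it. A minimal sketch of the idea, not scrapy-redis' actual implementation (the host, password and key name here are placeholders):

import hashlib

import redis

r = redis.Redis(host='127.0.0.1', port=6379, password='123456')

def already_seen(url):
    # scrapy-redis fingerprints the whole request; a sha1 of the URL is enough
    # to illustrate the idea
    fp = hashlib.sha1(url.encode('utf-8')).hexdigest()
    # SADD returns 1 if the member is new, 0 if it was already in the set,
    # so every machine sharing this redis sees the same dedup state
    return r.sadd('zhihu:dupefilter', fp) == 0

print(already_seen('https://www.zhihu.com/question/308761407'))  # False the first time
print(already_seen('https://www.zhihu.com/question/308761407'))  # True afterwards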

Advantages of distributed crawling

  1. Make full use of the bandwidth of multiple machines when crawling
  2. Make full use of the IPs of multiple machines
Using scrapy-redis is very simple; changing a few settings in the scrapy project's settings.py is basically all it takes.
Install:
pip install scrapy-redis

Now, this Zhihu crawling project starts from plain scrapy:

Create a scrapy project from the command line:

scrapy startproject ArticleSpider

Enter the project folder and generate a spider:

cd ArticleSpider

scrapy genspider zhihu www.zhihu.com

The project files at this point:

Analyzing Zhihu's question API:

Every time you scroll down on the Zhihu homepage this ajax request fires, and the response contains question URLs, titles and so on, so it is clearly the question API:

https://www.zhihu.com/api/v3/feed/topstory/recommend?session_token=8c3313b2932c370198480b54dc89fd3a&desktop=true&page_number=2&limit=6&action=down&after_id=5

 

Its response:

Interestingly, every request to it returns a set-cookie in the response headers, which means the next page has to be requested with the refreshed cookie, otherwise an error comes back.

 

The parameters carried by the question API request:

It is a GET request, and the first link can actually be found in the HTML source returned when requesting the homepage, which is where these parameters come from; the only one that needs to change is the page number, page_number:

The question API inside the HTML source:

So we first request the homepage HTML, pull out this initial question API with a regex, and then forge the requests for the following pages.
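
As a quick manual check outside scrapy, something like the following works (a sketch: it assumes a cookies.txt file holding the cookie string copied from a logged-in browser session, and it uses the same regex the spider below uses):

import re

import requests

# cookie string copied from a logged-in browser session
with open('cookies.txt') as f:
    raw_cookie = f.read().strip()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
    'cookie': raw_cookie,
}

resp = requests.get('https://www.zhihu.com/', headers=headers)

# the initial question API is embedded in the homepage HTML; \u002F is an escaped '/'
question_api = re.findall('"previous":"(.*?)","next', resp.text, re.S)[0].replace('\\u002F', '/')
print(question_api)

# each response also carries a fresh KLBRSID fragment in set-cookie, which the
# next request has to reuse (this is what handle_cookie() below takes care of)
print(resp.headers.get('set-cookie'))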

 

The answer API was found by scrolling down through all the answers of a question:

 

The answer API request is even easier to handle: only the question id and the offset need to change, and it does not even need a cookie.

For example, this is the answer API of one question:

https://www.zhihu.com/api/v4/questions/308761407/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cis_labeled%2Cis_recognized%2Cpaid_info%2Cpaid_info_content%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit=5&offset=13&platform=desktop&sort_by=default

Its response:

Both the question API and the answer API return JSON, which makes them very easy to process.
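
A small sketch of reading it, reusing the answer_api template defined in the spider below with the example question above (the module path ArticleSpider.spiders.zhihu is assumed from the standard genspider layout):

import requests

from ArticleSpider.spiders.zhihu import ZhihuSpider

# fill the template with a question id, a page size and an offset
url = ZhihuSpider.answer_api.format(308761407, 5, 0)
resp = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'})
data = resp.json()

# pagination info: whether this is the last page, total answers, next page url
print(data['paging']['is_end'], data['paging']['totals'])
print(data['paging']['next'])

for answer in data['data']:
    print(answer['id'], answer['voteup_count'], answer['question']['id'])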
 
First have a look at items.py, which defines the fields to crawl for questions and answers:
import scrapy


class QuestionItem(scrapy.Item):
    '''
    Item for questions; questions and answers are stored in two separate MongoDB collections
    '''
    title = scrapy.Field()
    created = scrapy.Field()
    answer_num = scrapy.Field()
    comment_num = scrapy.Field()
    follow_nums = scrapy.Field()
    question_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    author_url = scrapy.Field()
    author_name = scrapy.Field()
    author_headline = scrapy.Field()
    author_gender = scrapy.Field()
    crawl_time = scrapy.Field()


class Answer_Item(scrapy.Item):
    '''
    Item for answers
    '''
    answer_id = scrapy.Field()
    question_id = scrapy.Field()
    url = scrapy.Field()
    user_name = scrapy.Field()
    user_id = scrapy.Field()
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comment_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

Next, the spider, spider.py:

# -*- coding: utf-8 -*-
import re
import time
import json
import datetime

import scrapy
from scrapy.http import Request
from scrapy_redis.spiders import RedisSpider

from ArticleSpider.items import QuestionItem, Answer_Item


def timestamp_2_date(timestamp):
    '''
    Convert a unix timestamp into a date-time string
    '''
    time_array = time.localtime(timestamp)
    my_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
    return my_time


def handle_cookie(response):
    '''
    Handle the set-cookie header
    '''
    cookie_section = response.headers.get('set-cookie')
    # extract the refreshed cookie fragment
    sections = re.findall('(KLBRSID=.*?);', str(cookie_section))
    print(sections)
    raw_cookie = response.request.headers['Cookie'].decode('utf-8')
    # substitute the fragment back into the full cookie
    cookie = re.sub('KLBRSID=.*', sections[0], raw_cookie)
    return cookie


class ZhihuSpider(RedisSpider):
    # spider name
    name = 'zhihu'
    # allowed domains
    allowed_domains = ['www.zhihu.com']
    # redis_key: scrapy-redis reads the URLs to visit from this key,
    # i.e. what used to be start_urls now lives in redis
    redis_key = 'zhihu:start_urls'
    # per-spider settings; anything set here overrides settings.py
    custom_settings = {
        # random delay, at most 5 seconds
        "RANDOM_DELAY": 5,
    }
    # with the above in place the spider crawls distributed via scrapy-redis;
    # everything else, such as parse(), follows normal scrapy logic

    # the answer API
    answer_api = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2Crelevant_info%2Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cis_labeled%2Cis_recognized%2Cpaid_info%2Cpaid_info_content%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%5B%2A%5D.topics&limit={1}&offset={2}&platform=desktop&sort_by=default'

    def parse(self, response):
        '''
        Parse the homepage and obtain the question API
        '''
        # every request to the question API returns a new set-cookie (only a fragment)
        # used to build the next cookie; the old cookie cannot access the next page
        cookie = handle_cookie(response)
        print(cookie)
        # after requesting the homepage, find the question API in the HTML source
        question_api = re.findall('"previous":"(.*?)","next', response.text, re.S)
        question_url = question_api[0].replace('\\u002F', '/')
        # request the question API with the new cookie; the callback is parse_question
        yield Request(url=question_url, callback=self.parse_question, headers={'cookie': cookie})

    def parse_question(self, response):
        '''
        Parse the JSON returned by the question API
        '''
        # build the new cookie
        cookie = handle_cookie(response)
        dics = json.loads(response.text)
        for dic in dics['data']:
            try:
                ques_item = QuestionItem()
                if 'question' in dic['target']:
                    # question title
                    ques_item['title'] = dic['target']['question']['title']
                    # question creation time
                    ques_item['created'] = dic['target']['question']['created']
                    ques_item['created'] = timestamp_2_date(ques_item['created'])
                    # number of answers
                    ques_item['answer_num'] = dic['target']['question']['answer_count']
                    # number of comments
                    ques_item['comment_num'] = dic['target']['question']['comment_count']
                    # number of followers
                    ques_item['follow_nums'] = dic['target']['question']['follower_count']
                    # question id
                    ques_item['question_id'] = dic['target']['question']['id']
                    # question url
                    ques_item['url'] = dic['target']['question']['id']
                    ques_item['url'] = 'https://www.zhihu.com/question/' + str(ques_item['url'])
                    # question topics
                    if 'uninterest_reasons' in dic:
                        topics = []
                        for i in dic['uninterest_reasons']:
                            topics.append(i['reason_text'])
                        ques_item['topics'] = topics
                    # author url
                    ques_item['author_url'] = dic['target']['question']['author']['url']
                    # author name
                    ques_item['author_name'] = dic['target']['question']['author']['name']
                    # author headline
                    ques_item['author_headline'] = dic['target']['question']['author']['headline']
                    # author gender (0 is taken as female, otherwise male; missing means unknown)
                    ques_item['author_gender'] = dic['target']['question']['author'].get('gender')
                    if ques_item['author_gender'] is not None:
                        if ques_item['author_gender'] == 0:
                            ques_item['author_gender'] = '女'
                        else:
                            ques_item['author_gender'] = '男'
                    else:
                        ques_item['author_gender'] = '未知'
                    # crawl time
                    ques_item['crawl_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    yield ques_item
            except:
                pass
        # the question API returns an is_end flag telling whether there is a next page
        if not dics['paging']['is_end']:
            # there is a next page; take the link from 'next'
            next_url = dics['paging']['next']
            # request the next question page with the new cookie
            yield Request(url=next_url, callback=self.parse_question, headers={'cookie': cookie})
            # request the answer API; it needs question_id, limit and offset
            yield Request(url=self.answer_api.format(ques_item['question_id'], 20, 0), callback=self.parse_answer)

    def parse_answer(self, response):
        # handle the JSON returned by the answer API
        ans_json = json.loads(response.text)
        # is_end tells whether the current url is the last page
        is_end = ans_json['paging']['is_end']
        totals_answer = ans_json['paging']['totals']
        # next page url
        next_url = ans_json['paging']['next']
        for answer in ans_json['data']:
            ans_item = Answer_Item()
            # answer id
            ans_item['answer_id'] = answer['id']
            # id of the question this answer belongs to
            ans_item['question_id'] = answer['question']['id']
            # answer url
            ans_item['url'] = answer['url']
            # author name
            ans_item['user_name'] = answer['author']['name'] if 'name' in answer['author'] else None
            # author id
            ans_item['user_id'] = answer['author']['id'] if 'id' in answer['author'] else None
            # answer content
            ans_item['content'] = answer['content'] if 'content' in answer else None
            # upvote count
            ans_item['praise_num'] = answer['voteup_count']
            # comment count
            ans_item['comment_num'] = answer['comment_count']
            # answer creation time
            ans_item['create_time'] = answer['created_time']
            ans_item['create_time'] = timestamp_2_date(ans_item['create_time'])
            # answer update time
            ans_item['update_time'] = answer['updated_time']
            ans_item['update_time'] = timestamp_2_date(ans_item['update_time'])
            # crawl time
            ans_item['crawl_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            yield ans_item
        # offset: 20 answers per page, only crawl 50 pages per question, i.e. stop once offset > 1000
        offset = next_url.split('offset=')[1].split('&')[0]
        if int(offset) > 1000:
            pass
        else:
            # when the current page is not the last one and offset is not above 1000,
            # request the next page of answers
            if not is_end:
                yield scrapy.Request(url=next_url, callback=self.parse_answer)

 

Add the following to settings.py:

# use the scrapy-redis scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# use the scrapy-redis dupefilter
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# keep the scrapy-redis queues in redis, which allows pausing and resuming
# (i.e. do not clear the redis queues)
SCHEDULER_PERSIST = True

FEED_EXPORT_ENCODING = 'utf-8'

# redis connection
REDIS_HOST = 'the state manager server IP (make sure redis allows remote access)'
REDIS_PORT = 6379
# redis password
REDIS_PARAMS = {'password': '123456'}

# once scrapy-redis has crawled everything it idles, waiting for redis to supply
# more URLs; if the crawl is really finished there is no point waiting, so this
# stops the spider 3600 s after it starts
CLOSESPIDER_TIMEOUT = 3600
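
With these settings all the shared state lives in redis. By default scrapy-redis keeps the request queue under '<spider name>:requests' and the seen fingerprints under '<spider name>:dupefilter', so progress can be checked from any machine; a sketch, assuming the default key names and the connection details above:

import redis

r = redis.Redis(host='the state manager server IP', port=6379, password='123456')

# pending requests (scrapy-redis' default queue is a redis sorted set)
print('queued requests:', r.zcard('zhihu:requests'))
# request fingerprints already seen by the dupefilter (a redis set)
print('seen fingerprints:', r.scard('zhihu:dupefilter'))
# start urls waiting to be picked up (a redis list)
print('start urls:', r.llen('zhihu:start_urls'))
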
Changes to pipelines.py, which saves the data to a remote MongoDB server:
from pymongo import MongoClient


class ArticlespiderPipeline(object):
    def process_item(self, item, spider):
        return item


class MongoPipeline(object):
    def __init__(self, databaseIp='remote mongodb server ip', databasePort=27017, user="", password=""):
        self.client = MongoClient(databaseIp, databasePort)
        # self.db = self.client.test_database
        # self.db.authenticate(user, password)

    def process_item(self, item, spider):
        postItem = dict(item)  # convert the item into a dict
        print(postItem)
        if item.__class__.__name__ == 'QuestionItem':
            mongodbName = 'zhihu'
            self.db = self.client[mongodbName]
            # upsert the question data
            self.db.question.update({'question_id': postItem['question_id']}, {'$set': postItem}, upsert=True)
        elif item.__class__.__name__ == 'Answer_Item':
            mongodbName = 'zhihu'
            self.db = self.client[mongodbName]
            # upsert the answer data
            self.db.answer.update({'answer_id': postItem['answer_id']}, {'$set': postItem}, upsert=True)
        # returning the item also prints it in the console log; optional
        return item
Changes to middlewares.py:
import logging
import random
import time

from scrapy.exceptions import IgnoreRequest


class RandomDelayMiddleware(object):
    '''
    Adds a custom random delay before each request
    '''
    def __init__(self, delay):
        self.delay = delay

    @classmethod
    def from_crawler(cls, crawler):
        delay = crawler.spider.settings.get("RANDOM_DELAY", 10)
        if not isinstance(delay, int):
            raise ValueError("RANDOM_DELAY needs an int")
        return cls(delay)

    def process_request(self, request, spider):
        # drop requests that got redirected to the login page
        if 'signin?next' in request.url:
            raise IgnoreRequest
        delay = random.randint(0, self.delay)
        logging.debug("### random delay: %s s ###" % delay)
        time.sleep(delay)


class RandomUserAgentMiddleware():
    '''
    Sets a random User-Agent header on every request
    '''
    user_agent = [
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
    ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agent)


class CookieMiddleware():
    '''
    Sets the cookie for the spider (scrapy's own cookies argument expects a
    dict, so the raw header is set directly here)
    '''
    def process_request(self, request, spider):
        with open('cookies.txt') as f:
            raw_cookie = f.read()
        # only provide the cookie when requesting the homepage
        if request.url.rstrip('/') == 'https://www.zhihu.com':
            request.headers['cookie'] = raw_cookie
            print('---', request.headers)
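
For the MongoPipeline and these middlewares to actually run, they also have to be enabled in settings.py. A sketch of what that registration typically looks like (the module paths follow the ArticleSpider project layout; the priority numbers are arbitrary choices):

# settings.py, in addition to the scrapy-redis settings above
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MongoPipeline': 300,
}

DOWNLOADER_MIDDLEWARES = {
    'ArticleSpider.middlewares.RandomDelayMiddleware': 543,
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 544,
    'ArticleSpider.middlewares.CookieMiddleware': 545,
}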

Start scrapy-redis from a .py file:

import sys
import os

from scrapy.cmdline import execute

# os.path.abspath(__file__) is the path of this .py file
# os.path.dirname(os.path.abspath(__file__)) is its directory
# add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# equivalent to running scrapy crawl zhihu on the command line
execute(['scrapy', 'crawl', 'zhihu'])

Once started, scrapy-redis waits for start_urls to be pushed into redis.

I set up redis on a CentOS server; run the following in a terminal to push the Zhihu homepage into redis:


redis-cli
auth 123456
lpush zhihu:start_urls https://www.zhihu.com
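
The same push can also be done from Python with redis-py, which is handy when kicking the crawl off again from a script (a sketch, using the connection details from settings.py):

import redis

r = redis.Redis(host='the state manager server IP', port=6379, password='123456')
# the spider reads its start url from the key set by redis_key in ZhihuSpider
r.lpush('zhihu:start_urls', 'https://www.zhihu.com')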

Building a docker image of the project

For installing docker on CentOS (CentOS 7+ is required), see:

https://www.runoob.com/docker/centos-docker-install.html

If you use an Alibaba Cloud server, check Alibaba Cloud's documentation on setting up your own registry mirror (image accelerator).

First create a file named Dockerfile, with no extension:

## Alibaba Cloud python3 image
FROM registry.cn-shanghai.aliyuncs.com/tcc-public/python:3
## add /usr/local/bin to PATH
ENV PATH /usr/local/bin:$PATH
## copy the local code into the container
ADD . /code
## set the working directory
WORKDIR /code
## install the dependencies
RUN pip3 install -r requirements.txt
## run scrapy crawl zhihu to start crawling
CMD scrapy crawl zhihu

requirements.txt

lxml==4.4.2
pymongo==3.10.1
redis==3.4.1
requests==2.22.0
Scrapy==1.8.0
scrapy-redis==0.6.8


build:

docker build -t registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0 .

Note: registry.~~~ is the public address of your image repository; replace it with your own repository address. The :1.0 after the address is a version tag you choose, used to tell different builds apart. The final . is the build context path and must not be omitted.

Test that the image runs correctly before pushing it:

docker run registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

Push it to the image registry:

docker push registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

Running the project with docker on another machine:

Log in first:

sudo docker login --username=*** registry.cn-shanghai.aliyuncs.com

Then simply run it:

docker run registry.cn-shenzhen.aliyuncs.com/test_for_tianchi/test_for_tianchi_submit:1.0

At the moment several machines are running simultaneously without problems:

 

Finally, the project is on GitHub; feel free to grab it if you need it~

https://github.com/dao233/spider/tree/master/ArticleSpider

 END~
