Scrapy - 分佈式爬蟲

分佈式爬蟲

概念

所謂分佈式, 多個程序同時對一個任務進行操做html

一分多的高效率的任務進行方式web

簡單說明

一個 10GB 的爬蟲任務, 交給10臺服務器進行同時爬取redis

對比單服務器不管怎麼優化都是 10倍的效率, 可是成本高數據庫

須要硬件環境支持 ( 帶寬, 服務器設備等 )json

多態主機共享一個爬取隊列即爲分佈式爬蟲api

物理拓撲

 / -------------服務器 2服務器

| / ------------------服務器3架構

服務器1    ----------- 服務器 4框架

| \-------------------服務器5dom

 \-------------服務器6

服務器 1 負責 url隊列的分發 ( 本質上是個 redis 數據庫 )

其餘服務器可能分佈在全國各地, 分別和服務器1 創建聯繫,

經過 服務器 1 的共享隊列進行爬取任務的獲取

爲何使用 redis

redis 速度快 

redis 非關係型數據庫, redis 中的集合,  存儲每一個 request 的指紋

Scrapy 實現分佈式

Scrapy 原生是不支持分佈式爬蟲

Scrapy 的爬取隊列是放在調度器中的

所以只要可以實現調度器的共享便可完成爬蟲任務的共享

即重寫 scrapy 的調度器, ( 別人造的輪子又大又圓 scrapy_redis )

scrapy-redis

簡介

scrapy-redis是scrapy框架基於redis數據庫的組件,用於scrapy項目的分佈式開發和部署。

特徵

分佈式爬取

能夠啓動多個spider工程,相互之間共享單個redis的requests隊列。最適合普遍的多個域名網站的內容爬取。

分佈式數據處理

爬取到的scrapy的item數據能夠推入到redis隊列中

這意味着你能夠根據需求啓動儘量多的處理程序來共享item的隊列,進行item數據持久化處理

Scrapy即插即用組件

Scheduler調度器 + Duplication複製 過濾器,Item Pipeline,基本spider

三大做用

去重規則的校驗

實現調度器對請求的調配

定製起始URL

scrapy-redis架構

安裝

pip install scrapy-redis

使用

全部可選的配置, 未註釋表示必選設置

# 替換調度器, 啓用 redis 中存儲的請求調度隊列 
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# 替換過濾器, 全部的通向 redis 的爬蟲都須要使用此過濾器 ( 去重機制 )
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# 默認的 序列化方式爲 pickle, 能夠更改成 json 或者 msgpack
# SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"

# 是否雲訊斷點續爬 ( 爬取完成後是否清楚指紋 )
# SCHEDULER_PERSIST = True

# 默認使用優先級隊列(默認),其餘:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'  # 廣度優先 基於隊列 先進先出
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue' # 深度優先 基於棧 後進先出


# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  的時候纔有用
# DEPTH_PRIORITY = 1  # 廣度優先
# DEPTH_PRIORITY = -1 # 深度優先 後進先出

# 初次啓動的時候的阻塞時間 ( 僅適用於隊列類爲SpiderQueue或SpiderStack時 )
# SCHEDULER_IDLE_BEFORE_CLOSE = 10

# 替換持久化工具爲 scrapy-redis 的
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300
}

# The item pipeline serializes and stores the items in this redis key.
# REDIS_ITEMS_KEY = '%(spider)s:items'

# The items serializer is by default ScrapyJSONEncoder. You can use any
# importable path to a callable object.
# REDIS_ITEMS_SERIALIZER = 'json.dumps'

# 指定鏈接 redis 的主機和端口號
# REDIS_HOST = 'localhost'
# REDIS_PORT = 6379

# Specify the full Redis URL for connecting (optional).
# If set, this takes precedence over the REDIS_HOST and REDIS_PORT settings.
# REDIS_URL = 'redis://user:pass@hostname:9001'

# Custom redis client parameters (i.e.: socket timeout, etc.)
# REDIS_PARAMS  = {}
# Use custom redis client class.
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient'

# If True, it uses redis' ``SPOP`` operation. You have to use the ``SADD``
# command to add URLs to the redis queue. This could be useful if you
# want to avoid duplicates in your start urls list and the order of
# processing does not matter.
# REDIS_START_URLS_AS_SET = False

# Default start urls key for RedisSpider and RedisCrawlSpider.
# REDIS_START_URLS_KEY = '%(name)s:start_urls'

# Use other encoding than utf-8 for redis.
# REDIS_ENCODING = 'latin1'

去重

scrapy-redis  去重原理

 redis的集合有去重屬性,在保存重複的值的時候返回值會是 0 

徹底自定義實現去重

from scrapy.dupefilter import BaseDupeFilter
import redis
from scrapy.utils.request import request_fingerprint

class DupFilter(BaseDupeFilter):
    def __init__(self):
        self.conn = redis.Redis(host='140.143.227.206',port=8888,password='beta')

    def request_seen(self, request):
        """
        檢測當前請求是否已經被訪問過
        :param request: 
        :return: True表示已經訪問過;False表示未訪問過
        """
        fid = request_fingerprint(request)
        result = self.conn.sadd('visited_urls', fid)
        if result == 1:
            return False
        return True

 使用 scrapy-redis 去重

# 直接加了配置就生效。可是是基於時間戳的,時間戳變了就會失效很不實用 
################ scrapy redis鏈接 #################### REDIS_HOST = '127.0.0.1'     # 主機名 REDIS_PORT = 6379 # 端口 REDIS_PARAMS = {'password':'yangtuo'} # Redis鏈接參數,好比密碼
# 默認:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
REDIS_ENCODING = "utf-8" # redis編碼類型 默認:'utf-8' # REDIS_URL = 'redis://user:pass@hostname:9001' # 鏈接URL(優先於以上配置)
################ scrapy redis去重 #################### DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' # 默認的是按照時間戳的。結果每次請求的時候是會進行去重 # 可是下一次請求時間戳不一致就會清空致使沒法去重 所以這裏存在優化空間 最好定死 # DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # 若是使用 scrapy-redis 的默認去重是會使用這個類來處理去重 DUPEFILTER_CLASS = 'dbd.xxx.RedisDupeFilter' # 這樣使用本身自定義的去重規則

優化

from scrapy_redis.dupefilter import RFPDupeFilter
from scrapy_redis.connection import get_redis_from_settings
from scrapy_redis import defaults

class RedisDupeFilter(RFPDupeFilter):
    @classmethod
    def from_settings(cls, settings):
server = get_redis_from_settings(settings) # key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} # 源碼這裏是用的時間戳 不斷變換致使每次重置 key = defaults.DUPEFILTER_KEY % {'timestamp': 'yangtuo'} # 不要時間戳了,我定死一個固定值 debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)

調度器

settings.py 中進行配置 

SCHEDULER = "scrapy_redis.scheduler.Scheduler"


# 默認使用優先級隊列(默認),其餘:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'  # 廣度優先 基於隊列 先進先出 
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue' # 深度優先 基於棧 後進先出 


# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  的時候纔有用 
DEPTH_PRIORITY = 1  # 廣度優先    
# DEPTH_PRIORITY = -1 # 深度優先 後進先出


SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # 調度器中請求存放在redis中的key


SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"  # 對保存到redis中的數據進行序列化,默認使用pickle


SCHEDULER_PERSIST = False  # 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空
SCHEDULER_FLUSH_ON_START = True  # 是否在開始以前清空 調度器和去重記錄,True=清空,False=不清空
# SCHEDULER_IDLE_BEFORE_CLOSE = 10  # 去調度器中獲取數據時,若是爲空,最多等待時間(最後沒數據,未獲取到)。


SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'  # 去重規則,在redis中保存時對應的key


# 優先使用 DUPEFILTER_CLASS,若是沒有就是用 SCHEDULER_DUPEFILTER_CLASS
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # 去重規則對應處理的類

起始url

爬蟲文件

import scrapy
from scrapy.http import Request
import scrapy_redis
from scrapy_redis.spiders import RedisSpider


class ChoutiSpider(RedisSpider): # 繼承改成 用 RedisSpider
    name = 'chouti'
    allowed_domains = ['chouti.com']

    # def start_requests(self): # 再也不須要寫 start_requests 了
    #     yield Request(url='https://dig.chouti.com',callback=self.parse)


    def parse(self, response):
        print(response)

另寫一個腳本文件 

import redis

conn = redis.Redis(host='127.0.0.1',port=6379,password='yangtuo')

# 經過往 redis 裏面加數據從而控制爬蟲的執行
# 若是沒有數據就會夯住。一旦有數據就爬取,能夠
conn.lpush('chouti:start_urls','https://dig.chouti.com/r/pic/hot/1')

代碼流程

1.啓動

scrapy crawl chouti --nolog

2.實例化調度器對象

找到  SCHEDULER = "scrapy_redis.scheduler.Scheduler"  配置並實例化調度器對象

  - 執行Scheduler.from_crawler

  - 執行Scheduler.from_settings
- 讀取配置文件:

SCHEDULER_PERSIST    # 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空
SCHEDULER_FLUSH_ON_START # 是否在開始以前清空 調度器和去重記錄,True=清空,False=不清空
SCHEDULER_IDLE_BEFORE_CLOSE # 去調度器中獲取數據時,若是爲空,最多等待時間(最後沒數據,未獲取到)

- 讀取配置文件:

SCHEDULER_QUEUE_KEY    # %(spider)s:requests    # 讀取保存在 redis 中存放爬蟲名字的 name 的值
SCHEDULER_QUEUE_CLASS    # scrapy_redis.queue.FifoQueue # 讀取要使用的隊列方式
SCHEDULER_DUPEFILTER_KEY # '%(spider)s:dupefilter'    # 讀取保存在 redis 中存放爬蟲去重規則名字的 key 的值
DUPEFILTER_CLASS    # 'scrapy_redis.dupefilter.RFPDupeFilter'    # 讀取去重的配置的類
SCHEDULER_SERIALIZER    # "scrapy_redis.picklecompat"    # 讀取保存在 redis 的時候用的序列化方式

- 讀取配置文件:

REDIS_HOST = '140.143.227.206' # 主機名
REDIS_PORT = 8888 # 端口
REDIS_PARAMS = {'password':'beta'} # Redis鏈接參數 默認:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,})
REDIS_ENCODING = "utf-8"

- 實例Scheduler對象

3. 爬蟲開始執行起始URL

- 調用 scheduler.enqueue_requests()

def enqueue_request(self, request):
    # 請求是否須要過濾? dont_filter = False 表示過濾 
    # 去重規則中是否已經有?(是否已經訪問過,若是未訪問添加到去重記錄中。)
    # 要求過濾,且去重記錄裏面有就變表示訪問過了
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False     # 已經訪問過就不要再訪問了
    
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    # print('未訪問過,添加到調度器', request)
    self.queue.push(request)
    return True

4. 下載器去調度器中獲取任務,去下載

- 調用 scheduler.next_requests()

def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request

總結幾個問題

1. 什麼是深度優先?什麼是廣度優先?

深度優先 :基於層級先進入到最深層級進行處理所有後再往上層級處理

廣度優先 :基於從第一層級開始,每次層級處理以後進入下一層及處理

2. scrapy中如何實現深度和廣度優先?

先進先出,廣度優先 FifoQueue

後進先出,深度優先 LifoQueue

優先級隊列:

DEPTH_PRIORITY = 1 # 廣度優先

DEPTH_PRIORITY = -1 # 深度優先

3. scrapy中 調度器 和 隊列 和 dupefilter 的關係?

調度器,調配添加或獲取那個request.

隊列,存放request。

dupefilter,訪問記錄。

調度器拿到一個 request 要先去 dupefilter 裏面看下有沒有存在

  若是存在就直接丟了

  若是不存在才能夠放在隊列中

而後隊列要基於本身的類型來進行相應規則的存放

打比方來講

  調度器 貨車司機

  dupefilter 倉庫門衛

  隊列 倉庫

  司機要往倉庫放貨。先問門衛

    門衛說裏面有貨了不讓就無法放

    門衛說能夠才能夠放

      放貨的時候倉庫要按照本身的規則放貨

  司機要拉貨 倉庫就基於本身的規則直接給司機就好了

示例 - 騰訊招聘分佈式爬蟲

流程剖析

使用 scrapy-redis 配合 mongoDB 的分佈式爬蟲

代碼

items.py

import scrapy


class TencentItem(scrapy.Item):
    # define the fields for your item here like:
    zh_name = scrapy.Field()
    zh_type = scrapy.Field()
    post_id = scrapy.Field()
    duty = scrapy.Field()
    require = scrapy.Field()

settings.py

BOT_NAME = 'Tencent'

SPIDER_MODULES = ['Tencent.spiders']
NEWSPIDER_MODULE = 'Tencent.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
USER_AGENT = 'Mozilla/5.0'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# 使用scrapy_redis的調度器 SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 使用scrapy_redis的去重機制 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 爬取完成後是否清除請求指紋 SCHEDULER_PERSIST = True # 'scrapy_redis.pipelines.RedisPipeline': 200 REDIS_HOST = '172.40.91.129' REDIS_PORT = 6379 ITEM_PIPELINES = { 'Tencent.pipelines.TencentPipeline': 300, # 'scrapy_redis.pipelines.RedisPipeline': 200 # redis 的開啓這個 'Tencent.pipelines.TencentMongoPipeline': 200, }

pipline.py

import pymongo


class
TencentPipeline(object):
def process_item(self, item, spider):
        print(dict(item))
        return item


class TencentMongoPipeline(object):
    def open_spider(self, spider):
        conn = pymongo.MongoClient('localhost',27017)
        db = conn['tencent']
        self.myset = db['job']

    def process_item(self, item, spider):
        self.myset.insert_one(dict(item))

spider.py

# -*- coding: utf-8 -*-
import scrapy
import json
from ..items import TencentItem


class TencentSpider(scrapy.Spider):
    name = 'tencent'
    allowed_domains = ['tencent.com']
    start_urls = [
        'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1557114143837&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=1&pageSize=10&language=zh-cn&area=cn']

    def parse(self, response):
        for page_index in range(1, 200):
            url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1557114143837&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=%s&pageSize=10&language=zh-cn&area=cn' % str(
                page_index)
            yield scrapy.Request(
                url=url,
                callback=self.parse_one_page
            )

    # 一級頁面解析
    def parse_one_page(self, response):
        html = json.loads(response.text)

        for h in html['Data']['Posts']:
            item = TencentItem()

            item['zh_name'] = h['RecruitPostName']
            item['zh_type'] = h['LocationName']
            # 一級頁面獲取PostId,詳情頁URL須要此參數
            item['post_id'] = h['PostId']
            # 想辦法獲取到職位要求和職責,F12抓包,抓到地址
            two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1557122746678&postId=%s&language=zh-cn' % \
                      item['post_id']

            yield scrapy.Request(
                url=two_url,
                meta={'item': item},
                callback=self.parse_two_page
            )

    def parse_two_page(self, response):
        item = response.meta['item']
        html = json.loads(response.text)
        # 職責
        item['duty'] = html['Data']['Responsibility']
        # 要求
        item['require'] = html['Data']['Requirement']

        yield item
相關文章
相關標籤/搜索