所謂分佈式, 多個程序同時對一個任務進行操做html
一分多的高效率的任務進行方式web
一個 10GB 的爬蟲任務, 交給10臺服務器進行同時爬取redis
對比單服務器不管怎麼優化都是 10倍的效率, 可是成本高數據庫
須要硬件環境支持 ( 帶寬, 服務器設備等 )json
多態主機共享一個爬取隊列即爲分佈式爬蟲api
/ -------------服務器 2服務器
| / ------------------服務器3架構
服務器1 ----------- 服務器 4框架
| \-------------------服務器5dom
\-------------服務器6
服務器 1 負責 url隊列的分發 ( 本質上是個 redis 數據庫 )
其餘服務器可能分佈在全國各地, 分別和服務器1 創建聯繫,
經過 服務器 1 的共享隊列進行爬取任務的獲取
redis 速度快
redis 非關係型數據庫, redis 中的集合, 存儲每一個 request 的指紋
Scrapy 原生是不支持分佈式爬蟲
Scrapy 的爬取隊列是放在調度器中的
所以只要可以實現調度器的共享便可完成爬蟲任務的共享
即重寫 scrapy 的調度器, ( 別人造的輪子又大又圓 scrapy_redis )
scrapy-redis是scrapy框架基於redis數據庫的組件,用於scrapy項目的分佈式開發和部署。
能夠啓動多個spider工程,相互之間共享單個redis的requests隊列。最適合普遍的多個域名網站的內容爬取。
爬取到的scrapy的item數據能夠推入到redis隊列中
這意味着你能夠根據需求啓動儘量多的處理程序來共享item的隊列,進行item數據持久化處理
Scheduler調度器 + Duplication複製 過濾器,Item Pipeline,基本spider
去重規則的校驗
實現調度器對請求的調配
定製起始URL
pip install scrapy-redis
全部可選的配置, 未註釋表示必選設置
# 替換調度器, 啓用 redis 中存儲的請求調度隊列 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 替換過濾器, 全部的通向 redis 的爬蟲都須要使用此過濾器 ( 去重機制 ) DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 默認的 序列化方式爲 pickle, 能夠更改成 json 或者 msgpack # SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat" # 是否雲訊斷點續爬 ( 爬取完成後是否清楚指紋 ) # SCHEDULER_PERSIST = True # 默認使用優先級隊列(默認),其餘:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表) # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue' # 廣度優先 基於隊列 先進先出 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue' # 深度優先 基於棧 後進先出 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' 的時候纔有用 # DEPTH_PRIORITY = 1 # 廣度優先 # DEPTH_PRIORITY = -1 # 深度優先 後進先出 # 初次啓動的時候的阻塞時間 ( 僅適用於隊列類爲SpiderQueue或SpiderStack時 ) # SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 替換持久化工具爲 scrapy-redis 的 ITEM_PIPELINES = { 'scrapy_redis.pipelines.RedisPipeline': 300 } # The item pipeline serializes and stores the items in this redis key. # REDIS_ITEMS_KEY = '%(spider)s:items' # The items serializer is by default ScrapyJSONEncoder. You can use any # importable path to a callable object. # REDIS_ITEMS_SERIALIZER = 'json.dumps' # 指定鏈接 redis 的主機和端口號 # REDIS_HOST = 'localhost' # REDIS_PORT = 6379 # Specify the full Redis URL for connecting (optional). # If set, this takes precedence over the REDIS_HOST and REDIS_PORT settings. # REDIS_URL = 'redis://user:pass@hostname:9001' # Custom redis client parameters (i.e.: socket timeout, etc.) # REDIS_PARAMS = {} # Use custom redis client class. # REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # If True, it uses redis' ``SPOP`` operation. You have to use the ``SADD`` # command to add URLs to the redis queue. This could be useful if you # want to avoid duplicates in your start urls list and the order of # processing does not matter. # REDIS_START_URLS_AS_SET = False # Default start urls key for RedisSpider and RedisCrawlSpider. # REDIS_START_URLS_KEY = '%(name)s:start_urls' # Use other encoding than utf-8 for redis. # REDIS_ENCODING = 'latin1'
redis的集合有去重屬性,在保存重複的值的時候返回值會是 0
from scrapy.dupefilter import BaseDupeFilter import redis from scrapy.utils.request import request_fingerprint class DupFilter(BaseDupeFilter): def __init__(self): self.conn = redis.Redis(host='140.143.227.206',port=8888,password='beta') def request_seen(self, request): """ 檢測當前請求是否已經被訪問過 :param request: :return: True表示已經訪問過;False表示未訪問過 """ fid = request_fingerprint(request) result = self.conn.sadd('visited_urls', fid) if result == 1: return False return True
# 直接加了配置就生效。可是是基於時間戳的,時間戳變了就會失效很不實用
################ scrapy redis鏈接 #################### REDIS_HOST = '127.0.0.1' # 主機名 REDIS_PORT = 6379 # 端口 REDIS_PARAMS = {'password':'yangtuo'} # Redis鏈接參數,好比密碼
# 默認:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,}) REDIS_ENCODING = "utf-8" # redis編碼類型 默認:'utf-8' # REDIS_URL = 'redis://user:pass@hostname:9001' # 鏈接URL(優先於以上配置)
################ scrapy redis去重 #################### DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' # 默認的是按照時間戳的。結果每次請求的時候是會進行去重 # 可是下一次請求時間戳不一致就會清空致使沒法去重 所以這裏存在優化空間 最好定死 # DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # 若是使用 scrapy-redis 的默認去重是會使用這個類來處理去重 DUPEFILTER_CLASS = 'dbd.xxx.RedisDupeFilter' # 這樣使用本身自定義的去重規則
from scrapy_redis.dupefilter import RFPDupeFilter from scrapy_redis.connection import get_redis_from_settings from scrapy_redis import defaults class RedisDupeFilter(RFPDupeFilter): @classmethod def from_settings(cls, settings):
server = get_redis_from_settings(settings) # key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} # 源碼這裏是用的時間戳 不斷變換致使每次重置 key = defaults.DUPEFILTER_KEY % {'timestamp': 'yangtuo'} # 不要時間戳了,我定死一個固定值 debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)
在 settings.py 中進行配置
SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 默認使用優先級隊列(默認),其餘:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表) SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue' # 廣度優先 基於隊列 先進先出 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue' # 深度優先 基於棧 後進先出 # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' 的時候纔有用 DEPTH_PRIORITY = 1 # 廣度優先 # DEPTH_PRIORITY = -1 # 深度優先 後進先出 SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 調度器中請求存放在redis中的key SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat" # 對保存到redis中的數據進行序列化,默認使用pickle SCHEDULER_PERSIST = False # 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空 SCHEDULER_FLUSH_ON_START = True # 是否在開始以前清空 調度器和去重記錄,True=清空,False=不清空 # SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 去調度器中獲取數據時,若是爲空,最多等待時間(最後沒數據,未獲取到)。 SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重規則,在redis中保存時對應的key # 優先使用 DUPEFILTER_CLASS,若是沒有就是用 SCHEDULER_DUPEFILTER_CLASS SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # 去重規則對應處理的類
import scrapy from scrapy.http import Request import scrapy_redis from scrapy_redis.spiders import RedisSpider class ChoutiSpider(RedisSpider): # 繼承改成 用 RedisSpider name = 'chouti' allowed_domains = ['chouti.com'] # def start_requests(self): # 再也不須要寫 start_requests 了 # yield Request(url='https://dig.chouti.com',callback=self.parse) def parse(self, response): print(response)
import redis conn = redis.Redis(host='127.0.0.1',port=6379,password='yangtuo') # 經過往 redis 裏面加數據從而控制爬蟲的執行 # 若是沒有數據就會夯住。一旦有數據就爬取,能夠 conn.lpush('chouti:start_urls','https://dig.chouti.com/r/pic/hot/1')
scrapy crawl chouti --nolog
找到 SCHEDULER = "scrapy_redis.scheduler.Scheduler" 配置並實例化調度器對象
- 執行Scheduler.from_crawler
- 執行Scheduler.from_settings
- 讀取配置文件:
SCHEDULER_PERSIST # 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空 SCHEDULER_FLUSH_ON_START # 是否在開始以前清空 調度器和去重記錄,True=清空,False=不清空 SCHEDULER_IDLE_BEFORE_CLOSE # 去調度器中獲取數據時,若是爲空,最多等待時間(最後沒數據,未獲取到)
- 讀取配置文件:
SCHEDULER_QUEUE_KEY # %(spider)s:requests # 讀取保存在 redis 中存放爬蟲名字的 name 的值 SCHEDULER_QUEUE_CLASS # scrapy_redis.queue.FifoQueue # 讀取要使用的隊列方式 SCHEDULER_DUPEFILTER_KEY # '%(spider)s:dupefilter' # 讀取保存在 redis 中存放爬蟲去重規則名字的 key 的值 DUPEFILTER_CLASS # 'scrapy_redis.dupefilter.RFPDupeFilter' # 讀取去重的配置的類 SCHEDULER_SERIALIZER # "scrapy_redis.picklecompat" # 讀取保存在 redis 的時候用的序列化方式
- 讀取配置文件:
REDIS_HOST = '140.143.227.206' # 主機名 REDIS_PORT = 8888 # 端口 REDIS_PARAMS = {'password':'beta'} # Redis鏈接參數 默認:REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING,}) REDIS_ENCODING = "utf-8"
- 實例Scheduler對象
- 調用 scheduler.enqueue_requests()
def enqueue_request(self, request): # 請求是否須要過濾? dont_filter = False 表示過濾 # 去重規則中是否已經有?(是否已經訪問過,若是未訪問添加到去重記錄中。) # 要求過濾,且去重記錄裏面有就變表示訪問過了 if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False # 已經訪問過就不要再訪問了 if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) # print('未訪問過,添加到調度器', request) self.queue.push(request) return True
- 調用 scheduler.next_requests()
def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request
深度優先 :基於層級先進入到最深層級進行處理所有後再往上層級處理
廣度優先 :基於從第一層級開始,每次層級處理以後進入下一層及處理
先進先出,廣度優先 FifoQueue
後進先出,深度優先 LifoQueue
優先級隊列:
DEPTH_PRIORITY = 1 # 廣度優先
DEPTH_PRIORITY = -1 # 深度優先
調度器,調配添加或獲取那個request.
隊列,存放request。
dupefilter,訪問記錄。
調度器拿到一個 request 要先去 dupefilter 裏面看下有沒有存在
若是存在就直接丟了
若是不存在才能夠放在隊列中
而後隊列要基於本身的類型來進行相應規則的存放
打比方來講:
調度器 貨車司機
dupefilter 倉庫門衛
隊列 倉庫
司機要往倉庫放貨。先問門衛
門衛說裏面有貨了不讓就無法放
門衛說能夠才能夠放
放貨的時候倉庫要按照本身的規則放貨
司機要拉貨 倉庫就基於本身的規則直接給司機就好了
使用 scrapy-redis 配合 mongoDB 的分佈式爬蟲
import scrapy class TencentItem(scrapy.Item): # define the fields for your item here like: zh_name = scrapy.Field() zh_type = scrapy.Field() post_id = scrapy.Field() duty = scrapy.Field() require = scrapy.Field()
BOT_NAME = 'Tencent' SPIDER_MODULES = ['Tencent.spiders'] NEWSPIDER_MODULE = 'Tencent.spiders' # Crawl responsibly by identifying yourself (and your website) on the user-agent USER_AGENT = 'Mozilla/5.0' # Obey robots.txt rules ROBOTSTXT_OBEY = False
# 使用scrapy_redis的調度器 SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 使用scrapy_redis的去重機制 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 爬取完成後是否清除請求指紋 SCHEDULER_PERSIST = True # 'scrapy_redis.pipelines.RedisPipeline': 200 REDIS_HOST = '172.40.91.129' REDIS_PORT = 6379 ITEM_PIPELINES = { 'Tencent.pipelines.TencentPipeline': 300, # 'scrapy_redis.pipelines.RedisPipeline': 200 # redis 的開啓這個 'Tencent.pipelines.TencentMongoPipeline': 200, }
import pymongo
class TencentPipeline(object):
def process_item(self, item, spider): print(dict(item)) return item class TencentMongoPipeline(object): def open_spider(self, spider): conn = pymongo.MongoClient('localhost',27017) db = conn['tencent'] self.myset = db['job'] def process_item(self, item, spider): self.myset.insert_one(dict(item))
# -*- coding: utf-8 -*- import scrapy import json from ..items import TencentItem class TencentSpider(scrapy.Spider): name = 'tencent' allowed_domains = ['tencent.com'] start_urls = [ 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1557114143837&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=1&pageSize=10&language=zh-cn&area=cn'] def parse(self, response): for page_index in range(1, 200): url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1557114143837&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=%s&pageSize=10&language=zh-cn&area=cn' % str( page_index) yield scrapy.Request( url=url, callback=self.parse_one_page ) # 一級頁面解析 def parse_one_page(self, response): html = json.loads(response.text) for h in html['Data']['Posts']: item = TencentItem() item['zh_name'] = h['RecruitPostName'] item['zh_type'] = h['LocationName'] # 一級頁面獲取PostId,詳情頁URL須要此參數 item['post_id'] = h['PostId'] # 想辦法獲取到職位要求和職責,F12抓包,抓到地址 two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1557122746678&postId=%s&language=zh-cn' % \ item['post_id'] yield scrapy.Request( url=two_url, meta={'item': item}, callback=self.parse_two_page ) def parse_two_page(self, response): item = response.meta['item'] html = json.loads(response.text) # 職責 item['duty'] = html['Data']['Responsibility'] # 要求 item['require'] = html['Data']['Requirement'] yield item