Scrapy architecture diagram (old and new versions)
Scrapy Engine
: handles communication, signals, and data transfer between the Spider, Item Pipeline, Downloader, and Scheduler.
Scheduler
: accepts Request objects sent by the engine, arranges and enqueues them in a defined order, and hands them back when the engine asks for them.
Downloader
: downloads all Requests sent by the Scrapy Engine and returns the Responses it obtains to the engine, which passes them to the Spider for processing.
Spider
: processes all Responses, extracts the data needed for Item fields, and submits URLs that should be followed to the engine, which feeds them back into the Scheduler.
Item Pipeline
: processes the Items produced by the Spider and performs post-processing (detailed analysis, filtering, storage, etc.).
Downloader Middlewares
: components you can write to customise and extend the download functionality.
Spider Middlewares
: components that let you customise and extend the communication between the engine and the Spider (e.g. Responses going into the Spider and Requests coming out of it).
Data flow
The engine takes the start URLs and issues requests; the downloaded responses are returned to the spider.
The spider extracts data and follow-up URLs from each response.
Extracted data is handed to the Item/Pipeline for processing.
Follow-up URLs are sent back to the engine and requested again.
Building a Scrapy crawler takes four steps: create the project, define the Item, write the spider, and write the pipeline.
Enter on the command line:
scrapy startproject tutorial
Directory structure
scrapy.cfg: the project configuration file (used when deploying to a server).
tutorial/: the project package; your Python code goes here.
tutorial/items.py: the project's items file (defines the structured data fields).
tutorial/pipelines.py: the project's pipelines file (post-processing of scraped data; defines how structured data is stored).
tutorial/settings.py: the project settings file (User-Agent, crawl delay, proxies, middleware configuration, and so on).
tutorial/spiders/: the directory for spider code (the rules for crawling each site).
Define the item by editing items.py; an Item is declared much like a Django model:
```python
import scrapy

class DmozItem(scrapy.Item):
    title = scrapy.Field()
    link = scrapy.Field()
    desc = scrapy.Field()
```
A Spider is a class you write to scrape data from a single website (or a group of websites).
It contains the initial URLs to download, rules for following links found in pages, and methods for parsing page content and producing items.
To create a Spider you must subclass scrapy.Spider and define a few attributes:
name: identifies the Spider; the name must be unique.
start_urls: the list of URLs the Spider starts crawling from. The first pages fetched will be from this list; subsequent URLs are extracted from the data of those initial pages.
parse(): a method of the spider. When called, the Response object generated for each downloaded start URL is passed to it as its only argument. It parses the response data, extracts items, and generates Request objects for URLs that need further processing.
A spider skeleton can also be generated with: scrapy genspider name "example.com"
```python
import scrapy

class DmozSpider(scrapy.Spider):
    name = "dmoz"
    allowed_domains = ["dmoz.org"]
    start_urls = [
        "http://www.dmoz.org/Computers/Programming/Languages/Python/Books/",
        "http://www.dmoz.org/Computers/Programming/Languages/Python/Resources/"
    ]

    def parse(self, response):
        filename = response.url.split("/")[-2] + '.html'
        with open(filename, 'wb') as f:
            f.write(response.body)
```
Start the crawler:
scrapy crawl dmoz
Selectors: a brief introduction
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/selectors.html
Scrapy Selectors have built-in support for XPath and CSS Selector expressions.
A Selector has four basic methods:
- xpath(): takes an XPath expression and returns a selector list of all nodes matching that expression
- extract(): serializes the matched nodes to Unicode strings and returns them as a list
- css(): takes a CSS expression and returns a selector list of all matching nodes (standard CSS selector syntax, much like what BeautifulSoup4 accepts)
- re(): extracts data with the given regular expression and returns a list of Unicode strings
The scrapy shell makes it easy to try these out and pull out the data you need, for example:
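A quick session in the Scrapy shell might look like this (a sketch; the URL and selectors are only illustrative):

```python
# scrapy shell "http://www.dmoz.org/Computers/Programming/Languages/Python/Books/"
# Inside the shell, `response` is already populated:
response.xpath('//title/text()').extract()        # all <title> text nodes, as a list
response.css('title::text').extract_first()       # the first match, or None
response.xpath('//a/@href').re(r'^https?://.*')   # regex filtering on the extracted text
```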
Once an Item has been collected in a Spider, it is passed to the Item Pipeline.
Each Item Pipeline component receives the Item and performs some action on it, for example deciding whether the Item should be dropped or stored.
Typical uses of an item pipeline include cleaning HTML data, validating scraped data, checking for (and dropping) duplicates, and storing items in a database.
Writing an item pipeline is simple: an item pipeline component is a standalone Python class in which the process_item() method must be implemented:
```python
# import whatever the pipeline needs here

class SomethingPipeline(object):
    def __init__(self):
        # Optional: parameter initialization, etc.
        pass

    def process_item(self, item, spider):
        # item (Item object) – the scraped item
        # spider (Spider object) – the spider that scraped the item
        # This method is required and is called for every item that passes through
        # the pipeline component. It must return an Item object; dropped items are
        # not processed by later pipeline components.
        return item

    def open_spider(self, spider):
        # spider (Spider object) – the spider that was opened
        # Optional: called when the spider is opened.
        pass

    def close_spider(self, spider):
        # spider (Spider object) – the spider that was closed
        # Optional: called when the spider is closed.
        pass
```
```python
import json

class JsonWriterPipeline(object):
    def __init__(self):
        # Open in text mode so the JSON strings can be written directly
        self.file = open('items.json', 'w')

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item
```
To enable an Item Pipeline component, add its class to the ITEM_PIPELINES setting in settings.py, as in this example:
```python
ITEM_PIPELINES = {
    # 'tutorial.pipelines.PricePipeline': 300,
    'tutorial.pipelines.JsonWriterPipeline': 800,
}
```
The integer assigned to each class determines the order in which the pipelines run: items pass through the pipelines from the lowest number to the highest. These numbers are conventionally kept in the 0-1000 range; the lower the value, the earlier it runs.
Pipelines can also define a from_crawler(cls, crawler) class method.
If present, this class method is called to create the pipeline instance and must return a new instance. The crawler object provides access to all core Scrapy components, such as the settings and the signal manager; for pipelines it is the way to reach configuration and signals.
In the following example we use pymongo to write Items to MongoDB. The MongoDB address and database name are configured in the project's settings.py; the example mainly shows how to use from_crawler():
import pymongo class MongoPipeline(object): collection_name = 'scrapy_items' def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): self.db[self.collection_name].insert(dict(item)) return item
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/spiders.html
A Spider class defines how a site (or group of sites) will be crawled, including the crawling actions (e.g. whether to follow links) and how to extract structured data (items) from page content. In other words, the Spider is where you define the crawling behaviour and the parsing of particular pages.
class scrapy.spider.Spider
Spider is the simplest spider, and every other spider must inherit from it. It provides no special functionality: it simply requests the given start_urls/start_requests and calls the spider's parse method on the results.
Source for reference
#全部爬蟲的基類,用戶定義的爬蟲必須從這個類繼承 class Spider(object_ref): #定義spider名字的字符串(string)。spider的名字定義了Scrapy如何定位(並初始化)spider,因此其必須是惟一的。 #name是spider最重要的屬性,並且是必須的。 #通常作法是以該網站(domain)(加或不加 後綴 )來命名spider。 例如,若是spider爬取 mywebsite.com ,該spider一般會被命名爲 mywebsite name = None #初始化,提取爬蟲名字,start_ruls def __init__(self, name=None, **kwargs): if name is not None: self.name = name # 若是爬蟲沒有名字,中斷後續操做則報錯 elif not getattr(self, 'name', None): raise ValueError("%s must have a name" % type(self).__name__) # python 對象或類型經過內置成員__dict__來存儲成員信息 self.__dict__.update(kwargs) #URL列表。當沒有指定的URL時,spider將從該列表中開始進行爬取。 所以,第一個被獲取到的頁面的URL將是該列表之一。 後續的URL將會從獲取到的數據中提取。 if not hasattr(self, 'start_urls'): self.start_urls = [] # 打印Scrapy執行後的log信息 def log(self, message, level=log.DEBUG, **kw): log.msg(message, spider=self, level=level, **kw) # 判斷對象object的屬性是否存在,不存在作斷言處理 def set_crawler(self, crawler): assert not hasattr(self, '_crawler'), "Spider already bounded to %s" % crawler self._crawler = crawler @property def crawler(self): assert hasattr(self, '_crawler'), "Spider not bounded to any crawler" return self._crawler @property def settings(self): return self.crawler.settings #該方法將讀取start_urls內的地址,併爲每個地址生成一個Request對象,交給Scrapy下載並返回Response #該方法僅調用一次 def start_requests(self): for url in self.start_urls: yield self.make_requests_from_url(url) #start_requests()中調用,實際生成Request的函數。 #Request對象默認的回調函數爲parse(),提交的方式爲get def make_requests_from_url(self, url): return Request(url, dont_filter=True) #默認的Request對象回調函數,處理返回的response。 #生成Item或者Request對象。用戶必須實現這個類 def parse(self, response): raise NotImplementedError @classmethod def handles_request(cls, request): return url_is_from_spider(request.url, cls) def __str__(self): return "<%s %r at 0x%0x>" % (type(self).__name__, self.name, id(self)) __repr__ = __str__
Main attributes and methods
name
A string defining the spider's name.
For example, a spider that crawls mywebsite.com would usually be named mywebsite.
allowed_domains
An optional list of the domains the spider is allowed to crawl.
start_urls
The initial list/tuple of URLs. When no particular URLs are specified, the spider starts crawling from this list.
start_requests(self)
Must return an iterable containing the first Requests the spider will crawl (the default implementation builds them from start_urls); a sketch of overriding it follows this list.
It is called when the spider starts crawling and no particular URLs have been specified.
parse(self, response)
The default callback used for Requests that do not specify one. It processes the response returned for a page and produces Items and/or further Request objects.
log(self, message[, level, component])
Logs a message via scrapy.log.msg(); see the logging documentation for details.
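As a minimal sketch of how these pieces fit together (the class name, URL and field names here are purely illustrative, not from the original text), a spider can override start_requests() instead of listing start_urls:

```python
import scrapy

class MiniSpider(scrapy.Spider):
    name = "mini"

    def start_requests(self):
        # Equivalent to declaring the URL in start_urls, but lets you
        # customise headers, cookies or the callback per request.
        yield scrapy.Request("http://example.com/", callback=self.parse)

    def parse(self, response):
        self.log("Fetched %s" % response.url)
        yield {
            "url": response.url,
            "title": response.xpath("//title/text()").extract_first(),
        }
```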
Automatic pagination on the Tencent recruitment site (code collected from the internet):
from mySpider.items import TencentItem import scrapy import re class TencentSpider(scrapy.Spider): name = "tencent" allowed_domains = ["hr.tencent.com"] start_urls = [ "http://hr.tencent.com/position.php?&start=0#a" ] def parse(self, response): for each in response.xpath('//*[@class="even"]'): item = TencentItem() name = each.xpath('./td[1]/a/text()').extract()[0] detailLink = each.xpath('./td[1]/a/@href').extract()[0] positionInfo = each.xpath('./td[2]/text()').extract()[0] peopleNumber = each.xpath('./td[3]/text()').extract()[0] workLocation = each.xpath('./td[4]/text()').extract()[0] publishTime = each.xpath('./td[5]/text()').extract()[0] #print name, detailLink, catalog, peopleNumber, workLocation,publishTime item['name'] = name.encode('utf-8') item['detailLink'] = detailLink.encode('utf-8') item['positionInfo'] = positionInfo.encode('utf-8') item['peopleNumber'] = peopleNumber.encode('utf-8') item['workLocation'] = workLocation.encode('utf-8') item['publishTime'] = publishTime.encode('utf-8') curpage = re.search('(\d+)',response.url).group(1) page = int(curpage) + 10 url = re.sub('\d+', str(page), response.url) # 發送新的url請求加入待爬隊列,並調用回調函數 self.parse yield scrapy.Request(url, callback = self.parse) # 將獲取的數據交給pipeline yield item
The following command quickly generates code from the CrawlSpider template:
scrapy genspider -t crawl tencent tencent.com
class scrapy.spiders.CrawlSpider
CrawlSpider is a subclass of Spider. The Spider class is designed to crawl only the pages listed in start_urls, whereas CrawlSpider defines rules that provide a convenient mechanism for following links, so it is better suited to extracting links from crawled pages and continuing the crawl from them.
Annotated source for reference
class CrawlSpider(Spider): rules = () def __init__(self, *a, **kw): super(CrawlSpider, self).__init__(*a, **kw) self._compile_rules() #首先調用parse()來處理start_urls中返回的response對象 #parse()則將這些response對象傳遞給了_parse_response()函數處理,並設置回調函數爲parse_start_url() #設置了跟進標誌位True #parse將返回item和跟進了的Request對象 def parse(self, response): return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True) #處理start_url中返回的response,須要重寫 def parse_start_url(self, response): return [] def process_results(self, response, results): return results #從response中抽取符合任一用戶定義'規則'的連接,並構形成Resquest對象返回 def _requests_to_follow(self, response): if not isinstance(response, HtmlResponse): return seen = set() #抽取以內的全部連接,只要經過任意一個'規則',即表示合法 for n, rule in enumerate(self._rules): links = [l for l in rule.link_extractor.extract_links(response) if l not in seen] #使用用戶指定的process_links處理每一個鏈接 if links and rule.process_links: links = rule.process_links(links) #將連接加入seen集合,爲每一個連接生成Request對象,並設置回調函數爲_repsonse_downloaded() for link in links: seen.add(link) #構造Request對象,並將Rule規則中定義的回調函數做爲這個Request對象的回調函數 r = Request(url=link.url, callback=self._response_downloaded) r.meta.update(rule=n, link_text=link.text) #對每一個Request調用process_request()函數。該函數默認爲indentify,即不作任何處理,直接返回該Request. yield rule.process_request(r) #處理經過rule提取出的鏈接,並返回item以及request def _response_downloaded(self, response): rule = self._rules[response.meta['rule']] return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow) #解析response對象,會用callback解析處理他,並返回request或Item對象 def _parse_response(self, response, callback, cb_kwargs, follow=True): #首先判斷是否設置了回調函數。(該回調函數多是rule中的解析函數,也多是 parse_start_url函數) #若是設置了回調函數(parse_start_url()),那麼首先用parse_start_url()處理response對象, #而後再交給process_results處理。返回cb_res的一個列表 if callback: #若是是parse調用的,則會解析成Request對象 #若是是rule callback,則會解析成Item cb_res = callback(response, **cb_kwargs) or () cb_res = self.process_results(response, cb_res) for requests_or_item in iterate_spider_output(cb_res): yield requests_or_item #若是須要跟進,那麼使用定義的Rule規則提取並返回這些Request對象 if follow and self._follow_links: #返回每一個Request對象 for request_or_item in self._requests_to_follow(response): yield request_or_item def _compile_rules(self): def get_method(method): if callable(method): return method elif isinstance(method, basestring): return getattr(self, method, None) self._rules = [copy.copy(r) for r in self.rules] for rule in self._rules: rule.callback = get_method(rule.callback) rule.process_links = get_method(rule.process_links) rule.process_request = get_method(rule.process_request) def set_crawler(self, crawler): super(CrawlSpider, self).set_crawler(crawler) self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
CrawlSpider inherits from Spider; besides the inherited attributes (name, allowed_domains), it provides new attributes and methods:
class scrapy.linkextractors.LinkExtractor
Link Extractors have a single purpose: extracting links.
Each LinkExtractor has one public method, extract_links(), which receives a Response object and returns a list of scrapy.link.Link objects.
A Link Extractor is instantiated once, and its extract_links method is then called repeatedly, once per response, to extract links (a usage sketch follows the constructor signature below).
class scrapy.linkextractors.LinkExtractor( allow = (), deny = (), allow_domains = (), deny_domains = (), deny_extensions = None, restrict_xpaths = (), tags = ('a','area'), attrs = ('href'), canonicalize = True, unique = True, process_value = None )
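A small standalone sketch of that pattern (the allow pattern and spider are illustrative only):

```python
import scrapy
from scrapy.linkextractors import LinkExtractor

# Instantiated once ...
page_lx = LinkExtractor(allow=(r'start=\d+',))

class PagingSpider(scrapy.Spider):
    name = "paging"
    start_urls = ["http://hr.tencent.com/position.php?&start=0#a"]

    def parse(self, response):
        # ... extract_links() is then called for every response
        for link in page_lx.extract_links(response):
            # link is a scrapy.link.Link with .url and .text attributes
            yield scrapy.Request(link.url, callback=self.parse)
```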
Main parameters:
allow: only URLs matching the given regular expression(s) are extracted; if empty, all links match.
deny: URLs matching this regular expression (or list of regular expressions) are never extracted; it takes precedence over allow.
allow_domains: domains whose links will be extracted.
deny_domains: domains whose links will never be extracted.
restrict_xpaths: XPath expressions used together with allow to restrict which parts of the page links are extracted from.
rules contains one or more Rule objects; each Rule defines a particular behaviour for crawling the site. If several rules match the same link, the first rule defined in this attribute is used.
class scrapy.spiders.Rule( link_extractor, callback = None, cb_kwargs = None, follow = None, process_links = None, process_request = None )
link_extractor: a LinkExtractor object that defines which links to extract.
callback: the function to call for each response generated from a link extracted by link_extractor; it receives the response as its first argument.
Note: avoid using parse as the callback when writing crawl rules. CrawlSpider uses the parse method to implement its own logic, so overriding parse will break the spider.
follow: a boolean specifying whether links should be followed from responses extracted with this rule. If callback is None, follow defaults to True; otherwise it defaults to False.
process_links: the spider method called with the list of links extracted by link_extractor, mainly used for filtering.
process_request: the spider method called for every request extracted by this rule (used to filter or modify requests).
Pagination
import scrapy from scrapy.spiders import CrawlSpider, Rule from scrapy.linkextractors import LinkExtractor from mySpider.items import TencentItem class TencentSpider(CrawlSpider): name = "tencent" allowed_domains = ["hr.tencent.com"] start_urls = [ "http://hr.tencent.com/position.php?&start=0#a" ] page_lx = LinkExtractor(allow=("start=\d+")) rules = [ Rule(page_lx, callback = "parseContent", follow = True) ] def parseContent(self, response): for each in response.xpath('//*[@class="even"]'): name = each.xpath('./td[1]/a/text()').extract()[0] detailLink = each.xpath('./td[1]/a/@href').extract()[0] positionInfo = each.xpath('./td[2]/text()').extract()[0] peopleNumber = each.xpath('./td[3]/text()').extract()[0] workLocation = each.xpath('./td[4]/text()').extract()[0] publishTime = each.xpath('./td[5]/text()').extract()[0] #print name, detailLink, catalog,recruitNumber,workLocation,publishTime item = TencentItem() item['name']=name.encode('utf-8') item['detailLink']=detailLink.encode('utf-8') item['positionInfo']=positionInfo.encode('utf-8') item['peopleNumber']=peopleNumber.encode('utf-8') item['workLocation']=workLocation.encode('utf-8') item['publishTime']=publishTime.encode('utf-8') yield item
Some sites append a session id to every URL, probably to track visit history; this session id changes on every visit, which complicates request de-duplication (marking pages as already crawled) and link-extraction rules.
https://bitsharestalk.org/index.php?board=5.0
becomes https://bitsharestalk.org/index.phpPHPSESSID=9771d42640ab3c89eb77e8bd9e220b53&board=5.0
; a few ways to handle this are described below.
The first approach only applies if your crawler uses scrapy.contrib.spiders.CrawlSpider: in that spider, URLs are extracted through Rule objects, which come with a hook for post-processing the extracted links.
rules = ( Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*board=\d+\.\d+$", "https://bitsharestalk\.org/index\.php\?board=\d+\.\d+$" )), process_links = 'link_filtering' ), #默認函數process_links Rule(LinkExtractor(allow = ( " https://bitsharestalk\.org/index\.php\?PHPSESSID\S*topic=\d+\.\d+$" , "https://bitsharestalk\.org/index\.php\?topic=\d+\.\d+$", ),), callback = "extractPost" , follow = True, process_links = 'link_filtering' ), Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*action=profile;u=\d+$" , "https://bitsharestalk\.org/index\.php\?action=profile;u=\d+$" , ),), callback = "extractUser", process_links = 'link_filtering' ) ) def link_filtering(self, links): ret = [] for link in links: url = link.url # print "This is the yuanlai ", link.url urlfirst, urllast = url.split( " ? " ) if urllast: link.url = urlfirst + " ? " + urllast.split( " & " , 1)[1] # print link.url return links
class WeiboSpider(CrawlSpider): name = 'weibo' allowed_domains = ['weibo.com'] start_urls = ['http://www.weibo.com/u/1876296184'] # 不加www,則匹配不到cookie, get_login_cookie()方法正則代完善 rules = ( Rule(LinkExtractor(allow=r'^http:\/\/(www\.)?weibo.com/[a-z]/.*'), # 微博我的頁面的規則,或/u/或/n/後面跟一串數字 process_request='process_request', callback='parse_item', follow=True), ) cookies = None def process_request(self, request): link=request.url page = re.search('page=\d*', link).group() type = re.search('type=\d+', link).group() newrequest = request.replace(cookies =self.cookies, url='.../questionType?' + page + "&" + type) return newrequest
Scrapy provides logging, available through the standard logging module.
Scrapy offers five logging levels: CRITICAL, ERROR, WARNING, INFO, and DEBUG.
By default, Python's logging module prints to standard output and only shows records at WARNING level or above (severity order: CRITICAL > ERROR > WARNING > INFO > DEBUG); Scrapy's own default log level is DEBUG.
Logging can be configured through the following settings in settings.py: LOG_FILE, LOG_ENABLED, LOG_ENCODING, LOG_LEVEL, LOG_FORMAT, LOG_DATEFORMAT, LOG_STDOUT.
With LOG_STDOUT enabled, standard output such as print("hello") also appears in the Scrapy log.
,其將會在Scrapy log中顯示。#coding:utf-8 ###################### ##Logging的使用 ###################### import logging ''' 1. logging.CRITICAL - for critical errors (highest severity) 致命錯誤 2. logging.ERROR - for regular errors 通常錯誤 3. logging.WARNING - for warning messages 警告+錯誤 4. logging.INFO - for informational messages 消息+警告+錯誤 5. logging.DEBUG - for debugging messages (lowest severity) 低級別 ''' logging.warning("This is a warning") logging.log(logging.WARNING,"This is a warning") #獲取實例對象 logger=logging.getLogger() logger.warning("這是警告消息") #指定消息發出者 logger = logging.getLogger('SimilarFace') logger.warning("This is a warning") #在爬蟲中使用log import scrapy class MySpider(scrapy.Spider): name = 'myspider' start_urls = ['http://scrapinghub.com'] def parse(self, response): #方法1 自帶的logger self.logger.info('Parse function called on %s', response.url) #方法2 本身定義個logger logger.info('Parse function called on %s', response.url) ''' Logging 設置 • LOG_FILE • LOG_ENABLED • LOG_ENCODING • LOG_LEVEL • LOG_FORMAT • LOG_DATEFORMAT • LOG_STDOUT 命令行中使用 --logfile FILE Overrides LOG_FILE --loglevel/-L LEVEL Overrides LOG_LEVEL --nolog Sets LOG_ENABLED to False ''' import logging from scrapy.utils.log import configure_logging configure_logging(install_root_handler=False) #定義了logging的些屬性 logging.basicConfig( filename='log.txt', format='%(levelname)s: %(levelname)s: %(message)s', level=logging.INFO ) #運行時追加模式 logging.info('進入Log文件') logger = logging.getLogger('SimilarFace') logger.warning("也要進入Log文件")
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/settings.html
Scrapy settings provide a way to customise the behaviour of Scrapy components, including the core, extensions, pipelines and spiders, e.g. configuring a JSON pipeline or LOG_LEVEL.
BOT_NAME
Default: scrapybot
Automatically set when you create a project with the startproject command.
CONCURRENT_ITEMS
Default: 100
Maximum number of items processed concurrently (per response) in the Item Processor (i.e. the Item Pipeline).
CONCURRENT_REQUESTS
Default: 16
Maximum number of concurrent requests performed by the Scrapy downloader.
DEFAULT_REQUEST_HEADERS
Default: { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en', }
The default headers used for Scrapy HTTP requests.
DEPTH_LIMIT
Default: 0
Maximum crawl depth allowed for any site; 0 means no limit.
DOWNLOAD_DELAY
Default: 0
Time the downloader waits before downloading the next page from the same website. Use it to throttle the crawl and reduce server load; decimals are supported: DOWNLOAD_DELAY = 0.25 # 250 ms of delay
This setting interacts with RANDOMIZE_DOWNLOAD_DELAY (enabled by default): instead of a fixed interval, Scrapy waits a random amount of time between 0.5 * DOWNLOAD_DELAY and 1.5 * DOWNLOAD_DELAY between requests.
DOWNLOAD_TIMEOUT
Default: 180
Downloader timeout, in seconds.
ITEM_PIPELINES
ITEM_PIPELINES = { 'mybot.pipelines.validate.ValidateMyItem': 300, 'mybot.pipelines.validate.StoreMyItem': 800, }
LOG_ENABLED
Default: True
Whether logging is enabled.
LOG_ENCODING
Default: 'utf-8'
The encoding used for logging.
LOG_LEVEL
Default: 'DEBUG'
The minimum level to log. Available levels: CRITICAL, ERROR, WARNING, INFO, DEBUG.
USER_AGENT
Default: Scrapy/VERSION (+http://scrapy.org)
The default User-Agent used when crawling, unless overridden.
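Taken together, a typical settings.py might tweak these values roughly like this (a sketch; every value is illustrative rather than a recommendation):

```python
# settings.py (illustrative values)
BOT_NAME = 'tutorial'

CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 0.25          # 250 ms of delay between requests to one site
RANDOMIZE_DOWNLOAD_DELAY = True
DOWNLOAD_TIMEOUT = 180
DEPTH_LIMIT = 5

DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
}

LOG_LEVEL = 'INFO'
USER_AGENT = 'tutorial (+http://www.example.com)'

ITEM_PIPELINES = {
    'tutorial.pipelines.JsonWriterPipeline': 800,
}
```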
https://docs.scrapy.org/en/latest/topics/request-response.html
Partial Request source:
# 部分代碼 class Request(object_ref): def __init__(self, url, callback=None, method='GET', headers=None, body=None, cookies=None, meta=None, encoding='utf-8', priority=0, dont_filter=False, errback=None): self._encoding = encoding # this one has to be set first self.method = str(method).upper() self._set_url(url) self._set_body(body) assert isinstance(priority, int), "Request priority not an integer: %r" % priority self.priority = priority assert callback or not errback, "Cannot use errback without a callback" self.callback = callback self.errback = errback self.cookies = cookies or {} self.headers = Headers(headers or {}, encoding=encoding) self.dont_filter = dont_filter self._meta = dict(meta) if meta else None @property def meta(self): if self._meta is None: self._meta = {} return self._meta
Common parameters
url: the URL to request and process next.
callback: the function that will handle the Response returned for this request.
method: usually not needed; defaults to GET. May be set to "GET", "POST", "PUT", etc., and must be uppercase.
headers: the request headers; usually not needed. Typical content looks like this:
# 本身寫過爬蟲的確定知道 Host: media.readthedocs.org User-Agent: Mozilla/5.0 (Windows NT 6.2; WOW64; rv:33.0) Gecko/20100101 Firefox/33.0 Accept: text/css,*/*;q=0.1 Accept-Language: zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3 Accept-Encoding: gzip, deflate Referer: http://scrapy-chs.readthedocs.org/zh_CN/0.24/ Cookie: _ga=GA1.2.1612165614.1415584110; Connection: keep-alive If-Modified-Since: Mon, 25 Aug 2014 21:59:35 GMT Cache-Control: max-age=0
meta: commonly used to pass data between requests; a dict.
```python
request_with_cookies = Request(
    url="http://www.example.com",
    cookies={'currency': 'USD', 'country': 'UY'},
    meta={'dont_merge_cookies': True},
)
```
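A common use of meta (a sketch; the spider, URLs and keys are illustrative) is carrying data from one callback to the next:

```python
import scrapy

class DetailSpider(scrapy.Spider):
    name = "detail"
    start_urls = ["http://example.com/list"]

    def parse(self, response):
        for href in response.xpath('//a/@href').extract():
            # Anything put in meta here is available as response.meta
            # in the callback that handles the follow-up request.
            yield scrapy.Request(response.urljoin(href),
                                 callback=self.parse_detail,
                                 meta={'list_url': response.url})

    def parse_detail(self, response):
        yield {
            'list_url': response.meta['list_url'],
            'detail_url': response.url,
        }
```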
encoding: the default 'utf-8' is usually fine.
dont_filter: tells the scheduler not to filter this request. Useful when you want to send the same request more than once and bypass the duplicate filter; defaults to False.
errback: the function called when the request fails.
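A minimal errback sketch (the spider and handler names are illustrative):

```python
import scrapy

class RobustSpider(scrapy.Spider):
    name = "robust"

    def start_requests(self):
        yield scrapy.Request("http://example.com/maybe-broken",
                             callback=self.parse,
                             errback=self.on_error,
                             dont_filter=True)

    def parse(self, response):
        self.logger.info("OK: %s", response.url)

    def on_error(self, failure):
        # failure is a twisted.python.failure.Failure describing the error
        self.logger.error("Request failed: %s", repr(failure))
```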
Response
# 部分代碼 class Response(object_ref): def __init__(self, url, status=200, headers=None, body='', flags=None, request=None): self.headers = Headers(headers or {}) self.status = int(status) self._set_body(body) self._set_url(url) self.request = request self.flags = [] if flags is None else list(flags) @property def meta(self): try: return self.request.meta except AttributeError: raise AttributeError("Response.meta not available, this response " \ "is not tied to any request")
Most parameters mirror those of Request:
status: the response status code; _set_body(body): the response body; _set_url(url): the response URL; self.request = request: the Request that produced this response.
https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
Downloader middlewares are a layer of components between the engine (crawler.engine) and the downloader (crawler.engine.download()); several downloader middlewares can be loaded at once.
While the engine passes a request to the downloader, a downloader middleware can process the request (e.g. add HTTP headers or proxy information);
while the downloader passes the response back to the engine, a downloader middleware can process the response (e.g. decompress gzip content).
To activate a downloader middleware component, add it to the DOWNLOADER_MIDDLEWARES setting, a dict whose keys are the middleware class paths and whose values are the middleware orders.
Here is an example:
DOWNLOADER_MIDDLEWARES = { 'mySpider.middlewares.MyDownloaderMiddleware': 543, }
Writing a downloader middleware is simple. Each middleware component is a Python class that defines one or more of the following methods:
class scrapy.contrib.downloadermiddleware.DownloaderMiddleware
process_request(request, spider)
Must return one of: None, a Response object, a Request object, or raise IgnoreRequest. If it raises IgnoreRequest, the process_exception() methods of the installed downloader middlewares are called; if none of them handles the exception, the request's errback (Request.errback) is called; if no code handles the raised exception, it is ignored and not logged (unlike other exceptions).
Parameters:
request (Request object) – the request being processed
spider (Spider object) – the spider this request belongs to
process_response(request, response, spider)
Must return one of: a Response object, a Request object, or raise an IgnoreRequest exception. If it raises IgnoreRequest, the request's errback (Request.errback) is called; if no code handles the raised exception, it is ignored and not logged (unlike other exceptions).
Parameters:
request (Request object) – the request that produced the response
response (Response object) – the response being processed
spider (Spider object) – the spider the response belongs to
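As a sketch of such a component, here is a hypothetical middleware that sets a random User-Agent on each request (the class name and agent strings are placeholders); it would be registered in DOWNLOADER_MIDDLEWARES like the example above:

```python
import random

class RandomUserAgentMiddleware(object):
    # Illustrative list; in practice this would be longer or configurable.
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6)',
    ]

    def process_request(self, request, spider):
        # Returning None lets the remaining middlewares and the downloader run.
        request.headers['User-Agent'] = random.choice(self.user_agents)
        return None

    def process_response(self, request, response, spider):
        # Must return a Response or Request object (or raise IgnoreRequest).
        return response
```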
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/jobs.html
To enable persistence support you only need to set a job directory through the JOBDIR setting. This directory stores all the request data needed to keep the state of a single job (i.e. one spider run). Note that it must not be shared by different spiders, nor by different jobs/runs of the same spider: it holds the state of exactly one job.
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
You can then stop the spider safely at any time (Ctrl-C or a signal) and resume it later with the same command:
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
In the Scrapy source, open scrapy/dupefilters.py; partial source:
class RFPDupeFilter(BaseDupeFilter): """Request Fingerprint duplicates filter""" def __init__(self, path=None, debug=False): self.file = None self.fingerprints = set() self.logdupes = True self.debug = debug self.logger = logging.getLogger(__name__) if path: self.file = open(os.path.join(path, 'requests.seen'), 'a+') self.file.seek(0) self.fingerprints.update(x.rstrip() for x in self.file) @classmethod def from_settings(cls, settings): debug = settings.getbool('DUPEFILTER_DEBUG') return cls(job_dir(settings), debug) def request_seen(self, request): fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep) def request_fingerprint(self, request): return request_fingerprint(request) def close(self, reason): if self.file: self.file.close() def log(self, request, spider): if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request: %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False spider.crawler.stats.inc_value('dupefilter/filtered', spider=spider)
It contains a request_seen method, which is called from scrapy/core/scheduler.py:
class Scheduler(object): ... def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False dqok = self._dqpush(request) if dqok: self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider) else: self._mqpush(request) self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider) self.stats.inc_value('scheduler/enqueued', spider=self.spider) return True ...
Back to the request_seen method:
def request_seen(self, request): fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep) # 返回的`request_fingerprint`是`from scrapy.utils.request import request_fingerprint` def request_fingerprint(self, request): return request_fingerprint(request)
scrapy\utils\request.py
This function hashes the request and produces a digest (fp.hexdigest()):
def request_fingerprint(request, include_headers=None): """ Return the request fingerprint. The request fingerprint is a hash that uniquely identifies the resource the request points to. For example, take the following two urls: http://www.example.com/query?id=111&cat=222 http://www.example.com/query?cat=222&id=111 Even though those are two different URLs both point to the same resource and are equivalent (ie. they should return the same response). Another example are cookies used to store session ids. Suppose the following page is only accesible to authenticated users: http://www.example.com/members/offers.html Lot of sites use a cookie to store the session id, which adds a random component to the HTTP Request and thus should be ignored when calculating the fingerprint. For this reason, request headers are ignored by default when calculating the fingeprint. If you want to include specific headers use the include_headers argument, which is a list of Request headers to include. """ if include_headers: include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers)) cache = _fingerprint_cache.setdefault(request, {}) if include_headers not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url))) fp.update(request.body or b'') if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[include_headers] = fp.hexdigest() return cache[include_headers]
As you can see, the de-duplication fingerprint is sha1(method + url + body + header), so the proportion of requests it can actually eliminate as duplicates is not very large.
If you need to compute your own de-duplication fingerprint, implement a custom Filter and configure it.
The following filter de-duplicates on the URL only:
from scrapy.dupefilter import RFPDupeFilter class SeenURLFilter(RFPDupeFilter): """A dupe filter that considers the URL""" def __init__(self, path=None): self.urls_seen = set() RFPDupeFilter.__init__(self, path) def request_seen(self, request): if request.url in self.urls_seen: return True else: self.urls_seen.add(request.url)
Don't forget to enable it in the settings:
DUPEFILTER_CLASS ='scraper.custom_filters.SeenURLFilter'
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/telnetconsole.html
Scrapy runs a telnet service that exposes some performance metrics. Connect to port 6023 with telnet and you get a Python prompt running inside the crawler. Be careful: running blocking operations there, such as time.sleep(), will stall the running crawler. The built-in est() function prints a set of performance indicators.
Open a terminal and run the following:
```shell
$ telnet localhost 6023
est()
...
len(engine.downloader.active) : 16
...
len(engine.slot.scheduler.mqs) : 4475
...
len(engine.scraper.slot.active) : 115
engine.scraper.slot.active_size : 117760
engine.scraper.slot.itemproc_size : 105
```
We ignore the dqs metric here; if you have enabled persistence (i.e. set JOBDIR), you will also see a non-zero dqs (len(engine.slot.scheduler.dqs)) value, and you should add dqs to mqs for the analysis below.
mqs: 4475 requests are waiting in the scheduler's memory queue, so the scheduler has plenty of work queued.
len(engine.downloader.active): 16 requests are currently being downloaded, which equals CONCURRENT_REQUESTS, so the downloader is fully loaded and there is nothing wrong there.
len(engine.scraper.slot.active): 115 responses are being processed in the scraper; their total size, given by engine.scraper.slot.active_size, is about 115 KB.
engine.scraper.slot.itemproc_size: 105 items are currently in the pipelines, which means roughly 10 responses are still being processed in the spider itself.
Overall, the downloader is the bottleneck: many requests (mqs) are queued ahead of it and it is already fully utilised, while the workload after it stays more or less stable (you can confirm this by calling est() several times). Another source of information is the stats object, which is normally printed when the crawl finishes; in telnet you can fetch it at any time with stats.get_stats(), which returns a dict you can print with p():
$ p(stats.get_stats()) {'downloader/request_bytes': 558330, ... 'item_scraped_count': 2485, ...}
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/stats.html
scrapy/statscollectors.py
""" Scrapy extension for collecting scraping stats """ import pprint import logging logger = logging.getLogger(__name__) class StatsCollector(object): def __init__(self, crawler): self._dump = crawler.settings.getbool('STATS_DUMP') self._stats = {} def get_value(self, key, default=None, spider=None): return self._stats.get(key, default) def get_stats(self, spider=None): return self._stats def set_value(self, key, value, spider=None): self._stats[key] = value def set_stats(self, stats, spider=None): self._stats = stats def inc_value(self, key, count=1, start=0, spider=None): d = self._stats d[key] = d.setdefault(key, start) + count def max_value(self, key, value, spider=None): self._stats[key] = max(self._stats.setdefault(key, value), value) def min_value(self, key, value, spider=None): self._stats[key] = min(self._stats.setdefault(key, value), value) def clear_stats(self, spider=None): self._stats.clear() def open_spider(self, spider): pass def close_spider(self, spider, reason): if self._dump: logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats), extra={'spider': spider}) self._persist_stats(self._stats, spider) def _persist_stats(self, stats, spider): pass class MemoryStatsCollector(StatsCollector): def __init__(self, crawler): super(MemoryStatsCollector, self).__init__(crawler) self.spider_stats = {} def _persist_stats(self, stats, spider): self.spider_stats[spider.name] = stats class DummyStatsCollector(StatsCollector): def get_value(self, key, default=None, spider=None): return default def set_value(self, key, value, spider=None): pass def set_stats(self, stats, spider=None): pass def inc_value(self, key, count=1, start=0, spider=None): pass def max_value(self, key, value, spider=None): pass def min_value(self, key, value, spider=None): pass
Collecting 404 pages
class JobboleSpider(scrapy.Spider): name = 'jobbole' allowed_domains = ['blog.jobbole.com'] start_urls = ['http://blog.jobbole.com/all-posts/'] # 收集404的url和數量 handle_httpstatus_list = [404,] def __init__(self): self.fail_urls = [] super(JobboleSpider, self).__init__() def parse(self, response): if response.status == 404: self.fail_urls.append(response.url) self.crawler.stats.inc_value('failed_url') ...
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/signals.html
Process fail_urls when the spider is closed:
def __init__(self): self.fail_urls = [] super(JobboleSpider, self).__init__() dispatcher.connect(self.handle_spider_closed, signal=signals.spider_closed) def handle_spider_closed(self, spider, response): self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls)) ...
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/extensions.html
The scrapy/extensions package contains some example extensions; a minimal custom extension is sketched below.
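For reference, a custom extension might look roughly like this (a hypothetical class, enabled via the EXTENSIONS setting); it simply logs the item count when the spider closes:

```python
from scrapy import signals

class ItemCountExtension(object):
    """Hypothetical extension: logs how many items were scraped when the spider closes."""

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider):
        count = self.stats.get_value('item_scraped_count', 0)
        spider.logger.info("Spider closed, %s items scraped", count)

# settings.py (illustrative):
# EXTENSIONS = {'myproject.extensions.ItemCountExtension': 500}
```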
http://doc.scrapy.org/en/master/topics/practices.html#distributed-crawls
Scrapy has no built-in mechanism for distributed (multi-server) crawling, but there are ways to distribute a crawl, depending on how you want to split the work.
If you have many spiders, the simplest way to spread the load is to run several Scrapyd instances and distribute the spiders among machines.
If you want to run a single spider across several machines, partition the URLs to crawl and send one partition to each spider. For example:
First, prepare the list of URLs to crawl and split it into separate files:
http://somedomain.com/urls-to-crawl/spider1/part1.list http://somedomain.com/urls-to-crawl/spider1/part2.list http://somedomain.com/urls-to-crawl/spider1/part3.list
Then start the spider on three different Scrapyd servers; the spider receives a (spider) argument part indicating which partition to crawl:
curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1 curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2 curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3
https://github.com/rmax/scrapy-redis
Redis command reference
http://redisdoc.com/
pip install scrapy-redis
Scrapy is a generic crawling framework, but it has no built-in support for distributed crawling. Scrapy-redis provides a set of redis-based components (components only) that make distributed crawling with Scrapy easier.
Scrapy-redis provides the following four components (meaning all four of these modules need corresponding changes):
- Scheduler
- Duplication Filter
- Item Pipeline
- Base Spider
As shown in the diagram above, scrapy-redis adds redis on top of the Scrapy architecture and, building on redis features, extends the following components:
Scheduler
Scrapy builds its own crawl queue on top of collection.deque (a double-ended queue, see https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py), but multiple Scrapy spiders cannot share one crawl queue, i.e. Scrapy itself does not support distributed crawling. scrapy-redis solves this by swapping the Scrapy queue for a redis database (a redis queue): the requests to crawl are stored on a single redis-server, so multiple spiders can read from the same database.
The component directly tied to the crawl queue is the Scheduler, which enqueues new requests (adding them to the Scrapy queue) and pops the next request to crawl (taking it out of the Scrapy queue). It organises the pending queue as a dictionary keyed by priority, for example:
{ priority 0 : queue 0, priority 1 : queue 1, priority 2 : queue 2 }
Each request is placed into the queue matching its priority, and queues with smaller priority values are popped first. To manage this priority-keyed dictionary of queues, the Scheduler needs a corresponding set of methods; the original Scheduler can no longer be used as-is, so scrapy-redis's scheduler component replaces it.
Duplication Filter
Request de-duplication: Scrapy keeps the fingerprints of requests that have already been sent in a set and compares each new request's fingerprint against that set; if the fingerprint is already present, the request has been sent before, otherwise processing continues. The core check is implemented like this:

```python
def request_seen(self, request):
    # self.fingerprints is the fingerprint set
    fp = self.request_fingerprint(request)
    # this is the core duplicate check
    if fp in self.fingerprints:
        return True
    self.fingerprints.add(fp)
    if self.file:
        self.file.write(fp + os.linesep)
```

In scrapy-redis, de-duplication is handled by the Duplication Filter component, which exploits the fact that a redis set holds no duplicates. The scrapy-redis scheduler receives requests from the engine, stores each request's fingerprint in a redis set to check for duplicates, and pushes the non-duplicate requests into the redis request queue.
When the engine asks for a request (issued by the Spider), the scheduler pops a request from the redis request queue according to priority and returns it to the engine, which hands it to the spider for processing.
Item Pipeline
The engine passes the scraped Items (returned by the Spider) to the Item Pipeline; scrapy-redis's Item Pipeline stores the scraped Items in the redis items queue.
The modified Item Pipeline makes it easy to pull items out of the items queue by key, which allows item processing to be done by a separate cluster of workers.
Base Spider
Instead of Scrapy's original Spider class, the rewritten RedisSpider inherits from both Spider and RedisMixin; RedisMixin is the class that reads URLs from redis.
When a spider that inherits from RedisSpider is created, setup_redis is called; it connects to the redis database and then hooks up two signals:
One for when the spider is idle: spider_idle is called, which calls the schedule_next_request function to keep the spider alive and raises a DontCloseSpider exception.
One for when an item is scraped: item_scraped is called, which calls the schedule_next_request function to fetch the next request.
There is not much code in scrapy-redis itself; the bulk of the work is done by the redis and scrapy libraries, and scrapy-redis acts as the glue between them. Below we look at what each source file of scrapy-redis implements and, finally, how they add up to a distributed crawler.
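Before diving into the source, this is roughly how a project wires scrapy-redis in (a sketch based on the project's README; the REDIS_HOST/REDIS_PORT values are placeholders):

```python
# settings.py
# Use scrapy-redis's scheduler and dupefilter instead of Scrapy's defaults
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Keep the redis queues between runs instead of flushing them
SCHEDULER_PERSIST = True

# Optionally store scraped items in redis as well
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

# Where the shared redis server lives
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# or: REDIS_URL = 'redis://user:pass@hostname:6379'
```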
connection.py
Instantiates the redis connection according to the settings. It is called by the dupefilter and the scheduler; in short, everything that reads from or writes to redis goes through this module.
import six from scrapy.utils.misc import load_object from . import defaults # 鏈接redis數據庫 # Shortcut maps 'setting name' -> 'parmater name'. SETTINGS_PARAMS_MAP = { 'REDIS_URL': 'url', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': 'encoding', } def get_redis_from_settings(settings): """Returns a redis client instance from given Scrapy settings object. This function uses ``get_client`` to instantiate the client and uses ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You can override them using the ``REDIS_PARAMS`` setting. Parameters ---------- settings : Settings A scrapy settings object. See the supported settings below. Returns ------- server Redis client instance. Other Parameters ---------------- REDIS_URL : str, optional Server connection URL. REDIS_HOST : str, optional Server host. REDIS_PORT : str, optional Server port. REDIS_ENCODING : str, optional Data encoding. REDIS_PARAMS : dict, optional Additional client parameters. """ params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): """Returns a redis client instance. Parameters ---------- redis_cls : class, optional Defaults to ``redis.StrictRedis``. url : str, optional If given, ``redis_cls.from_url`` is used to instantiate the class. **kwargs Extra parameters to be passed to the ``redis_cls`` class. Returns ------- server Redis client instance. """ redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
Default scrapy-redis configuration (defaults.py):
import redis # For standalone use. DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' PIPELINE_KEY = '%(spider)s:items' REDIS_CLS = redis.StrictRedis REDIS_ENCODING = 'utf-8' # Sane connection defaults. # 套接字的超時時間、等待時間等 REDIS_PARAMS = { 'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING, } SCHEDULER_QUEUE_KEY = '%(spider)s:requests' SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' START_URLS_KEY = '%(name)s:start_urls' START_URLS_AS_SET = False
dupefilter.py
Performs request de-duplication, implemented rather cleverly on top of redis's set data structure. Note, however, that the scheduler does not schedule requests using the dupefilter key implemented in this module; it uses the queue implemented in queue.py.
When a request is not a duplicate, it is stored in the queue and popped out when it is scheduled.
import logging import time from scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprint from . import defaults from .connection import get_redis_from_settings logger = logging.getLogger(__name__) # TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) # XXX: This creates one-time key. needed to support to use this # class as standalone dupefilter with scrapy's default scheduler # if scrapy passes spider on open() method this wouldn't be needed # TODO: Use SCRAPY_JOB env as default and fallback to timestamp. key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) @classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request) @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider': spider.name} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear() def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
This file looks complicated, but it simply re-implements the request de-duplication that Scrapy already provides. When Scrapy runs on a single machine, it only needs to check the in-memory (or persisted) request queue to decide whether the request URL about to be sent has already been requested or is already being scheduled (a local check is enough). For a distributed run, the schedulers on all hosts must consult the same request pool in the same database to decide whether a request is a duplicate.
The module does this by subclassing BaseDupeFilter and overriding its methods with a redis-based implementation. As the source shows, scrapy-redis reuses Scrapy's own fingerprint function, request_fingerprint, which was described in the de-duplication section above.
The class connects to redis and inserts each fingerprint into a redis set under a key that is the same for every instance of a given spider (redis is a key-value store, so the same key reaches the same value; using the spider name plus 'DupeFilter' as the key means crawler instances on different hosts, as long as they belong to the same spider, all reach the same set, and that set is their URL de-duplication pool). If sadd returns 0, the fingerprint already exists in the set (sets contain no duplicates) and request_seen returns True; if it returns 1, a new fingerprint was added, so the request is not a duplicate, request_seen returns False, and the new fingerprint is already stored in the database.
The DupeFilter check is used in the scheduler class: every request is checked before it enters scheduling; duplicates do not take part in scheduling and are simply discarded, so no resources are wasted on them.
"""A pickle wrapper module with protocol=-1 by default.""" try: import cPickle as pickle # PY2 except ImportError: import pickle def loads(s): return pickle.loads(s) def dumps(obj): return pickle.dumps(obj, protocol=-1)
This module implements two functions, loads and dumps: in effect a serializer.
Because the redis database cannot store complex objects (keys must be strings; values can only be strings, lists of strings, sets of strings, or hashes), everything has to be serialized to text before it is stored. The serializer used here is Python's pickle module, a serialization tool compatible with both py2 and py3; it is mainly used later by the scheduler to store Request objects.
pipelines.py is what enables distributed item processing: it stores Items in redis. Because it needs to read the configuration, it uses the from_crawler() hook.
from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThread from . import connection, defaults default_serialize = ScrapyJSONEncoder().encode class RedisPipeline(object): """Pushes serialized item into a redis list/queue Settings -------- REDIS_ITEMS_KEY : str Redis key where to store items. REDIS_ITEMS_SERIALIZER : str Object path to serializer function. """ def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize): """Initialize pipeline. Parameters ---------- server : StrictRedis Redis client instance. key : str Redis key where to store items. serialize_func : callable Items serializer function. """ self.server = server self.key = key self.serialize = serialize_func @classmethod def from_settings(cls, settings): params = { 'server': connection.from_settings(settings), } if settings.get('REDIS_ITEMS_KEY'): params['key'] = settings['REDIS_ITEMS_KEY'] if settings.get('REDIS_ITEMS_SERIALIZER'): params['serialize_func'] = load_object( settings['REDIS_ITEMS_SERIALIZER'] ) return cls(**params) @classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings) def process_item(self, item, spider): return deferToThread(self._process_item, item, spider) def _process_item(self, item, spider): key = self.item_key(item, spider) data = self.serialize(item) self.server.rpush(key, data) return item def item_key(self, item, spider): """Returns redis key based on given spider. Override this function to use a different key depending on the item and/or spider. """ return self.key % {'spider': spider.name}
The pipelines file implements an item pipeline class, the same kind of object as a regular Scrapy item pipeline. It takes the REDIS_ITEMS_KEY we configure in the settings as the key, serializes each item and stores it in the corresponding redis value (that value is a list, and each of our items is one node of that list). This pipeline collects the extracted items in one place, mainly to make later data processing convenient (whether that processing is centralised on one server or each node keeps its own data).
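A sketch of how a separate process might drain that items list (assuming the default '%(spider)s:items' key pattern; 'dmoz' is a placeholder spider name):

```python
import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

while True:
    # BLPOP blocks until the RedisPipeline pushes another item
    _, data = r.blpop('dmoz:items')
    item = json.loads(data)
    print(item)   # store to MongoDB/MySQL/etc. here
```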
queue.py
This file implements several queue/container classes that interact with redis; as requests move in and out, they are encoded and decoded (serialized and deserialized).
from scrapy.utils.reqser import request_to_dict, request_from_dict from . import picklecompat class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): """Initialize per-spider redis queue. Parameters ---------- server : StrictRedis Redis client instance. spider : Spider Scrapy spider instance. key: str Redis key where to put and get messages. serializer : object Serializer object with ``loads`` and ``dumps`` methods. """ if serializer is None: # Backward compatibility. # TODO: deprecate pickle. serializer = picklecompat if not hasattr(serializer, 'loads'): raise TypeError("serializer does not implement 'loads' function: %r" % serializer) if not hasattr(serializer, 'dumps'): raise TypeError("serializer '%s' does not implement 'dumps' function: %r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer def _encode_request(self, request): """Encode a request object""" obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): """Decode an request previously encoded""" obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) def __len__(self): """Return the length of the queue""" raise NotImplementedError def push(self, request): """Push a request""" raise NotImplementedError def pop(self, timeout=0): """Pop a request""" raise NotImplementedError def clear(self): """Clear queue/stack""" self.server.delete(self.key) # 先進先出, 隊列 class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key) # 壓頭, 出尾 def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data) # 有序隊列 class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key) def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority # We don't use zadd method as the order of arguments change depending on # whether the class is Redis or StrictRedis, and the option of using # kwargs only accepts strings, not bytes. self.server.execute_command('ZADD', self.key, score, data) def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ # use atomic range/remove using multi/exec pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0]) # 後進先出 棧 class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) # 壓頭, 出頭 def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data) # TODO: Deprecate the use of these names. SpiderQueue = FifoQueue SpiderStack = LifoQueue SpiderPriorityQueue = PriorityQueue
scheduler.py
This extension replaces Scrapy's built-in scheduler (it is pointed to by the SCHEDULER setting) and is what implements distributed scheduling of the crawler. The data structures it relies on come from the classes implemented in queue.py.
The two kinds of distribution scrapy-redis implements, distributed crawling and distributed item processing, are provided by the scheduler and pipelines modules; the other modules described above play supporting roles.
import importlib import six from scrapy.utils.misc import load_object from . import connection, defaults # TODO: add SCRAPY_JOB support. class Scheduler(object): """Redis-based scheduler Settings -------- SCHEDULER_PERSIST : bool (default: False) Whether to persist or clear redis queue. SCHEDULER_FLUSH_ON_START : bool (default: False) Whether to flush redis queue on start. SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0) How many seconds to wait before closing if no message is received. SCHEDULER_QUEUE_KEY : str Scheduler redis key. SCHEDULER_QUEUE_CLASS : str Scheduler queue class. SCHEDULER_DUPEFILTER_KEY : str Scheduler dupefilter redis key. SCHEDULER_DUPEFILTER_CLASS : str Scheduler dupefilter class. SCHEDULER_SERIALIZER : str Scheduler serializer. """ def __init__(self, server, persist=False, flush_on_start=False, queue_key=defaults.SCHEDULER_QUEUE_KEY, queue_cls=defaults.SCHEDULER_QUEUE_CLASS, dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY, dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS, idle_before_close=0, serializer=None): """Initialize scheduler. Parameters ---------- server : Redis The redis server instance. persist : bool Whether to flush requests when closing. Default is False. flush_on_start : bool Whether to flush requests on start. Default is False. queue_key : str Requests queue key. queue_cls : str Importable path to the queue class. dupefilter_key : str Duplicates filter key. dupefilter_cls : str Importable path to the dupefilter class. idle_before_close : int Timeout before giving up. """ if idle_before_close < 0: raise TypeError("idle_before_close cannot be negative") self.server = server self.persist = persist self.flush_on_start = flush_on_start self.queue_key = queue_key self.queue_cls = queue_cls self.dupefilter_cls = dupefilter_cls self.dupefilter_key = dupefilter_key self.idle_before_close = idle_before_close self.serializer = serializer self.stats = None def __len__(self): return len(self.queue) @classmethod def from_settings(cls, settings): kwargs = { 'persist': settings.getbool('SCHEDULER_PERSIST'), 'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'), 'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'), } # If these values are missing, it means we want to use the defaults. optional = { # TODO: Use custom prefixes for this settings to note that are # specific to scrapy-redis. 'queue_key': 'SCHEDULER_QUEUE_KEY', 'queue_cls': 'SCHEDULER_QUEUE_CLASS', 'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY', # We use the default setting name to keep compatibility. 'dupefilter_cls': 'DUPEFILTER_CLASS', 'serializer': 'SCHEDULER_SERIALIZER', } for name, setting_name in optional.items(): val = settings.get(setting_name) if val: kwargs[name] = val # Support serializer as a path to a module. if isinstance(kwargs.get('serializer'), six.string_types): kwargs['serializer'] = importlib.import_module(kwargs['serializer']) server = connection.from_settings(settings) # Ensure the connection is working. 
server.ping() return cls(server=server, **kwargs) @classmethod def from_crawler(cls, crawler): instance = cls.from_settings(crawler.settings) # FIXME: for now, stats are only supported from this constructor instance.stats = crawler.stats return instance def open(self, spider): self.spider = spider try: self.queue = load_object(self.queue_cls)( server=self.server, spider=spider, key=self.queue_key % {'spider': spider.name}, serializer=self.serializer, ) except TypeError as e: raise ValueError("Failed to instantiate queue class '%s': %s", self.queue_cls, e) self.df = load_object(self.dupefilter_cls).from_spider(spider) if self.flush_on_start: self.flush() # notice if there are requests already in the queue to resume the crawl if len(self.queue): spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue)) def close(self, reason): if not self.persist: self.flush() def flush(self): self.df.clear() self.queue.clear() def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request def has_pending_requests(self): return len(self) > 0
This file rewrites the scheduler class to replace scrapy.core.scheduler's original scheduler. The logic does not actually change much; the main difference is that redis is used as the storage medium, so that all crawlers are scheduled in a unified way. The scheduler is responsible for scheduling each spider's requests. When it is initialized, it reads from the settings the types of queue and dupefilter to use (usually the defaults listed above) and the keys they use (usually the spider name plus 'queue' or 'dupefilter', so that different instances of the same spider share the same data). Whenever a request needs to be scheduled, enqueue_request is called: the scheduler uses the dupefilter to check whether the URL is a duplicate and, if it is not, adds it to the queue container (FIFO, LIFO or priority queue, configurable in the settings). When it is time to dispatch, next_request is called: the scheduler pops a request through the queue container's interface and sends it to the corresponding spider so the spider can crawl it.
spiders.py
The spider is designed to read the URLs to crawl from redis and then crawl them; if the crawl yields more URLs, it keeps going until all requests are finished, then reads more URLs from redis and repeats the cycle.
Analysis: this spider uses the signals.spider_idle signal to monitor the state of the crawler. When the crawler is idle, it returns new make_requests_from_url(url) requests to the engine, which passes them on to the scheduler.
from scrapy import signals from scrapy.exceptions import DontCloseSpider from scrapy.spiders import Spider, CrawlSpider from . import connection, defaults from .utils import bytes_to_str class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None redis_batch_size = None redis_encoding = None # Redis client placeholder. server = None def start_requests(self): """Returns a batch of start requests from redis.""" return self.next_requests() def setup_redis(self, crawler=None): """Setup redis connection and idle signal. This should be called after the spider has set its crawler object. """ if self.server is not None: return if crawler is None: # We allow optional crawler argument to keep backwards # compatibility. # XXX: Raise a deprecation warning. crawler = getattr(self, 'crawler', None) if crawler is None: raise ValueError("crawler is required") settings = crawler.settings if self.redis_key is None: self.redis_key = settings.get( 'REDIS_START_URLS_KEY', defaults.START_URLS_KEY, ) self.redis_key = self.redis_key % {'name': self.name} if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") if self.redis_batch_size is None: # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE). self.redis_batch_size = settings.getint( 'REDIS_START_URLS_BATCH_SIZE', settings.getint('CONCURRENT_REQUESTS'), ) try: self.redis_batch_size = int(self.redis_batch_size) except (TypeError, ValueError): raise ValueError("redis_batch_size must be an integer") if self.redis_encoding is None: self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING) self.logger.info("Reading start URLs from redis key '%(redis_key)s' " "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s", self.__dict__) self.server = connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def next_requests(self): """Returns a request to be scheduled or none.""" use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop # XXX: Do we need to use a timeout here? found = 0 # TODO: Use redis pipeline execution. while found < self.redis_batch_size: data = fetch_one(self.redis_key) if not data: # Queue empty. break req = self.make_request_from_data(data) if req: yield req found += 1 else: self.logger.debug("Request not made from data: %r", data) if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) def make_request_from_data(self, data): """Returns a Request instance from data coming from Redis. By default, ``data`` is an encoded URL. You can override this method to provide your own message decoding. Parameters ---------- data : bytes Message from redis. """ url = bytes_to_str(data, self.redis_encoding) return self.make_requests_from_url(url) def schedule_next_requests(self): """Schedules a request if available""" # TODO: While there is capacity, schedule a batch of redis requests. for req in self.next_requests(): self.crawler.engine.crawl(req, spider=self) def spider_idle(self): """Schedules a request if available, otherwise waits.""" # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider class RedisSpider(RedisMixin, Spider): """Spider that reads urls from redis queue when idle. 
Attributes ---------- redis_key : str (default: REDIS_START_URLS_KEY) Redis key where to fetch start URLs from.. redis_batch_size : int (default: CONCURRENT_REQUESTS) Number of messages to fetch from redis on each attempt. redis_encoding : str (default: REDIS_ENCODING) Encoding to use when decoding messages from redis queue. Settings -------- REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls") Default Redis key where to fetch start URLs from.. REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS) Default number of messages to fetch from redis on each attempt. REDIS_START_URLS_AS_SET : bool (default: False) Use SET operations to retrieve messages from the redis queue. If False, the messages are retrieve using the LPOP command. REDIS_ENCODING : str (default: "utf-8") Default encoding to use when decoding messages from redis queue. """ @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj class RedisCrawlSpider(RedisMixin, CrawlSpider): """Spider that reads urls from redis queue when idle. Attributes ---------- redis_key : str (default: REDIS_START_URLS_KEY) Redis key where to fetch start URLs from.. redis_batch_size : int (default: CONCURRENT_REQUESTS) Number of messages to fetch from redis on each attempt. redis_encoding : str (default: REDIS_ENCODING) Encoding to use when decoding messages from redis queue. Settings -------- REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls") Default Redis key where to fetch start URLs from.. REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS) Default number of messages to fetch from redis on each attempt. REDIS_START_URLS_AS_SET : bool (default: True) Use SET operations to retrieve messages from the redis queue. REDIS_ENCODING : str (default: "utf-8") Default encoding to use when decoding messages from redis queue. """ @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj
The spider changes are also modest. Through the connect interface, the spider_idle signal is bound to the spider. When the spider is initialized, setup_redis establishes the connection to redis; afterwards the next_requests function fetches start URLs from redis, using the key defined by REDIS_START_URLS_KEY in the settings (note that this start-URL pool is not the same thing as the queue's URL pool described above: the queue's pool is used for scheduling, while the start-URL pool holds the entry URLs; both live in redis but under different keys, so think of them as different tables). From a small number of start URLs the spider can generate many new URLs, which enter the scheduler for de-duplication and scheduling. When the spider finds no more URLs in the scheduling pool, the spider_idle signal fires, which triggers the spider's next_requests function, and another batch of URLs is read from the redis start-URL pool.
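In practice, a minimal spider built on RedisSpider looks roughly like this (a sketch; the redis_key and parsing logic are placeholders). It is started by pushing a URL into that key, e.g. with redis-cli: lpush myspider:start_urls http://example.com/

```python
from scrapy_redis.spiders import RedisSpider

class MySpider(RedisSpider):
    name = 'myspider'
    # URLs are read from this redis key instead of start_urls
    redis_key = 'myspider:start_urls'

    def parse(self, response):
        # Placeholder extraction logic
        yield {
            'url': response.url,
            'title': response.xpath('//title/text()').extract_first(),
        }
```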
Compatibility between py2 and py3 strings (utils.py):
import six def bytes_to_str(s, encoding='utf-8'): """Returns a str if a bytes object is given.""" if six.PY3 and isinstance(s, bytes): return s.decode(encoding) return s
By rewriting the scheduler and spider classes, this project implements scheduling, spider start-up, and the interaction with redis; by implementing new dupefilter and queue classes, de-duplication and the scheduling containers also interact with redis. Because every crawler process on every host talks to the same redis database, scheduling and de-duplication are managed centrally, which is what makes the crawler distributed. When a spider is initialized, a corresponding scheduler object is initialized at the same time; this scheduler reads the settings and configures its scheduling container (queue) and its de-duplication tool (dupefilter). Whenever the spider produces a request, the Scrapy core hands that request to the spider's scheduler object for scheduling; the scheduler checks it against redis for duplicates and, if it is not a duplicate, adds it to the scheduling pool in redis. When the scheduling conditions are met, the scheduler takes a request out of the redis scheduling pool and sends it to the spider, which crawls it. When the spider has exhausted all of the currently available URLs and the scheduler finds that the spider's redis scheduling pool is empty, the spider_idle signal fires; on receiving it, the spider connects directly to redis, reads the start-URL pool, takes a new batch of entry URLs, and the whole process repeats.
What Scrapy-Redis schedules are Request objects, which carry a fair amount of information (not only the URL but also the callback, headers, and so on); the likely consequences are a slower crawl and heavy use of Redis storage space. So to keep things efficient, a certain level of hardware is required, especially on the master host.
https://piaosanlang.gitbooks.io/spiders/09day/section9.1.html
https://pypi.org/project/pybloomfiltermmap3/#description
https://pypi.org/project/pybloom_live
scrapy_redis de-duplicates using the redis set data structure; what gets de-duplicated is the fingerprint of each request.
The de-duplication mechanism was described above:
def request_seen(self, request): fp = self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added = self.server.sadd(self.key, fp) return added == 0
To optimise with a Bloom filter, you can modify the de-duplication function request_seen:
def request_seen(self, request): fp = self.request_fingerprint(request) if self.bf.isContains(fp): # 若是已經存在 return True else: self.bf.insert(fp) return False
self.bf is an instance of the BloomFilter class:
# encoding=utf-8 import redis from hashlib import md5 class SimpleHash(object): def __init__(self, cap, seed): self.cap = cap self.seed = seed def hash(self, value): ret = 0 for i in range(len(value)): ret += self.seed * ret + ord(value[i]) return (self.cap - 1) & ret class BloomFilter(object): def __init__(self, host='localhost', port=6379, db=0, blockNum=1, key='bloomfilter'): """ :param host: the host of Redis :param port: the port of Redis :param db: witch db in Redis :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it. :param key: the key's name in Redis """ self.server = redis.Redis(host=host, port=port, db=db) self.bit_size = 1 << 31 # Redis的String類型最大容量爲512M,現使用256M= 2^8 *2^20 字節 = 2^28 * 2^3bit self.seeds = [5, 7, 11, 13, 31, 37, 61] self.key = key self.blockNum = blockNum self.hashfunc = [] for seed in self.seeds: self.hashfunc.append(SimpleHash(self.bit_size, seed)) def isContains(self, str_input): if not str_input: return False m5 = md5() m5.update(str_input) str_input = m5.hexdigest() ret = True name = self.key + str(int(str_input[0:2], 16) % self.blockNum) for f in self.hashfunc: loc = f.hash(str_input) ret = ret & self.server.getbit(name, loc) return ret def insert(self, str_input): m5 = md5() m5.update(str_input) str_input = m5.hexdigest() name = self.key + str(int(str_input[0:2], 16) % self.blockNum) for f in self.hashfunc: loc = f.hash(str_input) self.server.setbit(name, loc, 1) if __name__ == '__main__': """ 第一次運行時會顯示 not exists!,以後再運行會顯示 exists! """ bf = BloomFilter() if bf.isContains('http://www.baidu.com'): # 判斷字符串是否存在 print 'exists!' else: print 'not exists!' bf.insert('http://www.baidu.com')
Redis-based Bloom filter de-duplication combines the Bloom filter's capacity for de-duplicating at very large scale with Redis's persistence, and being Redis-based it is also convenient for de-duplication across distributed machines.
https://github.com/scrapy/scrapyd
In Scrapy's default configuration, de-duplication is based on the URL, which is enough for ordinary sites. Some sites, however, take SEO to extremes: to get crawlers to fetch more, they generate links dynamically per request, which can lead the crawler to fetch large numbers of random pages or even loop forever.
There are two ways to deal with this:
(1) In settings.py, set an upper bound on crawl depth (a global setting, implemented by DepthMiddleware):
DEPTH_LIMIT = 20
(2) In parse, inspect the response and decide at the spider level:
def parse(self, response): if response.meta['depth'] > 100: print ('Loop?')