分析這個項目的源碼緣由是須要有去重過濾,增量爬取兩個功能,而scrapy-redis
項目已經幫咱們實現了,想看看他是怎麼實現的。這裏只貼出部分主要代碼,查看時請打開源碼對照,筆記有點長,建議看的童鞋按部分看。這是第一次分析源碼,限於我的水平,若有錯誤懇請指正,謝謝!python
地址:https://github.com/rmax/scrapy-redis/tree/master/src/scrapy_redislinux
tips: 源碼涉及scrapy的方法,不知道的在文檔裏搜一下就知道它的做用了,redis也是。git
安裝git:https://git-scm.com/download/wingithub
git clone https://github.com/rmax/scrapy-redis.git
查看源碼的話仍是visual studio code 和pycharm方便,只要安裝了某個包,在源碼裏用Ctrl+鼠標左鍵
點擊方法名就能夠跳轉到包的源碼裏。在linux下,則麻煩點要安裝vim跳轉的插件exuberant-ctags
,有興趣能夠本身百度。redis
pip install --upgrade scrapy pip install --upgrade redis
這裏僅copy部分關鍵代碼用於理清原理,所有代碼請查看GitHub源碼。mongodb
__init__.py
。從當前目錄下的connection.py
文件中import兩個函數get_redis
和get_redis_from_settings
__init__.py
文件from .connection import ( # NOQA get_redis, get_redis_from_settings, )
接着先看看get_redis
函數。此函數返回一個redis客戶端實例。此函數定義了一個redis_cls
類,其值爲redis.StrictRedis
,是從default.py
裏的設置的值,全部默認值都放在了這個文件。(這裏就忽略了)docker
還有一個url
,默認爲None
。若是scrapy的settings.py
啓用了REDIS_URL
這個參數,就會傳遞到這裏,而後調用redis.StrictRedis
的類方法from_url
,這個方法返回一個鏈接到傳入url
的redis客戶端對象。若是scrapy的settings.py
沒有啓用REDIS_URL
這個參數,則返回一個redis的默認客戶端對象,即默認鏈接到redis://[:password]@localhost:6379/0
,而不是給定的redis地址。數據庫
connection.py
文件def get_redis(**kwargs): redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
再看第二個函數get_redis_from_settings
,它有個參數settings
。這個函數首先設置了一個從當前目錄下的defaults.py
獲取默認參數的副本,而後再用從scrapy項目的settings.py
中獲取的有關redis字典型的配置參數來更新替換默認參數;而後引用six
庫作了python版本兼容,最後返回個redis客戶端實例。注:scrapy的getdict方法用於將settings裏的配置轉爲字典json
connection.py
vim
def get_redis_from_settings(settings): params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params)
這樣__init__
以後,就可以實例化一個鏈接到本身設置的redis
地址的redis
客戶端實例了。
可以鏈接redis
後就要將scrapy
請求的url
存到redis
。這裏做者實現了個調度器Scheduler
類來替換scrapy
默認的調度器scrapy.core.scheduler.Scheduler
。在本身項目的配置文件settings.py
中設置成SCHEDULER = "scrapy_redis.scheduler.Scheduler"
來替換默認的調度器。
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
scheduler.py
就只有一個Scheduler
類。先看__init__
函數,只需傳入一個server
參數,即本身的redis
實例,其餘均是選默認參數。def __init__(self, server, ...): self.server = server ...
Scheduler
類的類方法from_settings
,設置了個字典kargs
用於從本身項目的settings.py
中讀取參數SCHEDULER_PERSIST
,SCHEDULER_FLUSH_ON_START
,SCHEDULER_IDLE_BEFORE_CLOSE
等,其中SCHEDULER_PERSIST這個參數就是用於實現增量爬取功能的,若是爲TRUE則已經存入redis
隊列裏的url
就會一直保存不會清空,在咱們中止了爬蟲,下次再繼續運行時就能夠直接跳過已經在redis
隊列裏的url
了;接着這裏還有個可選字典optional
用於替換剛纔提到的初始化函數init
裏的默認參數;接着將optional
設置了的值加入到kwargs
裏;接着做者爲了支持本地文件能像包同樣導入,就用importlib.import_module
函數轉化了下;最後實例化一個鏈接到本身的redis
實例對象,檢查對象連通性;return cls()
中的cls
返回的是一個redis
實例做爲參數的Scheduler
類對象自己,它被下一個類方法from_crawler
調用,這樣調用類方法後返回cls
就會調用這個類的__init__
方法再次初始化。@classmethod def from_settings(cls, settings): kwargs = { 'persist': settings.getbool('SCHEDULER_PERSIST'), ... } optional = { 'queue_key': 'SCHEDULER_QUEUE_KEY', ... } for name, setting_name in optional.items(): val = settings.get(setting_name) if val: kwargs[name] = val server = connection.from_settings(settings) server.ping() return cls(server=server, **kwargs)
scrapy是如何調用自定義scheduler的
from_crawler
也是類方法,它須要傳入一個crawler
對象做爲參數,便是本身項目中的crawler
;接着調用該類自己的類方法from_settings
,並將crawler.settings
做爲參數傳入,像上面說的同樣,就會獲得一個包含本身項目配置的redis
實例;獲取本身項目crawler
的stats
狀態,返回實例。@classmethod def from_crawler(cls, crawler): instance = cls.from_settings(crawler.settings) instance.stats = crawler.stats return instance
open
,傳入一個spider
對象做爲參數;用load_object
模塊加載scrapy-redis
項目默認配置中的隊列類,默認爲scrapy_redis.queue.PriorityQueue
,傳入參數隊列類必須的參數server, spider, key, serializer
等(做者在queue.py
定義了3中隊列類,都是操做redis
的,等下咱們再看)。跟隊列類同樣,加載去重過濾類,調用這個類的類方法from_spider
,即dupefilter.py
裏的類方法,而from_spider
,傳入一個本身項目的spider
對象,獲取spider
對象對應的配置,獲取一個鏈接到本身配置的redis
地址的redis
對象,而後將本身項目的spider name
結合scrapy-redis
的默認配置生成spider_name:dupefilter
做爲過濾去重的redis key
,搜索官網可知debug
爲本身項目的默認值False
,最後返回調用此類方法的對象自己,這樣就獲得了一個鏈接到本身配置的redis
實例和本身項目配置及默認配置的spider
對象這二者結合的對象;接着判斷是否清空redis
的去重隊列,默認不清空;經過判斷隊列長度判斷是否還有請求在爬取。(這個過程感受挺難理解的,下一篇筆記會用視頻記錄pycharm debug類方法調用的過程)def open(self, spider): self.spider = spider ... self.df = load_object(self.dupefilter_cls).from_spider(spider) ...
dupefilter.py
的類方法from_spider
class RFPDupeFilter(BaseDupeFilter): @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider': spider.name} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)
close
和flush
函數,經過persist
來肯定是否清空去重隊列和請求隊列,默認False
,可是語句爲if not False
,即爲True
,因此默認會清空;close
的參數reason
爲scrapy
默認異常cancelled
操做。def close(self, reason): if not self.persist: self.flush() def flush(self): self.df.clear() self.queue.clear()
queue.py
的實例方法clear
def clear(self): """Clear queue/stack""" self.server.delete(self.key)
dupfilter.py
的實例方法clear
def clear(self): """Clears fingerprints data.""" self.server.delete(self.key)
enqueue_request
,顧名思義即爲入列請求的函數;傳入scrapy
的request
對象,判斷request
的dont_filter
參數,默認爲False
和上面獲得的去重過濾對象self.df
的request_seen
方法,它又調用request_fingerprint
方法,request_fingerprint
方法調用request
的默認方法request_fingerprint
來獲取請求的指紋,而後將指紋做爲值存入redis
的去重隊列中,若是存入成功,則redis
返回0,即該請求的指紋沒有重複,返回added == 0
,if not request.dont_filter and self.df.request_seen(request)
也就是if not False and 0 == 0
時,調用去重過濾對象self.df
的log
函數,當這個函數的參數debug
爲True
時啓用log
庫的debug
模式記錄日誌,不然記錄日誌添加no more duplicates will be shown
,記錄後將參數self.logdupes
設置爲False
,以後返回False
;接着,用scrapy
的stats.inc_value
收集統計spider
的狀態(不太理解這個函數,有知道的童鞋能夠告知下,謝謝),最後入列請求後返回True
。def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True
dupefilter.py
的實例方法request_seen
def request_seen(self, request): fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0
dupefilter.py
的實例方法log
def log(self, request, spider): if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
next_request
函數;block_pop_timeout
爲默認值0,調用redis
的pop
每隔0秒從隊列取出一個請求,取出操做使用redis
的pipeline
,要先執行multi()
操做,而後執行取請求操做zrange(0, 0)
取出一個請求並用zremrangebyrank(0, 0)
刪除這個索引對應的請求,而後執行execute
獲取結果;最後返回解碼json
格式後的結果;若是取出了請求而且狀態不爲None
時,用scrapy
的stats.inc_value
收集統計spider
的狀態,以後返回request
請求。def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request
queue.py
的實例方法pop
def pop(self, timeout=0): pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
在本身項目的settings
替換成scrapy-redis
的Scheduler
後,就會將項目的crawler
對象傳入到Scheduler
類中,Scheduler
類會生成一個本身定義的redis
對象,一個去重過濾的df
對象,一個存取請求的隊列對象queue
,再進行一些spider
對象狀態值的統計,請求的入列、清空等操做,通過這部分就實現了去重過濾功能了。dupefilter.py
的方法基本在這裏都用到了,因此就不單獨再分析了。
scheduler部分只用到了PriorityQueue
有優先級的隊列。做者其實在queue.py
中實現了FifoQueue
、PriorityQueue
、LifoQueue
3中操做redis
的方案,下面逐一看一下。
先看3個類都繼承了的基類Base
;初始化須要一個redis
客戶端實例server
,一個spider
實例,一個key
,一個默認爲None
的serializer
;若是serializer
爲None
則pass
,異常處理,若是沒有loads
、dumps
則報錯;初始化參數。
接下來是私有方法_encode_request
和_decode_request
,這裏用到了scrapy
內置的函數request_to_dict
和request_from_dict
來實現;_encode_request
將請求轉爲字典類型,用serializer.dumps
轉換成json
格式類型的數據並返回;_decode_request
也相似,過程相反,將json
類型數據轉爲字典類型後返回。
def _encode_request(self, request): """Encode a request object""" obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): """Decode an request previously encoded""" obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider)
__len__
、push
、pop
等,它們都是沒有實現的,clear
則是通用的實現,用於刪除redis
指定key
FifoQueue
類實現了基類Base
沒有實現的__len__
、push
、pop
方法;__len__
返回列表類型的redis
隊列長度;
push
將編碼成json
格式的數據存入列表類型的redis
隊列;
pop
判斷timeout
參數是否大於0,是則使用redis
的brpop(key, timeout)
方法,當這個key
裏面沒有值時會等待n秒後才返回tuple
類型的數據,返回第一個是key
鍵,第二個是值;若是timeout
不大於0,則用rpop
方法刪除並獲取列表中的最後一個元素,當隊列裏面沒有值時,2種方法都會返回None
,即data
爲None
,最後若是data
不爲None
返回解碼後的請求數據。
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
與FifoQueue
類相似,PriorityQueue
類也實現了基類Base
沒有實現的__len__
、push
、pop
方法,不過這裏用的是redis
的有序集合sorted set
;__len__
方法用zcard
獲取有序集合長度;
push
方法用_encode_request
獲取請求,設置了score
值爲scrapy
的內置屬性-request.priority
默認值爲0,最後用redis
的zadd
方法將key
即spider.name
,score
,data
即request
請求等添加到有序集合中。
def push(self, request): data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
pop
在scheduler
部分調用的時候已經分析過了。就是調用redis
的pop
每隔0秒從隊列取出一個請求,取出操做使用redis
的pipeline
,要先執行multi()
操做,而後執行取請求操做zrange(0, 0)
取出一個請求並用zremrangebyrank(0, 0)
刪除這個索引對應的請求,而後執行execute
獲取結果,若是取出告終果就返回解碼後的results[0]
即request
對象。def pop(self, timeout=0): pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
LifoQueue
類也實現了__len__
、push
、pop
方法;__len__
、push
方法跟FifoQueue
類的一致,再也不贅述;pop
方法,其實也差很少,不過FifoQueue
類用的是redis
的brpop
和rpop
用於從最早進入隊列刪除key
(最舊),LifoQueue
類用的是blpop
和lpop
用於從最後進入隊列刪除key
(最新)。def pop(self, timeout=0): if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data
做者實現了3中隊列類,先進先出隊列、優先級隊列、後進先出隊列,項目用到的是優先級隊列。
其實到這裏已經可以知足個人去重過濾、增量爬取的需求了。但做者還提供了本身的spiders.py
來執行爬取請求,和pipelines.py
來將數據存儲到redis
的功能,有興趣的接着看。
先看spider部分,這裏定義了3個類,RedisMixin
類用於實現從redis隊列讀取urls
,RedisSpider
類繼承RedisMixin
和scrapy的Spider
類;RedisCrawlSpider
繼承RedisMixin
和scrapy的CrawlerSpider
類;都用於空閒時從redis隊列中讀取爬取的請求urls
。
先看看RedisSpider
類和RedisCrawlSpider
。它們都實現了scrapy
的一個類方法from_crawler
。這個類方法是幹嗎的呢,不知道,因此去官網文檔搜索下Spider from_crawler
,發現method -- scrapy.spiders.Spider.from_crawler -- in Spiders
這個內容比較符合咱們想找的類方法。進去文檔搜索from_crawler
,的確找到如出一轍的類方法from_crawler(crawler, *args, **kwargs)
,它是scrapy下的class scrapy.spiders.Spider
類下的類方法,用來建立spider對象。
RedisSpider
類和RedisCrawlSpider
功能是同樣的,就拿RedisSpider
這個類來講吧,調用from_crawler
類方法,裏面的super
會繼承scrapy.spiders.CrawlSpider
建立的spider
對象,而後RedisSpider
類也就具備了spider
對象的全部屬性和方法,同時又繼承了RedisMixin
類,那麼類RedisSpider
又具備了RedisMixin
類的全部屬性和方法,因此就能夠調用RedisMixin
類裏的setup_redis
方法了。(這個過程感受挺難理解的,下一篇筆記會用視頻記錄pycharm debug類方法調用的過程)
from scrapy.spiders import Spider, CrawlSpider class RedisMixin(object): def setup_redis(self, crawler=None): pass class RedisSpider(RedisMixin, Spider): @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj class RedisCrawlSpider(RedisMixin, CrawlSpider): @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj
接着說RedisMixin
類,通過調用類方法from_crawler
,RedisMixin
類已經具備了spider
對象的全部屬性和方法,那麼就能夠在RedisMixin
類裏面使用它們了。
首先這個類定義了start_requests
函數直接返回next_requests
函數,next_requests
函數返回一個要調度的request
或返回none
;
next_requests
函數具體實現:先設置了個標誌位use_set
,其名爲REDIS_START_URLS_AS_SET
,其值爲default.py
設置的默認值False
;由於use_set
爲False
,因此fetch_one
調用上面說過的init部分生成的redis
實例的spop
方法,不然調用lpop
方法;初始化時found
爲0,進入循環,redis_batch_size
的值爲scrapy項目的settings.py
設置的CONCURRENT_REQUESTS
的值,默認併發值是16;調用fetch_one
從redis
獲取一個redis_key
即通過去重過濾的請求url
,若是沒有獲取到請求就說明隊列爲空,跳出循環;有的話接着調用make_request_from_data
方法將字節類型url
編碼成str
類型再返回(這個函數返回make_requests_from_url
,但我找不到哪裏有定義,不知道是否是做者寫錯了,在github問也沒人回答。。);若是有返回,則用yield
同時處理多個請求url
,而後將請求個數加一,並將日誌輸出。
def next_requests(self): use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop found = 0 while found < self.redis_batch_size: data = fetch_one(self.redis_key) if not data: # Queue empty. break req = self.make_request_from_data(data) if req: yield req found += 1 else: self.logger.debug("Request not made from data: %r", data) if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) def make_request_from_data(self, data): url = bytes_to_str(data, self.redis_encoding) return self.make_requests_from_url(url)
setup_redis
方法,按照註釋,這是用於設置redis
鏈接和空閒信號的,須要在spider
對象設置了它的crawler
對象以後才能夠被調用,也就是要使用上面提到的RedisSpider
和RedisCrawlSpider
兩個類以後,繼承了spider
和crawler
以後才行;這個方法須要傳入參數crawler
,默認爲None
,若是傳入值爲None
則會報錯,提示crawler is required
;若是沒報錯說明已經繼承了spider
和crawler
對象,那接着就是從crawler
對象獲取配置信息等屬性;接着判斷redis
隊列中是否有請求url
,若是沒有則將本項目 的配置賦值給redis_key
,而後格式化成'name': 本身項目爬蟲名
這樣的格式;接着用字符串的strip()
方法判斷redis_key
是否爲空字符串,是則報錯;而後判斷redis_batch_size
,若是爲None
則將默認值賦值給它,而後異常處理redis_batch_size
是否爲整形;再來就是判斷redis_encoding
,爲None
則將默認值賦值給它;判斷參數都ok
後,將參數信息做爲日誌輸出;而後如init
部分分析的,生成一個redis
客戶端對象;最後調用crawler.signals.connect
方法,這個方法調用spider_idle
方法,spider_idle
方法又調用schedule_next_requests
方法,schedule_next_requests
方法調用next_requests
方法從reids
隊列來獲取新的請求url
,而後用scrapy
的crawler.engine.crawl
方法在爬蟲空閒時來獲取並執行爬取請求,執行完了返回到spider_idle
執行raise DontCloseSpider
來禁止關閉爬蟲spider
,正常來講執行完了請求就會關閉爬蟲,到沒法再獲取到新的請求時,也就是redis
請求隊列沒有請求了纔會關閉spider
。(其實這裏有點蒙,做者用了DontCloseSpider
來禁止關閉爬蟲spider
,最後是怎麼關閉爬蟲的?有知道的童鞋望告知,謝謝!)class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None redis_batch_size = None redis_encoding = None server = None def setup_redis(self, crawler=None): #...忽略部份內容 self.server = connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def schedule_next_requests(self): # TODO: While there is capacity, schedule a batch of redis requests. for req in self.next_requests(): self.crawler.engine.crawl(req, spider=self) def spider_idle(self): """Schedules a request if available, otherwise waits.""" # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider
根據scrapy
的原理可知,通過spider
模塊engine
模塊scheduler
模塊後到達pipelines
模塊,請求url
爬取的內容將在這裏被處理。
根據文檔,自定義pipeline
要實現from_crawler
、process_item
、open_spider
、close_spider
這幾個方法(這裏沒有實現open_spider
、close_spider
,有點不理解)。
先看__init__
,初始化一個server
對象,也就是本身的redis
客戶端對象,一個用於存儲爬取數據的item
的key
,還有個用於編碼成json
格式的序列化函數,默認使用ScrapyJSONEncoder().encode
接着是2個類方法,from_crawler
類方法調用from_settings
類方法,from_settings
類方法首先從項目配置settings.py
讀取配置,若是讀取到有REDIS_ITEMS_KEY
這個關鍵字就將其做爲參數添加到params
字典中,REDIS_ITEMS_SERIALIZER
也是同樣,最後return cls(**params)
,此時類方法就會再次初始化,將params
字典中的參數賦值給__init__
中的參數。(這裏的類方法跟上面scheduler
,spiders
提到的是相似的)
而後是實現process_item
方法,它調用deferToThread
方法,這個方法的做用是在線程中運行函數,並將結果做爲延遲返回。這個方法傳入一個私有方法_process_item
來處理item
,首先調用item_key
格式化,將spidername:items
做爲key
,而後序列化items
的內容做爲值,最後用rpush
將鍵值對存入redis
並返回item
,讓它繼續走完scrapy
的流程。
通過spiders.py
和pipelines.py
後就能夠將爬蟲爬取的內容存儲到本身的redis
了。
若是有童鞋看到了最後,手動給你點個贊!真有耐心,哈哈。本人水平有限,有問題歡迎留言交流,謝謝。
scrapy過濾重複數據和增量爬取
redis基礎筆記
scrapy電影天堂實戰(二)建立爬蟲項目
scrapy電影天堂實戰(一)建立數據庫
scrapy基礎筆記
在docker鏡像中加入環境變量
筆記 | mongodb 入門操做
筆記 | python元類
筆記 | python2和python3使用super()
那些你在python3中可能沒用到但應該用的東西
superset docker 部署
開機啓動容器裏面的程序
博客 | 三步部署hitchhiker-api