Scrapy-Redis庫已經爲咱們提供了Scrapy分佈式的隊列、調度器、去重等功能,其GitHub地址爲:https://github.com/rmax/scrapy-redis。
git
本節咱們深刻了解一下,利用Redis如何實現Scrapy分佈式。github
能夠把源碼Clone下來,執行以下命令:redis
git clone https://github.com/rmax/scrapy-redis.git
核心源碼在scrapy-redis/src/scrapy_redis目錄下。數據庫
從爬取隊列入手,看看它的具體實現。源碼文件爲queue.py,它有三個隊列的實現,首先它實現了一個父類Base
,提供一些基本方法和屬性,以下所示:數據結構
class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): if serializer is None: serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError("serializer does not implement 'loads' function: %r" % serializer)
if not hasattr(serializer, 'dumps'):
raise TypeError("serializer '%s' does not implement 'dumps' function: %r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer
def _encode_request(self, request): obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request): obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def __len__(self): """Return the length of the queue""" raise NotImplementedError
def push(self, request): """Push a request""" raise NotImplementedError
def pop(self, timeout=0): """Pop a request""" raise NotImplementedError
def clear(self): """Clear queue/stack""" self.server.delete(self.key)
首先看一下_encode_request()和_decode_request()
方法。咱們要把一個Request對象存儲到數據庫中,但數據庫沒法直接存儲對象,因此先要將Request序列化轉成字符串,而這兩個方法分別能夠實現序列化和反序列化的操做,這個過程能夠利用pickle庫來實現。隊列Queue在調用push()
方法將Request存入數據庫時,會調用_encode_request()
方法進行序列化,在調用pop()
取出Request時,會調用_decode_request()
進行反序列化。app
在父類中,__len__()
、push()
和pop()
這三個方法都是未實現的,三個方法直接拋出NotImplementedError
異常,所以這個類不能直接使用。那麼,必需要實現一個子類來重寫這三個方法,而不一樣的子類就會有不一樣的實現和不一樣的功能。scrapy
接下來咱們定義一些子類來繼承Base
類,並重寫這幾個方法。在源碼中有三個子類的實現,它們分別是FifoQueue
、PriorityQueue
、LifoQueue
,咱們分別來看看它們的實現原理。分佈式
首先是FifoQueue
,以下所示:ide
class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key)
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple): data = data[1]
else: data = self.server.rpop(self.key)
if data:
return self._decode_request(data)
這個類繼承了Base
類,並重寫了__len__()
、push()
、pop()
三個方法,這三個方法都是對server
對象的操做。server
對象就是一個Redis鏈接對象,咱們能夠直接調用其操做Redis的方法對數據庫進行操做,這裏的操做方法有llen()
、lpush()
、rpop()
等,這就表明此爬取隊列使用了Redis的列表。序列化後的Request會存入列表中,__len__()
方法獲取列表的長度,push()
方法調用了lpush()
操做,這表明從列表左側存入數據,pop()
方法中調用了rpop()
操做,這表明從列表右側取出數據。this
Request在列表中的存取順序是左側進、右側出,這是有序的進出,即先進先出(First Input First Output,FIFO),此類的名稱就叫做FifoQueue
。
還有一個與之相反的實現類,叫做LifoQueue
,實現以下:
class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key)
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout)
if isinstance(data, tuple): data = data[1]
else: data = self.server.lpop(self.key)
if data:
return self._decode_request(data)
與FifoQueue
不一樣的是LifoQueue
的pop()
方法,它使用的是lpop()
操做,也就是從左側出,push()
方法依然使用lpush()
操做,從左側入。那麼效果就是先進後出、後進先出(Last In First Out,LIFO),此類名稱就叫做LifoQueue
。這個存取方式相似棧的操做,因此也能夠稱做StackQueue
。
在源碼中還有一個子類叫做PriorityQueue
,顧名思義,它是優先級隊列,實現以下:
class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key)
def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute()
if results:
return self._decode_request(results[0])
在這裏__len__()
、push()
、pop()
方法使用了server
對象的zcard()
、zadd()
、zrange()
操做,這裏使用的存儲結果是有序集合,這個集合中的每一個元素均可以設置一個分數,這個分數就表明優先級。
__len__()
方法調用了zcard()
操做,返回的就是有序集合的大小,也就是爬取隊列的長度。push()
方法調用了zadd()
操做,就是向集合中添加元素,這裏的分數指定成Request的優先級的相反數,分數低的會排在集合的前面,即高優先級的Request就會在集合的最前面。pop()
方法首先調用了zrange()
操做,取出集合的第一個元素,第一個元素就是最高優先級的Request,而後再調用zremrangebyrank()
操做,將這個元素刪除,這樣就完成了取出並刪除的操做。
此隊列是默認使用的隊列,即爬取隊列默認是使用有序集合來存儲的。
前面說過Scrapy的去重是利用集合來實現的,而在Scrapy分佈式中的去重就須要利用共享的集合,那麼這裏使用的就是Redis中的集合數據結構。咱們來看看去重類是怎樣實現的,源碼文件是dupefilter.py,其內實現了一個RFPDupeFilter
類,以下所示:
class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger
def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
@classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings)
def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp)
return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request)
def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear()
def clear(self): """Clears fingerprints data.""" self.server.delete(self.key)
def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
這裏一樣實現了一個request_seen()
方法,和Scrapy中的request_seen()
方法實現極其相似。不過這裏集合使用的是server
對象的sadd()
操做,也就是集合再也不是一個簡單數據結構了,而是直接換成了數據庫的存儲方式。
鑑別重複的方式仍是使用指紋,指紋一樣是依靠request_fingerprint()
方法來獲取的。獲取指紋以後就直接向集合添加指紋,若是添加成功,說明這個指紋本來不存在於集合中,返回值1。代碼中最後的返回結果是斷定添加結果是否爲0,若是剛纔的返回值爲1,那這個斷定結果就是False
,也就是不重複,不然斷定爲重複。
這樣咱們就成功利用Redis的集合完成了指紋的記錄和重複的驗證。
Scrapy-Redis還幫咱們實現了配合Queue、DupeFilter使用的調度器Scheduler,源文件名稱是scheduler.py。咱們能夠指定一些配置,如SCHEDULER_FLUSH_ON_START
便是否在爬取開始的時候清空爬取隊列,SCHEDULER_PERSIST
便是否在爬取結束後保持爬取隊列不清除。咱們能夠在settings.py裏自由配置,而此調度器很好地實現了對接。
接下來咱們看看兩個核心的存取方法,實現以下所示:
def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider)
return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request)
return True
def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout)
if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
enqueue_request()
能夠向隊列中添加Request,核心操做就是調用Queue的push()
操做,還有一些統計和日誌操做。next_request()
就是從隊列中取Request,核心操做就是調用Queue的pop()
操做,此時若是隊列中還有Request,則Request會直接取出來,爬取繼續,不然若是隊列爲空,爬取則會從新開始。
目前爲止,咱們就以前所說的三個分佈式的問題解決了,總結以下。
爬取隊列的實現。這裏提供了三種隊列,使用了Redis的列表或有序集合來維護。
去重的實現。這裏使用了Redis的集合來保存Request的指紋,以提供重複過濾。
中斷後從新爬取的實現。中斷後Redis的隊列沒有清空,爬取再次啓動時,調度器的next_request()
會從隊列中取到下一個Request,爬取繼續。