數據採集: scrapy-redis源碼分析(一)

通常咱們使用python來寫爬蟲會優先選擇scrapy框架, 框架自己基於異步網絡請求性能比較高, 另外對併發控制, 延遲請求支持的比較好, 可使咱們專一於爬蟲的邏輯. 可是scrapy僅僅支持單機的爬蟲, 若是要支持分佈式的話還須要藉助scrapy-redis來實現. 接下來咱們主要關注scrapy-redis的實現方式, 瞭解實現原理使用起來會更加順手.python

scrapy-redis的目錄結構以下, 各個模塊功能見註釋redis

├── __init__.py
├── connection.py   # 負責redis的鏈接
├── defaults.py    # 一些默認參數配置
├── dupefilter.py    # 用於請求隊列的去重, 繼承了scrapy自己的去重器
├── picklecompat.py    # 使用pickle進行序列化
├── pipelines.py    # 會把item丟進redis中去
├── queue.py    # 調度隊列, 調度器會使用該隊列
├── scheduler.py    # 調度器, 負責任務的調度工做
├── spiders.py    # spider基類, 加入了信號等
└── utils.py

queue.py

支持三種隊列, 都繼承自Base算法

1. FIFO Queue

使用了redis的list結構網絡

class FifoQueue(Base):
    def __len__(self):
        """返回隊列長度大小"""
        return self.server.llen(self.key)

    def push(self, request):
        """發送請求到隊列左邊"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """從隊列右邊拋出請求"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

2. PriorityQueue

使用了redis的有序集合結構數據結構

class PriorityQueue(Base):

    def __len__(self):
        """返回隊列內長度大小"""
        return self.server.zcard(self.key)

    def push(self, request):
        """放入請求到zset中"""
        data = self._encode_request(request)
        score = -request.priority
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """從zset中拋出請求. 此處不支持timeout參數"""
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

使用redis的sorted set實現, 若是在spider腳本中須要指定priority的話, 必定要在settings中來聲明使用的是PriorityQueue.併發

3. LIFO Queue

後入先出, 使用list結構實現框架

class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)

和先進先出隊列基本同樣, 實現了棧結構異步

dupefilter.py

scrapy默認使用了集合結構來進行去重, 在scrapy-redis中使用redis的集合(set)進行了替換, 請求指紋的計算方法仍是用的內置的.scrapy

def request_seen(self, request):
    """獲取請求指紋並添加到redis的去重集合中去"""
    fp = self.request_fingerprint(request)    # 獲得請求的指紋
    added = self.server.sadd(self.key, fp)    # 把指紋添加到redis的集合中
    return added == 0

def request_fingerprint(self, request):
    return request_fingerprint(request)    # 獲得請求指紋

去重指紋計算使用的是sha1算法, 計算值包括請求方法, url, body等信息數據結構和算法

scheduler.py

替換了scrapy原生的scheduler, 全部方法名稱和原生scheduler保持一致, 在爬蟲開啓後會鏈接待抓取隊列和去重集合, 而後就是不斷把新的請求去重後放入待抓取隊列, 而後從待抓取隊列拿出請求給下載器

def open(self, spider):
      """爬蟲啓動時觸發, 主要是鏈接待抓取和去重模塊"""
      pass
      
    def enqueue_request(self, request):
        """
        把請求去重後放入 待抓取隊列中
        Parameters
        ----------
        request

        Returns
        -------

        """
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        """
        從請求隊列拿出下一個請求並返回
        Returns
        -------

        """
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

調度器確定要和請求隊列去重隊列進行交互, 因此初始化要獲取使用的queuedupfilter的類, 並在open方法中完成實例化

open方法

def open(self, spider):
        self.spider = spider

        try:
            # 獲得隊列queue的實例化對象
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            # 獲得去重的實例化對象
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:    # 若是爲True, 要在爬蟲開啓前刪除對應爬蟲request隊列和dupfilter隊列
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

spider.py

spider空閒的時候會從start_urls隊列中讀取url, 默認一次讀取CONCURRENT_REQUESTS個url, 能夠在settings中設置REDIS_START_URLS_BATCH_SIZE來改變每次的讀取數量, 通常我會在使用的時候增大這個值, 能夠下降spide進入idle的次數, 從而適當提高抓取性能

def setup_redis(self, crawler=None):
        """初始化了redis參數, 包括使用的種子url的key, 批量讀取url的數量等信息"""
        ......
        # 當spider空閒的時候會觸發該信號, 調用spider_idle函數
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
      
    def spider_idle(self):
      """空閒的時候觸發該函數, 嘗試請求下一批url. 有url的時候會直接請求, 最後都會拋出異常, 防止spider被關閉, 而後等待新的url過來"""
      self.schedule_next_requests()
      raise DontCloseSpider

以上是scrapy-redis的基本分析, 能夠發現源碼中大量使用了基礎的數據結構和算法的知識, 不太熟悉的同窗能夠參看以前的文章, 我會在接下來的時間繼續分享兩篇使用技巧和一些實用的特性.

文章會首發在公衆號, 歡迎你們關注及時查看.

相關文章
相關標籤/搜索