通常咱們使用python來寫爬蟲會優先選擇scrapy框架, 框架自己基於異步網絡請求性能比較高, 另外對併發控制, 延遲請求支持的比較好, 可使咱們專一於爬蟲的邏輯. 可是scrapy僅僅支持單機的爬蟲, 若是要支持分佈式的話還須要藉助scrapy-redis
來實現. 接下來咱們主要關注scrapy-redis
的實現方式, 瞭解實現原理使用起來會更加順手.python
scrapy-redis的目錄結構以下, 各個模塊功能見註釋redis
├── __init__.py ├── connection.py # 負責redis的鏈接 ├── defaults.py # 一些默認參數配置 ├── dupefilter.py # 用於請求隊列的去重, 繼承了scrapy自己的去重器 ├── picklecompat.py # 使用pickle進行序列化 ├── pipelines.py # 會把item丟進redis中去 ├── queue.py # 調度隊列, 調度器會使用該隊列 ├── scheduler.py # 調度器, 負責任務的調度工做 ├── spiders.py # spider基類, 加入了信號等 └── utils.py
支持三種隊列, 都繼承自Base
類算法
使用了redis的list結構網絡
class FifoQueue(Base): def __len__(self): """返回隊列長度大小""" return self.server.llen(self.key) def push(self, request): """發送請求到隊列左邊""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """從隊列右邊拋出請求""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
使用了redis的有序集合結構數據結構
class PriorityQueue(Base): def __len__(self): """返回隊列內長度大小""" return self.server.zcard(self.key) def push(self, request): """放入請求到zset中""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data) def pop(self, timeout=0): """從zset中拋出請求. 此處不支持timeout參數""" pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
使用redis的sorted set
實現, 若是在spider腳本中須要指定priority
的話, 必定要在settings
中來聲明使用的是PriorityQueue
.併發
後入先出, 使用list結構實現框架
class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data)
和先進先出隊列基本同樣, 實現了棧結構異步
scrapy默認使用了集合結構來進行去重, 在scrapy-redis中使用redis的集合(set)
進行了替換, 請求指紋的計算方法仍是用的內置的.scrapy
def request_seen(self, request): """獲取請求指紋並添加到redis的去重集合中去""" fp = self.request_fingerprint(request) # 獲得請求的指紋 added = self.server.sadd(self.key, fp) # 把指紋添加到redis的集合中 return added == 0 def request_fingerprint(self, request): return request_fingerprint(request) # 獲得請求指紋
去重指紋計算使用的是sha1算法, 計算值包括請求方法, url, body等信息數據結構和算法
替換了scrapy原生的scheduler, 全部方法名稱和原生scheduler保持一致, 在爬蟲開啓後會鏈接待抓取隊列和去重集合, 而後就是不斷把新的請求去重後放入待抓取隊列, 而後從待抓取隊列拿出請求給下載器
def open(self, spider): """爬蟲啓動時觸發, 主要是鏈接待抓取和去重模塊""" pass def enqueue_request(self, request): """ 把請求去重後放入 待抓取隊列中 Parameters ---------- request Returns ------- """ if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True def next_request(self): """ 從請求隊列拿出下一個請求並返回 Returns ------- """ block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request
調度器確定要和請求隊列
和去重隊列
進行交互, 因此初始化要獲取使用的queue
和dupfilter
的類, 並在open
方法中完成實例化
def open(self, spider): self.spider = spider try: # 獲得隊列queue的實例化對象 self.queue = load_object(self.queue_cls)( server=self.server, spider=spider, key=self.queue_key % {'spider': spider.name}, serializer=self.serializer, ) except TypeError as e: raise ValueError("Failed to instantiate queue class '%s': %s", self.queue_cls, e) try: # 獲得去重的實例化對象 self.df = load_object(self.dupefilter_cls)( server=self.server, key=self.dupefilter_key % {'spider': spider.name}, debug=spider.settings.getbool('DUPEFILTER_DEBUG'), ) except TypeError as e: raise ValueError("Failed to instantiate dupefilter class '%s': %s", self.dupefilter_cls, e) if self.flush_on_start: # 若是爲True, 要在爬蟲開啓前刪除對應爬蟲request隊列和dupfilter隊列 self.flush() # notice if there are requests already in the queue to resume the crawl if len(self.queue): spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
spider空閒的時候會從start_urls
隊列中讀取url, 默認一次讀取CONCURRENT_REQUESTS
個url, 能夠在settings
中設置REDIS_START_URLS_BATCH_SIZE
來改變每次的讀取數量, 通常我會在使用的時候增大這個值, 能夠下降spide進入idle的次數, 從而適當提高抓取性能
def setup_redis(self, crawler=None): """初始化了redis參數, 包括使用的種子url的key, 批量讀取url的數量等信息""" ...... # 當spider空閒的時候會觸發該信號, 調用spider_idle函數 crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def spider_idle(self): """空閒的時候觸發該函數, 嘗試請求下一批url. 有url的時候會直接請求, 最後都會拋出異常, 防止spider被關閉, 而後等待新的url過來""" self.schedule_next_requests() raise DontCloseSpider
以上是scrapy-redis
的基本分析, 能夠發現源碼中大量使用了基礎的數據結構和算法的知識, 不太熟悉的同窗能夠參看以前的文章, 我會在接下來的時間繼續分享兩篇使用技巧和一些實用的特性.
文章會首發在公衆號, 歡迎你們關注及時查看.