scrapy入門與進階

  • Scrapy是用純Python實現一個爲了爬取網站數據、提取結構性數據而編寫的應用框架,用途很是普遍。
  • 框架的力量,用戶只須要定製開發幾個模塊就能夠輕鬆的實現一個爬蟲,用來抓取網頁內容以及各類圖片,很是之方便。
  • Scrapy 使用了 Twisted異步網絡框架來處理網絡通信,能夠加快咱們的下載速度,不用本身去實現異步框架,而且包含了各類中間件接口,能夠靈活的完成各類需求。

scrapy流程圖
舊版
php

新版
css

組件及調用流程(數據流)

Scrapy Engine(引擎): 負責Spider、ItemPipeline、Downloader、Scheduler中間的通信,信號、數據傳遞等。html

Scheduler(調度器): 它負責接受引擎發送過來的Request請求,並按照必定的方式進行整理排列,入隊,當引擎須要時,交還給引擎。python

Downloader(下載器):負責下載Scrapy Engine(引擎)發送的全部Requests請求,並將其獲取到的Responses交還給Scrapy Engine(引擎),由引擎交給Spider來處理,git

Spider(爬蟲):它負責處理全部Responses,從中分析提取數據,獲取Item字段須要的數據,並將須要跟進的URL提交給引擎,再次進入Scheduler(調度器),github

Item Pipeline(管道):它負責處理Spider中獲取到的Item,並進行進行後期處理(詳細分析、過濾、存儲等)的地方.web

Downloader Middlewares(下載中間件):你能夠看成是一個能夠自定義擴展下載功能的組件。正則表達式

Spider Middlewares(Spider中間件):你能夠理解爲是一個能夠自定擴展和操做引擎和Spider中間通訊的功能組件(好比進入Spider的Responses;和從Spider出去的Requests)redis

數據流(Data flow)mongodb

  1. 引擎打開一個網站(open a domain),找處處理該網站的Spider並向該spider請求第一個要爬取的URL(s)。
  2. 引擎從Spider中獲取到第一個要爬取的URL並在調度器(Scheduler)以Request調度。
  3. 引擎向調度器請求下一個要爬取的URL。
  4. 調度器返回下一個要爬取的URL給引擎,引擎將URL經過下載中間件(請求(request)方向)轉發給下載器(Downloader)。
  5. 一旦頁面下載完畢,下載器生成一個該頁面的Response,並將其經過下載中間件(返回(response)方向)發送給引擎。
  6. 引擎從下載器中接收到Response並經過Spider中間件(輸入方向)發送給Spider處理。
  7. Spider處理Response並返回爬取到的Item及(跟進的)新的Request給引擎。
  8. 引擎將(Spider返回的)爬取到的Item給Item Pipeline,將(Spider返回的)Request給調度器。
  9. (從第二步)重複直到調度器中沒有更多地request,引擎關閉該網站。

引擎獲取起始url併發起請求,將獲取的響應內容返回給spider,
在spider中進行數據的提取和下一個url的連接,
數據交給item和pipeline進行處理,
url繼續發起請求,

編寫spider

製做 Scrapy 爬蟲 一共須要4步:

  1. 新建項目 (scrapy startproject xxx):新建一個新的爬蟲項目
  2. 明確目標 (編寫items.py):明確你想要抓取的目標
  3. 製做爬蟲 (spiders/xxspider.py):製做爬蟲開始爬取網頁
  4. 存儲內容 (pipelines.py):設計管道存儲爬取內容

命令行輸入
scrapy startproject tutorial

目錄結構

scrapy.cfg: 項目的配置文件;(用於發佈到服務器)
tutorial/: 該項目文件夾。以後將在此編寫Python代碼。
tutorial/items.py: 項目中的item文件;(定義結構化數據字段field).
tutorial/pipelines.py: 項目中的pipelines文件;(用於存放執行後期數據處理的功能,定義如何存儲結構化數據)
tutorial/settings.py: 項目的設置文件;(如何修改User-Agent,設置爬取時間間隔,設置代理,配置中間件等等)
tutorial/spiders/: 放置spider代碼的目錄;(編寫爬取網站規則)

定義item,在items.py文件中編寫item
相似與django

import scrapy

class DmozItem(scrapy.Item):
    title = scrapy.Field()
    link = scrapy.Field()
    desc = scrapy.Field()

編寫spider

Spider是用戶編寫用於從單個網站(或者一些網站)爬取數據的類。
其包含了一個用於下載的初始URL,如何跟進網頁中的連接以及如何分析頁面中的內容, 提取生成 item 的方法。

爲了建立一個Spider,您必須繼承scrapy.Spider 類, 且定義一些屬性:

  • name: 用於區別Spider。 該名字必須是惟一的。
  • start_urls: 包含了Spider在啓動時進行爬取的url列表。 所以,第一個被獲取到的頁面將是其中之一。 後續的URL則從初始的URL獲取到的數據中提取。
  • parse()spider的一個方法。 被調用時,每一個初始URL完成下載後生成的 Response 對象將會做爲惟一的參數傳遞給該函數。 該方法負責解析返回的數據(response data),提取數據(生成item)以及生成須要進一步處理的URL的 Request 對象。

scrapy genspider name "example.com"

import scrapy

class DmozSpider(scrapy.Spider):
    name = "dmoz"
    allowed_domains = ["dmoz.org"]
    start_urls = [
        "http://www.dmoz.org/Computers/Programming/Languages/Python/Books/",
        "http://www.dmoz.org/Computers/Programming/Languages/Python/Resources/"
    ]

    def parse(self, response):
        filename = response.url.split("/")[-2] + '.html'
        with open(filename, 'wb') as f:
            f.write(response.body)

啓動爬蟲

scrapy crawl dmoz

提取Item

Selectors選擇器簡介
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/selectors.html
Scrapy Selectors內置XPathCSS Selector表達式機制

Selector有四個基本的方法:

  1. xpath(): 傳入xpath表達式,返回該表達式所對應的全部節點的selector list列表
  2. extract(): 序列化該節點爲Unicode字符串並返回list
  3. css(): 傳入CSS表達式,返回該表達式所對應的全部節點的selector list列表,語法同 BeautifulSoup4
  4. re(): 根據傳入的正則表達式對數據進行提取,返回Unicode字符串list列表

經過shell能夠很方便的提取出須要的數據

Item Pipelines

當Item在Spider中被收集以後,它將會被傳遞到Item Pipeline
每一個Item Pipeline組件接收到Item,定義一些操做行爲,好比決定此Item是丟棄而存儲。
如下是item pipeline的一些典型應用:

  1. 驗證爬取的數據(檢查item包含某些字段,好比說name字段)
  2. 查重(並丟棄)
  3. 將爬取結果保存到文件或者數據庫中

編寫item pipeline

編寫item pipeline很簡單,item pipiline組件是一個獨立的Python類,其中process_item()方法必須實現:

import something

class SomethingPipeline(object):
    def __init__(self):    
        # 可選實現,作參數初始化等
        # doing something

    def process_item(self, item, spider):
        # item (Item 對象) – 被爬取的item
        # spider (Spider 對象) – 爬取該item的spider
        # 這個方法必須實現,每一個item pipeline組件都須要調用該方法,
        # 這個方法必須返回一個 Item 對象,被丟棄的item將不會被以後的pipeline組件所處理。
        return item

    def open_spider(self, spider):
        # spider (Spider 對象) – 被開啓的spider
        # 可選實現,當spider被開啓時,這個方法被調用。

    def close_spider(self, spider):
        # spider (Spider 對象) – 被關閉的spider
        # 可選實現,當spider被關閉時,這個方法被調用

將item寫入json文件

import json

class JsonWriterPipeline(object):

    def __init__(self):
        self.file = open('items.json', 'wb')

    def process_item(self, item, spider):
        line = json.dumps(dict(item),ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

啓用一個Item Pipeline組件

爲了啓用Item Pipeline組件,必須將它的類添加到 settings.py文件ITEM_PIPELINES 配置,就像下面這個例子:

ITEM_PIPELINES = {
    #'tutorial.pipelines.PricePipeline': 300,
    'tutorial.pipelines.JsonWriterPipeline': 800,
}

分配給每一個類的整型值,肯定了他們運行的順序,item按數字從低到高的順序,經過pipeline,一般將這些數字定義在0-1000範圍內。數值越低,越先運行

將item寫入MongoDB

pipeline中還有一個from_crawler(cls, crawler)類方法
若是使用,這個類方法被調用建立爬蟲管道實例。必須返回管道的一個新實例。crawler提供存取全部Scrapy核心組件配置和信號管理器; 對於pipelines這是一種訪問配置和信號管理器 的方式。

在這個例子中,咱們將使用pymongo將Item寫到MongoDB。MongoDB的地址和數據庫名稱在Scrapy setttings.py配置文件中;
這個例子主要是說明如何使用from_crawler()方法

import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert(dict(item))
        return item

Spiders

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/spiders.html
Spider類定義瞭如何爬取某個(或某些)網站。包括了爬取的動做(例如:是否跟進連接)以及如何從網頁的內容中提取結構化數據(爬取item)。 換句話說,Spider就是定義爬取的動做及分析某個網頁(或者是有些網頁)的地方。

Spider

class scrapy.spider.Spider
Spider是最簡單的spider。每一個spider必須繼承自該類。Spider並無提供什麼特殊的功能。其僅僅請求給定的 start_urls/start_requests,並根據返回的結果調用spider的parse方法。

源碼參考

#全部爬蟲的基類,用戶定義的爬蟲必須從這個類繼承
class Spider(object_ref):

    #定義spider名字的字符串(string)。spider的名字定義了Scrapy如何定位(並初始化)spider,因此其必須是惟一的。
    #name是spider最重要的屬性,並且是必須的。
    #通常作法是以該網站(domain)(加或不加 後綴 )來命名spider。 例如,若是spider爬取 mywebsite.com ,該spider一般會被命名爲 mywebsite
    name = None

    #初始化,提取爬蟲名字,start_ruls
    def __init__(self, name=None, **kwargs):
        if name is not None:
            self.name = name
        # 若是爬蟲沒有名字,中斷後續操做則報錯
        elif not getattr(self, 'name', None):
            raise ValueError("%s must have a name" % type(self).__name__)

        # python 對象或類型經過內置成員__dict__來存儲成員信息
        self.__dict__.update(kwargs)

        #URL列表。當沒有指定的URL時,spider將從該列表中開始進行爬取。 所以,第一個被獲取到的頁面的URL將是該列表之一。 後續的URL將會從獲取到的數據中提取。
        if not hasattr(self, 'start_urls'):
            self.start_urls = []

    # 打印Scrapy執行後的log信息
    def log(self, message, level=log.DEBUG, **kw):
        log.msg(message, spider=self, level=level, **kw)

    # 判斷對象object的屬性是否存在,不存在作斷言處理
    def set_crawler(self, crawler):
        assert not hasattr(self, '_crawler'), "Spider already bounded to %s" % crawler
        self._crawler = crawler

    @property
    def crawler(self):
        assert hasattr(self, '_crawler'), "Spider not bounded to any crawler"
        return self._crawler

    @property
    def settings(self):
        return self.crawler.settings

    #該方法將讀取start_urls內的地址,併爲每個地址生成一個Request對象,交給Scrapy下載並返回Response
    #該方法僅調用一次
    def start_requests(self):
        for url in self.start_urls:
            yield self.make_requests_from_url(url)

    #start_requests()中調用,實際生成Request的函數。
    #Request對象默認的回調函數爲parse(),提交的方式爲get
    def make_requests_from_url(self, url):
        return Request(url, dont_filter=True)

    #默認的Request對象回調函數,處理返回的response。
    #生成Item或者Request對象。用戶必須實現這個類
    def parse(self, response):
        raise NotImplementedError

    @classmethod
    def handles_request(cls, request):
        return url_is_from_spider(request.url, cls)

    def __str__(self):
        return "<%s %r at 0x%0x>" % (type(self).__name__, self.name, id(self))

    __repr__ = __str__

主要屬性和方法

  • name

    定義spider名字的字符串。
    例如,若是spider爬取 mywebsite.com ,該spider一般會被命名爲 mywebsite

  • allowed_domains

    包含了spider容許爬取的域名(domain)的列表,可選。

  • start_urls

    初始URL元祖/列表。當沒有制定特定的URL時,spider將從該列表中開始進行爬取。

  • start_requests(self)

    該方法必須返回一個可迭代對象(iterable)。該對象包含了spider用於爬取(默認實現是使用start_urls 的url)的第一個Request。
    當spider啓動爬取而且未指定start_urls時,該方法被調用。

  • parse(self, response)

    當請求url返回網頁沒有指定回調函數時,默認的Request對象回調函數。用來處理網頁返回的response,以及生成Item或者Request對象。

  • log(self, message[, level, component])

    使用 scrapy.log.msg() 方法記錄(log)message。 更多數據請參見 logging

騰訊招聘網自動翻頁
(代碼採集自互聯網)

from mySpider.items import TencentItem
import scrapy
import re

class TencentSpider(scrapy.Spider):
    name = "tencent"
    allowed_domains = ["hr.tencent.com"]
    start_urls = [
        "http://hr.tencent.com/position.php?&start=0#a"
    ]

    def parse(self, response):
        for each in response.xpath('//*[@class="even"]'):

            item = TencentItem()
            name = each.xpath('./td[1]/a/text()').extract()[0]
            detailLink = each.xpath('./td[1]/a/@href').extract()[0]
            positionInfo = each.xpath('./td[2]/text()').extract()[0]
            peopleNumber = each.xpath('./td[3]/text()').extract()[0]
            workLocation = each.xpath('./td[4]/text()').extract()[0]
            publishTime = each.xpath('./td[5]/text()').extract()[0]

            #print name, detailLink, catalog, peopleNumber, workLocation,publishTime

            item['name'] = name.encode('utf-8')
            item['detailLink'] = detailLink.encode('utf-8')
            item['positionInfo'] = positionInfo.encode('utf-8')
            item['peopleNumber'] = peopleNumber.encode('utf-8')
            item['workLocation'] = workLocation.encode('utf-8')
            item['publishTime'] = publishTime.encode('utf-8')

            curpage = re.search('(\d+)',response.url).group(1)
            page = int(curpage) + 10
            url = re.sub('\d+', str(page), response.url)

            # 發送新的url請求加入待爬隊列,並調用回調函數 self.parse
            yield scrapy.Request(url, callback = self.parse)

            # 將獲取的數據交給pipeline
            yield item

CrawlSpider

經過下面的命令能夠快速建立 CrawlSpider模板 的代碼:

scrapy genspider -t crawl tencent tencent.com

class scrapy.spiders.CrawlSpider
它是Spider的派生類,Spider類的設計原則是隻爬取start_url列表中的網頁,而CrawlSpider類定義了一些規則(rule)來提供跟進link的方便的機制,從爬取的網頁中獲取link並繼續爬取的工做更適合。

源碼解析參考

class CrawlSpider(Spider):
    rules = ()
    def __init__(self, *a, **kw):
        super(CrawlSpider, self).__init__(*a, **kw)
        self._compile_rules()

    #首先調用parse()來處理start_urls中返回的response對象
    #parse()則將這些response對象傳遞給了_parse_response()函數處理,並設置回調函數爲parse_start_url()
    #設置了跟進標誌位True
    #parse將返回item和跟進了的Request對象    
    def parse(self, response):
        return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)

    #處理start_url中返回的response,須要重寫
    def parse_start_url(self, response):
        return []

    def process_results(self, response, results):
        return results

    #從response中抽取符合任一用戶定義'規則'的連接,並構形成Resquest對象返回
    def _requests_to_follow(self, response):
        if not isinstance(response, HtmlResponse):
            return
        seen = set()
        #抽取以內的全部連接,只要經過任意一個'規則',即表示合法
        for n, rule in enumerate(self._rules):
            links = [l for l in rule.link_extractor.extract_links(response) if l not in seen]
            #使用用戶指定的process_links處理每一個鏈接
            if links and rule.process_links:
                links = rule.process_links(links)
            #將連接加入seen集合,爲每一個連接生成Request對象,並設置回調函數爲_repsonse_downloaded()
            for link in links:
                seen.add(link)
                #構造Request對象,並將Rule規則中定義的回調函數做爲這個Request對象的回調函數
                r = Request(url=link.url, callback=self._response_downloaded)
                r.meta.update(rule=n, link_text=link.text)
                #對每一個Request調用process_request()函數。該函數默認爲indentify,即不作任何處理,直接返回該Request.
                yield rule.process_request(r)

    #處理經過rule提取出的鏈接,並返回item以及request
    def _response_downloaded(self, response):
        rule = self._rules[response.meta['rule']]
        return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow)

    #解析response對象,會用callback解析處理他,並返回request或Item對象
    def _parse_response(self, response, callback, cb_kwargs, follow=True):
        #首先判斷是否設置了回調函數。(該回調函數多是rule中的解析函數,也多是 parse_start_url函數)
        #若是設置了回調函數(parse_start_url()),那麼首先用parse_start_url()處理response對象,
        #而後再交給process_results處理。返回cb_res的一個列表
        if callback:
            #若是是parse調用的,則會解析成Request對象
            #若是是rule callback,則會解析成Item
            cb_res = callback(response, **cb_kwargs) or ()
            cb_res = self.process_results(response, cb_res)
            for requests_or_item in iterate_spider_output(cb_res):
                yield requests_or_item

        #若是須要跟進,那麼使用定義的Rule規則提取並返回這些Request對象
        if follow and self._follow_links:
            #返回每一個Request對象
            for request_or_item in self._requests_to_follow(response):
                yield request_or_item

    def _compile_rules(self):
        def get_method(method):
            if callable(method):
                return method
            elif isinstance(method, basestring):
                return getattr(self, method, None)

        self._rules = [copy.copy(r) for r in self.rules]
        for rule in self._rules:
            rule.callback = get_method(rule.callback)
            rule.process_links = get_method(rule.process_links)
            rule.process_request = get_method(rule.process_request)

    def set_crawler(self, crawler):
        super(CrawlSpider, self).set_crawler(crawler)
        self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)

CrawlSpider繼承於Spider類,除了繼承過來的屬性外(name、allow_domains),還提供了新的屬性和方法:

LinkExtractors

class scrapy.linkextractors.LinkExtractor
Link Extractors 的目的很簡單: 提取連接。
每一個LinkExtractor有惟一的公共方法是 extract_links(),它接收一個 Response 對象,並返回一個 scrapy.link.Link 對象。

Link Extractors要實例化一次,而且 extract_links 方法會根據不一樣的response調用屢次提取連接。

class scrapy.linkextractors.LinkExtractor(
    allow = (),
    deny = (),
    allow_domains = (),
    deny_domains = (),
    deny_extensions = None,
    restrict_xpaths = (),
    tags = ('a','area'),
    attrs = ('href'),
    canonicalize = True,
    unique = True,
    process_value = None
)

主要參數:

  • allow:知足括號中「正則表達式」的值會被提取,若是爲空,則所有匹配。
  • deny:與這個正則表達式(或正則表達式列表)不匹配的URL必定不提取。
  • allow_domains:會被提取的連接的domains。
  • deny_domains:必定不會被提取連接的domains。
  • restrict_xpaths:使用xpath表達式,和allow共同做用過濾連接。
rules

在rules中包含一個或多個Rule對象,每一個Rule對爬取網站的動做定義了特定操做。若是多個rule匹配了相同的連接,則根據規則在本集合中被定義的順序,第一個會被使用。

class scrapy.spiders.Rule(
        link_extractor, 
        callback = None, 
        cb_kwargs = None, 
        follow = None, 
        process_links = None, 
        process_request = None
)
  • link_extractor:是一個Link Extractor對象,用於定義須要提取的連接。
  • callback: 從link_extractor中每獲取到連接時,參數所指定的值做爲回調函數,該回調函數接受一個response做爲其第一個參數。

    注意:當編寫爬蟲規則時,避免使用parse做爲回調函數。因爲CrawlSpider使用parse方法來實現其邏輯,若是覆蓋了 parse方法,crawl spider將會運行失敗。

  • follow:是一個布爾(boolean)值,指定了根據該規則從response提取的連接是否須要跟進。 若是callback爲None,follow 默認設置爲True ,不然默認爲False。
  • process_links:指定該spider中哪一個的函數將會被調用,從link_extractor中獲取到連接列表時將會調用該函數。該方法主要用來過濾。
  • process_request:指定該spider中哪一個的函數將會被調用, 該規則提取到每一個request時都會調用該函數。 (用來過濾request)

翻頁

import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from mySpider.items import TencentItem

class TencentSpider(CrawlSpider):
    name = "tencent"
    allowed_domains = ["hr.tencent.com"]
    start_urls = [
        "http://hr.tencent.com/position.php?&start=0#a"
    ]

    page_lx = LinkExtractor(allow=("start=\d+"))

    rules = [
        Rule(page_lx, callback = "parseContent", follow = True)
    ]

    def parseContent(self, response):
        for each in response.xpath('//*[@class="even"]'):
            name = each.xpath('./td[1]/a/text()').extract()[0]
            detailLink = each.xpath('./td[1]/a/@href').extract()[0]
            positionInfo = each.xpath('./td[2]/text()').extract()[0]

            peopleNumber = each.xpath('./td[3]/text()').extract()[0]
            workLocation = each.xpath('./td[4]/text()').extract()[0]
            publishTime = each.xpath('./td[5]/text()').extract()[0]
            #print name, detailLink, catalog,recruitNumber,workLocation,publishTime

            item = TencentItem()
            item['name']=name.encode('utf-8')
            item['detailLink']=detailLink.encode('utf-8')
            item['positionInfo']=positionInfo.encode('utf-8')
            item['peopleNumber']=peopleNumber.encode('utf-8')
            item['workLocation']=workLocation.encode('utf-8')
            item['publishTime']=publishTime.encode('utf-8')

            yield item
process_links參數:動態網頁爬取,動態url的處理

某些網站會爲每個url增長一個sessionid屬性,多是爲了標記用戶訪問歷史,並且這個seesionid隨着每次訪問都會動態變化,這就爲爬蟲的去重處理(即標記已經爬取過的網站)和提取規則增長了難度。
https://bitsharestalk.org/index.php?board=5.0會變成https://bitsharestalk.org/index.phpPHPSESSID=9771d42640ab3c89eb77e8bd9e220b53&board=5.0,下面介紹集中處理方法
僅適用你的爬蟲使用的是 scrapy.contrib.spiders.CrawlSpider, 在這個內置爬蟲中,你提取url要經過Rule類來進行提取,其自帶了對提取後的url進行加工的函數。

rules =  (
    Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*board=\d+\.\d+$", "https://bitsharestalk\.org/index\.php\?board=\d+\.\d+$" )), process_links = 'link_filtering' ), #默認函數process_links

    Rule(LinkExtractor(allow = ( " https://bitsharestalk\.org/index\.php\?PHPSESSID\S*topic=\d+\.\d+$" ,  "https://bitsharestalk\.org/index\.php\?topic=\d+\.\d+$", ),),
    callback = "extractPost" ,
    follow = True, process_links = 'link_filtering' ),

    Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*action=profile;u=\d+$" ,  "https://bitsharestalk\.org/index\.php\?action=profile;u=\d+$" , ),),
    callback =  "extractUser", process_links = 'link_filtering' )
)

def link_filtering(self, links):

    ret = []

    for link  in links:
        url = link.url
    # print "This is the yuanlai ", link.url 
    urlfirst, urllast = url.split( " ? " )

    if urllast:
        link.url = urlfirst +  " ? " + urllast.split( " & " , 1)[1]
    # print link.url
    return links
process_request參數:修改請求參數
class WeiboSpider(CrawlSpider):
    name = 'weibo'
    allowed_domains = ['weibo.com']
    start_urls = ['http://www.weibo.com/u/1876296184']  # 不加www,則匹配不到cookie, get_login_cookie()方法正則代完善
    rules = (
        Rule(LinkExtractor(allow=r'^http:\/\/(www\.)?weibo.com/[a-z]/.*'),  # 微博我的頁面的規則,或/u/或/n/後面跟一串數字
             process_request='process_request',
             callback='parse_item', follow=True), )
    cookies = None

    def process_request(self, request):
        link=request.url
        page = re.search('page=\d*', link).group()
        type = re.search('type=\d+', link).group()
        newrequest = request.replace(cookies =self.cookies, url='.../questionType?' + page + "&" + type)

        return newrequest

Logging

Scrapy提供了log功能,能夠經過 logging 模塊使用。

Log levels

Scrapy提供5層logging級別:

  • CRITICAL - 嚴重錯誤(critical)
  • ERROR - 通常錯誤(regular errors)
  • WARNING - 警告信息(warning messages)
  • INFO - 通常信息(informational messages)
  • DEBUG - 調試信息(debugging messages)

默認狀況下python的logging模塊將日誌打印到了標準輸出中,且只顯示了大於等於WARNING級別的日誌,這說明默認的日誌級別設置爲WARNING(日誌級別等級CRITICAL > ERROR > WARNING > INFO > DEBUG,默認的日誌格式爲DEBUG級別

logging設置

經過在setting.py中進行如下設置能夠被用來配置logging:

  • LOG_ENABLED 默認: True,啓用logging
  • LOG_ENCODING 默認: 'utf-8',logging使用的編碼
  • LOG_FILE 默認: None,在當前目錄裏建立logging輸出文件的文件名
  • LOG_LEVEL 默認: 'DEBUG',log的最低級別
  • LOG_STDOUT 默認: False 若是爲 True,進程全部的標準輸出(及錯誤)將會被重定向到log中。例如,執行print("hello") ,其將會在Scrapy log中顯示。
#coding:utf-8
######################
##Logging的使用
######################
import logging
'''
1. logging.CRITICAL - for critical errors (highest severity) 致命錯誤
2. logging.ERROR - for regular errors 通常錯誤
3. logging.WARNING - for warning messages 警告+錯誤
4. logging.INFO - for informational messages 消息+警告+錯誤
5. logging.DEBUG - for debugging messages (lowest severity) 低級別
'''
logging.warning("This is a warning")

logging.log(logging.WARNING,"This is a warning")

#獲取實例對象
logger=logging.getLogger()
logger.warning("這是警告消息")
#指定消息發出者
logger = logging.getLogger('SimilarFace')
logger.warning("This is a warning")

#在爬蟲中使用log
import scrapy
class MySpider(scrapy.Spider):
    name = 'myspider'
    start_urls = ['http://scrapinghub.com']
    def parse(self, response):
        #方法1 自帶的logger
        self.logger.info('Parse function called on %s', response.url)
        #方法2 本身定義個logger
        logger.info('Parse function called on %s', response.url)

'''
Logging 設置
• LOG_FILE
• LOG_ENABLED
• LOG_ENCODING
• LOG_LEVEL
• LOG_FORMAT
• LOG_DATEFORMAT 
• LOG_STDOUT

命令行中使用
--logfile FILE
Overrides LOG_FILE

--loglevel/-L LEVEL
Overrides LOG_LEVEL

--nolog
Sets LOG_ENABLED to False
'''

import logging
from scrapy.utils.log import configure_logging

configure_logging(install_root_handler=False)
#定義了logging的些屬性
logging.basicConfig(
    filename='log.txt',
    format='%(levelname)s: %(levelname)s: %(message)s',
    level=logging.INFO
)
#運行時追加模式
logging.info('進入Log文件')
logger = logging.getLogger('SimilarFace')
logger.warning("也要進入Log文件")

Settings

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/settings.html
Scrapy設置(settings)提供了定製Scrapy組件的方法。能夠控制包括核心(core),插件(extension),pipeline及spider組件。好比 設置Json Pipeliine、LOG_LEVEL

內置設置參考手冊

  • BOT_NAME
    默認: scrapybot
    當您使用 startproject 命令建立項目時其也被自動賦值。

  • CONCURRENT_ITEMS
    默認: 100
    Item Processor(即 Item Pipeline) 同時處理(每一個response的)item的最大值。

  • CONCURRENT_REQUESTS
    默認: 16
    Scrapy downloader併發請求(concurrent requests)的最大值。

  • DEFAULT_REQUEST_HEADERS 默認:
{
      'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
      'Accept-Language': 'en',
  }

Scrapy HTTP Request使用的默認header。

  • DEPTH_LIMIT
    默認: 0
    爬取網站最大容許的深度(depth)值。若是爲0,則沒有限制。

  • DOWNLOAD_DELAY
    默認: 0
    下載器在下載同一個網站下一個頁面前須要等待的時間。該選項能夠用來限制爬取速度, 減輕服務器壓力。同時也支持小數:DOWNLOAD_DELAY = 0.25 # 250 ms of delay
    該設置影響(默認啓用的) RANDOMIZE_DOWNLOAD_DELAY 設置。 默認狀況下,Scrapy在兩個請求間不等待一個固定的值, 而是使用0.5到1.5之間的一個隨機值 DOWNLOAD_DELAY 的結果做爲等待間隔。

  • DOWNLOAD_TIMEOUT
    默認: 180
    下載器超時時間(單位: 秒)。

  • ITEM_PIPELINES
    默認: {}
    保存項目中啓用的pipeline及其順序的字典。該字典默認爲空,值(value)任意。 不過值(value)習慣設置在0-1000範圍內。
    樣例:
ITEM_PIPELINES = {
      'mybot.pipelines.validate.ValidateMyItem': 300,
      'mybot.pipelines.validate.StoreMyItem': 800,
  }
  • LOG_ENABLED
    默認: True
    是否啓用logging。

  • LOG_ENCODING
    默認: 'utf-8'
    logging使用的編碼。

  • LOG_LEVEL
    默認: 'DEBUG'
    log的最低級別。可選的級別有: CRITICAL、 ERROR、WARNING、INFO、DEBUG

  • USER_AGENT
    默認: Scrapy/VERSION (+http://scrapy.org)
    爬取的默認User-Agent,除非被覆蓋。

Request/Response

https://docs.scrapy.org/en/latest/topics/request-response.html

Request 部分源碼:

# 部分代碼
class Request(object_ref):
    def __init__(self, url, callback=None, method='GET', headers=None, body=None, 
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None):

        self._encoding = encoding  # this one has to be set first
        self.method = str(method).upper()
        self._set_url(url)
        self._set_body(body)
        assert isinstance(priority, int), "Request priority not an integer: %r" % priority
        self.priority = priority

        assert callback or not errback, "Cannot use errback without a callback"
        self.callback = callback
        self.errback = errback

        self.cookies = cookies or {}
        self.headers = Headers(headers or {}, encoding=encoding)
        self.dont_filter = dont_filter

        self._meta = dict(meta) if meta else None

    @property
    def meta(self):
        if self._meta is None:
            self._meta = {}
        return self._meta

經常使用參數

url: 就是須要請求,並進行下一步處理的url

callback: 指定該請求返回的Response,由那個函數來處理。

method: 請求通常不須要指定,默認GET方法,可設置爲"GET", "POST", "PUT"等,且保證字符串大寫

headers: 請求時,包含的頭文件。通常不須要。內容通常以下:

# 本身寫過爬蟲的確定知道
         Host: media.readthedocs.org
         User-Agent: Mozilla/5.0 (Windows NT 6.2; WOW64; rv:33.0) Gecko/20100101 Firefox/33.0
         Accept: text/css,*/*;q=0.1
         Accept-Language: zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3
         Accept-Encoding: gzip, deflate
         Referer: http://scrapy-chs.readthedocs.org/zh_CN/0.24/
         Cookie: _ga=GA1.2.1612165614.1415584110;
         Connection: keep-alive
         If-Modified-Since: Mon, 25 Aug 2014 21:59:35 GMT
         Cache-Control: max-age=0

meta: 比較經常使用,在不一樣的請求之間傳遞數據使用的。字典dict型

request_with_cookies = Request(
             url="http://www.example.com",
             cookies={'currency': 'USD', 'country': 'UY'},
             meta={'dont_merge_cookies': True}
         )

encoding: 使用默認的 'utf-8' 就行。

dont_filter: 代表該請求不禁調度器過濾。這是當你想使用屢次執行相同的請求,忽略重複的過濾器。默認爲False。

errback: 指定錯誤處理函數

Response

# 部分代碼
class Response(object_ref):
    def __init__(self, url, status=200, headers=None, body='', flags=None, request=None):
        self.headers = Headers(headers or {})
        self.status = int(status)
        self._set_body(body)
        self._set_url(url)
        self.request = request
        self.flags = [] if flags is None else list(flags)

    @property
    def meta(self):
        try:
            return self.request.meta
        except AttributeError:
            raise AttributeError("Response.meta not available, this response " \
                "is not tied to any request")

大部分參數和上面的差很少:

status: 響應碼
_set_body(body): 響應體
_set_url(url):響應url
self.request = request

Downloader Middlewares

https://docs.scrapy.org/en/latest/topics/downloader-middleware.html

下載中間件是處於引擎(crawler.engine)和下載器(crawler.engine.download())之間的一層組件,能夠有多個下載中間件被加載運行。

當引擎傳遞請求給下載器的過程當中,下載中間件能夠對請求進行處理 (例如增長http header信息,增長proxy信息等);

在下載器完成http請求,傳遞響應給引擎的過程當中, 下載中間件能夠對響應進行處理(例如進行gzip的解壓等)

要激活下載器中間件組件,將其加入到 DOWNLOADER_MIDDLEWARES 設置中。 該設置是一個字典(dict),鍵爲中間件類的路徑,值爲其中間件的順序(order)。

這裏是一個例子:

DOWNLOADER_MIDDLEWARES = {
    'mySpider.middlewares.MyDownloaderMiddleware': 543,
}

編寫下載器中間件十分簡單。每一箇中間件組件是一個定義瞭如下一個或多個方法的Python類:

class scrapy.contrib.downloadermiddleware.DownloaderMiddleware

process_request(self, request, spider)

  • 當每一個request經過下載中間件時,該方法被調用。
  • process_request()必須返回如下其中之一:一個 None 、一個 Response 對象、一個 Request 對象或 raise IgnoreRequest:
  • 若是其返回 None ,Scrapy將繼續處理該request,執行其餘的中間件的相應方法,直到合適的下載器處理函數(download handler)被調用, 該request被執行(其response被下載)。
  • 若是其返回 Response 對象,Scrapy將不會調用 任何 其餘的 process_request() 或 process_exception() 方法,或相應地下載函數; 其將返回該response。 已安裝的中間件的 process_response() 方法則會在每一個response返回時被調用。
  • 若是其返回 Request 對象,Scrapy則中止調用 process_request方法並從新調度返回的request。當新返回的request被執行後, 相應地中間件鏈將會根據下載的response被調用。
  • 若是其raise一個 IgnoreRequest 異常,則安裝的下載中間件的 process_exception() 方法會被調用。若是沒有任何一個方法處理該異常, 則request的errback(Request.errback)方法會被調用。若是沒有代碼處理拋出的異常, 則該異常被忽略且不記錄(不一樣於其餘異常那樣)。

  • 參數:

    request (Request 對象) – 處理的request
    spider (Spider 對象) – 該request對應的spider

process_response(self, request, response, spider)

  • 當下載器完成http請求,傳遞響應給引擎的時候調用
  • process_request() 必須返回如下其中之一: 返回一個 Response 對象、 返回一個 Request 對象或raise一個 IgnoreRequest 異常。
  • 若是其返回一個 Response (能夠與傳入的response相同,也能夠是全新的對象), 該response會被在鏈中的其餘中間件的 process_response() 方法處理。
  • 若是其返回一個 Request 對象,則中間件鏈中止, 返回的request會被從新調度下載。處理相似於 process_request() 返回request所作的那樣。
  • 若是其拋出一個 IgnoreRequest 異常,則調用request的errback(Request.errback)。 若是沒有代碼處理拋出的異常,則該異常被忽略且不記錄(不一樣於其餘異常那樣)。

  • 參數:

    request (Request 對象) – response所對應的request
    response (Response 對象) – 被處理的response
    spider (Spider 對象) – response所對應的spider

暫停和重啓

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/jobs.html

要啓用持久化支持,你只須要經過 JOBDIR 設置 job directory 選項。這個路徑將會存儲 全部的請求數據來保持一個單獨任務的狀態(例如:一次spider爬取(a spider run))。必需要注意的是,這個目錄不容許被不一樣的spider 共享,甚至是同一個spider的不一樣jobs/runs也不行。也就是說,這個目錄就是存儲一個 單獨 job的狀態信息。

scrapy crawl somespider -s JOBDIR=crawls/somespider-1

而後,你就能在任什麼時候候安全地中止爬蟲(按Ctrl-C或者發送一個信號)。恢復這個爬蟲也是一樣的命令:

scrapy crawl somespider -s JOBDIR=crawls/somespider-1

去重原理

在scrapy源碼中找到scrapy/dupefilters.py文件,部分源碼

class RFPDupeFilter(BaseDupeFilter):
    """Request Fingerprint duplicates filter"""

    def __init__(self, path=None, debug=False):
        self.file = None
        self.fingerprints = set()
        self.logdupes = True
        self.debug = debug
        self.logger = logging.getLogger(__name__)
        if path:
            self.file = open(os.path.join(path, 'requests.seen'), 'a+')
            self.file.seek(0)
            self.fingerprints.update(x.rstrip() for x in self.file)

    @classmethod
    def from_settings(cls, settings):
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(job_dir(settings), debug)

    def request_seen(self, request):
        fp = self.request_fingerprint(request)
        if fp in self.fingerprints:
            return True
        self.fingerprints.add(fp)
        if self.file:
            self.file.write(fp + os.linesep)

    def request_fingerprint(self, request):
        return request_fingerprint(request)

    def close(self, reason):
        if self.file:
            self.file.close()

    def log(self, request, spider):
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request: %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

        spider.crawler.stats.inc_value('dupefilter/filtered', spider=spider)

裏面有一個request_seen方法,這個方法在scrapy/core/scheduler.py中被調用

class Scheduler(object):
    ...

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True
    ...

回到request_seen方法繼續查看

def request_seen(self, request):
        fp = self.request_fingerprint(request)
        if fp in self.fingerprints:
            return True
        self.fingerprints.add(fp)
        if self.file:
            self.file.write(fp + os.linesep)

    # 返回的`request_fingerprint`是`from scrapy.utils.request import request_fingerprint`            
    def request_fingerprint(self, request):
        return request_fingerprint(request)

scrapy\utils\request.py 這個函數將request進行hash,最後生成摘要(fp.hexdigest())

def request_fingerprint(request, include_headers=None):
    """
    Return the request fingerprint.

    The request fingerprint is a hash that uniquely identifies the resource the
    request points to. For example, take the following two urls:

    http://www.example.com/query?id=111&cat=222
    http://www.example.com/query?cat=222&id=111

    Even though those are two different URLs both point to the same resource
    and are equivalent (ie. they should return the same response).

    Another example are cookies used to store session ids. Suppose the
    following page is only accesible to authenticated users:

    http://www.example.com/members/offers.html

    Lot of sites use a cookie to store the session id, which adds a random
    component to the HTTP Request and thus should be ignored when calculating
    the fingerprint.

    For this reason, request headers are ignored by default when calculating
    the fingeprint. If you want to include specific headers use the
    include_headers argument, which is a list of Request headers to include.

    """
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

咱們能夠看到,去重指紋是sha1(method + url + body + header)
因此,實際可以去掉重複的比例並不大。
若是咱們須要本身提取去重的finger,須要本身實現Filter,並配置上它。
下面這個Filter只根據url去重:

from scrapy.dupefilter import RFPDupeFilter
class SeenURLFilter(RFPDupeFilter):
      """A dupe filter that considers the URL"""
      def __init__(self, path=None):
        self.urls_seen = set()
        RFPDupeFilter.__init__(self, path)
      def request_seen(self, request):
        if request.url in self.urls_seen:
              return True
        else:
              self.urls_seen.add(request.url)

不要忘記配置上:

DUPEFILTER_CLASS ='scraper.custom_filters.SeenURLFilter'

Telnet

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/telnetconsole.html

Scrapy運行的有telnet服務,咱們能夠經過這個功能來獲得一些性能指標。經過telnet命令鏈接到6023端口,而後就會獲得一個在爬蟲內部環境的Python命令行。要當心的是,若是你在這裏運行了一些阻塞的操做,好比time.sleep(),正在運行的爬蟲就會被停止。經過內建的est()函數能夠打印出一些性能指標。
打開第一個命令行,運行如下代碼:

```shell
$ telnet localhost 6023

est()
...
len(engine.downloader.active) : 16
...
len(engine.slot.scheduler.mqs) : 4475
...
len(engine.scraper.slot.active) : 115
engine.scraper.slot.active_size : 117760
engine.scraper.slot.itemproc_size : 105
```
在這裏咱們忽略了dqs指標,若是你啓用了持久化支持的功能,亦即設置了JOBDIR設置項,你也
會獲得非零的dqs(len(engine.slot.scheduler.dqs)
)值,這時候就應當把dqs加到mqs上去,以便後續的分析。

  • mqs
    意味着在調度器中有不少請求等待處理(4475個請求)。這是沒問題的。
  • len(engine.downloader.active)
    表示着如今有16個請求正被下載器下載。這和咱們設置的CONCURRENT_REQUESTS值是同樣的,因此也沒問題。
  • len(engine.scraper.slot.active)
    告訴咱們如今正有115個響應在scraper中處理,這些響應的總的大小能夠從engine.scraper.slot.active_size指標獲得,共是115kb。除了這些響應,pipeline中正有105個
  • Item
    被處理——從engine.scraper.slot.itemproc_size中得知,也就是說,還有10個正在爬蟲中進行處理。總的來講,能夠肯定下載器就是系統的瓶頸,由於在下載器以前有不少請求(mqs)在隊列中等待處理,下載器已經被充分地利用了;在下載器以後,咱們有一個或多或少比較很穩定的工做量(能夠經過屢次調用est()函數來證明這一點)。

另外一個信息來源是stats對象,它通常狀況下會在爬蟲運行結束後打印出來。而在telnet中,咱們能夠隨時經過stats.get_stats()獲得一個dict
對象,並用p()函數打印出來:

$ p(stats.get_stats())
{'downloader/request_bytes': 558330,
...
    'item_scraped_count': 2485,
...}

數據收集

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/stats.html

scrapy/statscollectors.py

"""
Scrapy extension for collecting scraping stats
"""
import pprint
import logging

logger = logging.getLogger(__name__)


class StatsCollector(object):

    def __init__(self, crawler):
        self._dump = crawler.settings.getbool('STATS_DUMP')
        self._stats = {}

    def get_value(self, key, default=None, spider=None):
        return self._stats.get(key, default)

    def get_stats(self, spider=None):
        return self._stats

    def set_value(self, key, value, spider=None):
        self._stats[key] = value

    def set_stats(self, stats, spider=None):
        self._stats = stats

    def inc_value(self, key, count=1, start=0, spider=None):
        d = self._stats
        d[key] = d.setdefault(key, start) + count

    def max_value(self, key, value, spider=None):
        self._stats[key] = max(self._stats.setdefault(key, value), value)

    def min_value(self, key, value, spider=None):
        self._stats[key] = min(self._stats.setdefault(key, value), value)

    def clear_stats(self, spider=None):
        self._stats.clear()

    def open_spider(self, spider):
        pass

    def close_spider(self, spider, reason):
        if self._dump:
            logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats),
                        extra={'spider': spider})
        self._persist_stats(self._stats, spider)

    def _persist_stats(self, stats, spider):
        pass


class MemoryStatsCollector(StatsCollector):

    def __init__(self, crawler):
        super(MemoryStatsCollector, self).__init__(crawler)
        self.spider_stats = {}

    def _persist_stats(self, stats, spider):
        self.spider_stats[spider.name] = stats


class DummyStatsCollector(StatsCollector):

    def get_value(self, key, default=None, spider=None):
        return default

    def set_value(self, key, value, spider=None):
        pass

    def set_stats(self, stats, spider=None):
        pass

    def inc_value(self, key, count=1, start=0, spider=None):
        pass

    def max_value(self, key, value, spider=None):
        pass

    def min_value(self, key, value, spider=None):
        pass

404頁面收集

class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['blog.jobbole.com']
    start_urls = ['http://blog.jobbole.com/all-posts/']


    # 收集404的url和數量
    handle_httpstatus_list = [404,]

    def __init__(self):
        self.fail_urls = []
        super(JobboleSpider, self).__init__()


    def parse(self, response):

        if response.status == 404:
            self.fail_urls.append(response.url)
            self.crawler.stats.inc_value('failed_url')
        ...

信號

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/signals.html

在spider關閉時對fail_urls進行處理

def __init__(self):
        self.fail_urls = []
        super(JobboleSpider, self).__init__()
        dispatcher.connect(self.handle_spider_closed, signal=signals.spider_closed)

    def handle_spider_closed(self, spider, response):
        self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls))
        ...

擴展

https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/extensions.html

scrapy/extensions包裏有一些擴展實例

分佈式爬蟲

http://doc.scrapy.org/en/master/topics/practices.html#distributed-crawls

Scrapy並無提供內置的機制支持分佈式(多服務器)爬取。不過仍是有辦法進行分佈式爬取, 取決於您要怎麼分佈了。
若是您有不少spider,那分佈負載最簡單的辦法就是啓動多個Scrapyd,並分配到不一樣機器上。
若是想要在多個機器上運行一個單獨的spider,那您能夠將要爬取的url進行分塊,併發送給spider。 例如:
首先,準備要爬取的url列表,並分配到不一樣的文件url裏:

http://somedomain.com/urls-to-crawl/spider1/part1.list
http://somedomain.com/urls-to-crawl/spider1/part2.list
http://somedomain.com/urls-to-crawl/spider1/part3.list

接着在3個不一樣的Scrapd服務器中啓動spider。spider會接收一個(spider)參數 part , 該參數表示要爬取的分塊:

curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1
curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2
curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3

scrapy-redis分佈式爬蟲

https://github.com/rmax/scrapy-redis
Redis 命令參考
http://redisdoc.com/

pip install scrapy-redis

Scrapy 是一個通用的爬蟲框架,可是不支持分佈式,Scrapy-redis是爲了更方便地實現Scrapy分佈式爬取,而提供了一些以redis爲基礎的組件(僅有組件)。

Scrapy-redis提供了下面四種組件(components):(四種組件意味着這四個模塊都要作相應的修改)

  • Scheduler
  • Duplication Filter
  • Item Pipeline
  • Base Spider

如上圖所⽰示,scrapy-redis在scrapy的架構上增長了redis,基於redis的特性拓展了以下組件:

  • Scheduler
    Scrapy改造了python原本的collection.deque(雙向隊列)造成了本身的Scrapy queue(https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py)),可是Scrapy多個spider不能共享待爬取隊列Scrapy queue, 即Scrapy自己不支持爬蟲分佈式,scrapy-redis 的解決是把這個Scrapy queue換成redis數據庫(也是指redis隊列),從同一個redis-server存放要爬取的request,便能讓多個spider去同一個數據庫裏讀取。

Scrapy中跟「待爬隊列」直接相關的就是調度器Scheduler,它負責對新的request進行入列操做(加入Scrapy queue),取出下一個要爬取的request(從Scrapy queue中取出)等操做。它把待爬隊列按照優先級創建了一個字典結構,好比:

{
        優先級0 : 隊列0
        優先級1 : 隊列1
        優先級2 : 隊列2
    }

而後根據request中的優先級,來決定該入哪一個隊列,出列時則按優先級較小的優先出列。爲了管理這個比較高級的隊列字典,Scheduler須要提供一系列的方法。可是原來的Scheduler已經沒法使用,因此使用Scrapy-redisscheduler組件。

  • Duplication Filter
    Scrapy中用集合實現這個request去重功能,Scrapy中把已經發送的request指紋放入到一個集合中,把下一個request的指紋拿到集合中比對,若是該指紋存在於集合中,說明這個request發送過了,若是沒有則繼續操做。這個核心的判重功能是這樣實現的:
def request_seen(self, request):
        # self.request_figerprints就是一個指紋集合  
        fp = self.request_fingerprint(request)

        # 這就是判重的核心操做  
        if fp in self.fingerprints:
            return True
        self.fingerprints.add(fp)
        if self.file:
            self.file.write(fp + os.linesep)

scrapy-redis中去重是由Duplication Filter組件來實現的,它經過redisset不重複的特性,巧妙的實現了Duplication Filter去重。scrapy-redis調度器從引擎接受request,將request的指紋存⼊redisset檢查是否重複,並將不重複的request push寫⼊redisrequest queue

引擎請求request(Spider發出的)時,調度器從redisrequest queue隊列⾥里根據優先級pop 出⼀個request返回給引擎,引擎將此request發給spider處理。

  • Item Pipeline
    引擎將(Spider返回的)爬取到的Item給Item Pipelinescrapy-redisItem Pipeline將爬取到的 Item 存⼊redisitems queue
    修改後的Item Pipeline能夠很方便的根據 keyitems queue提取item,從⽽實現 items processes集羣。

  • Base Spider
    不在使用scrapy原有的Spider類,重寫的RedisSpider繼承了SpiderRedisMixin這兩個類,RedisMixin是用來從redis讀取url的類。
    當咱們生成一個Spider繼承RedisSpider時,調用setup_redis函數,這個函數會去鏈接redis數據庫,而後會設置signals(信號):

一個是當spider空閒時候的signal,會調用spider_idle函數,這個函數調用schedule_next_request函數,保證spider是一直活着的狀態,而且拋出DontCloseSpider異常。

一個是當抓到一個item時的signal,會調用item_scraped函數,這個函數會調用schedule_next_request函數,獲取下一個request

scrapy-redis源碼分析參考

scrapy-redis的源碼並很少,工程的主體仍是是redis和scrapy兩個庫,工程自己實現的東西不是不少,這個工程就像膠水同樣,把這兩個插件粘結了起來。下面咱們來看看,scrapy-redis的每個源代碼文件都實現了什麼功能,最後如何實現分佈式的爬蟲系統

connection.py

負責根據setting中配置實例化redis鏈接。被dupefilterscheduler調用,總之涉及到redis存取的都要使用到這個模塊。

import six

from scrapy.utils.misc import load_object

from . import defaults

# 鏈接redis數據庫
# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
    can override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_ENCODING : str, optional
        Data encoding.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)
defaults.py

scrapy-redis默認配置

import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
# 套接字的超時時間、等待時間等
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
dupefilter.py

負責執行requst的去重,實現的頗有技巧性,使用redisset數據結構。可是注意scheduler並不使用其中用於在這個模塊中實現的dupefilter鍵作request的調度,而是使用queue.py模塊中實現的queue

request不重複時,將其存入到queue中,調度時將其彈出。

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from . import defaults
from .connection import get_redis_from_settings


logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.


        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    @classmethod
    def from_spider(cls, spider):
        settings = spider.settings
        server = get_redis_from_settings(settings)
        dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
        key = dupefilter_key % {'spider': spider.name}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

這個文件看起來比較複雜,重寫了scrapy自己已經實現的request判重功能。由於自己scrapy單機跑的話,只須要讀取內存中的request隊列或者持久化的request隊列,就能判斷此次要發出的request url是否已經請求過或者正在調度(本地讀就好了)。而分佈式跑的話,就須要各個主機上的scheduler都鏈接同一個數據庫的同一個request池來判斷此次的請求是不是重複的了。

在這個文件中,經過繼承BaseDupeFilter重寫他的方法,實現了基於redis的判重。根據源代碼來看,scrapy-redis使用了scrapy自己的一個fingerprintrequest_fingerprint,這個函數在前面去重原理中已經說過了.

這個類經過鏈接redis,使用一個key來向redis的一個set中插入fingerprint(這個key對於同一種spider是相同的,redis是一個key-value的數據庫,若是key是相同的,訪問到的值就是相同的,這裏使用spider名字+DupeFilter的key就是爲了在不一樣主機上的不一樣爬蟲實例,只要屬於同一種spider,就會訪問到同一個set,而這個set就是他們的url判重池),若是返回值爲0,說明該set中該fingerprint已經存在(由於集合是沒有重複值的),則返回False,若是返回值爲1,說明添加了一個fingerprintset中,則說明這個request沒有重複,因而返回True,還順便把新fingerprint加入到數據庫中了。 DupeFilter判重會在scheduler類中用到,每個request在進入調度以前都要進行判重,若是重複就不須要參加調度,直接捨棄就行了,否則就是白白浪費資源。

picklecompat.py
"""A pickle wrapper module with protocol=-1 by default."""

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)

這裏實現了loadsdumps兩個函數,其實就是實現了一個序列化器。

由於redis數據庫不能存儲複雜對象(key部分只能是字符串,value部分只能是字符串,字符串列表,字符串集合和hash),因此咱們存啥都要先串行化成文本才行。

這裏使用的就是pythonpickle模塊,一個兼容py2和py3的串行化工具。這個serializer主要用於一會的schedulerreuqest對象。

pipelines.py

這是是用來實現分佈式處理的做用。它將Item存儲在redis中以實現分佈式處理。因爲在這裏須要讀取配置,因此就用到了from_crawler()函數。

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}

pipelines文件實現了一個item pipieline類,和scrapyitem pipeline是同一個對象,經過從settings中拿到咱們配置的REDIS_ITEMS_KEY做爲key,把item串行化以後存入redis數據庫對應的value中(這個value能夠看出出是個list,咱們的每一個item是這個list中的一個結點),這個pipeline把提取出的item存起來,主要是爲了方便咱們後續處理數據。(集中處理放在同一臺服務器,仍是各自保存各自的)

queue.py

該文件實現了幾個容器類,這些容器與redis進行交互,在交互時,會對request請求進行編碼和解碼操做(序列化和反序列化)

from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)

# 先進先出, 隊列
class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    # 壓頭, 出尾 
    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

# 有序隊列
class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

# 後進先出 棧
class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    # 壓頭, 出頭
    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
scheduler.py

此擴展是對scrapy中自帶的scheduler的替代(在settingsSCHEDULER變量中指出),正是利用此擴展實現crawler的分佈式調度。其利用的數據結構來自於queue中實現的數據結構。

scrapy-redis所實現的兩種分佈式:爬蟲分佈式以及item處理分佈式就是由模塊scheduler和模塊pipelines實現。上述其它模塊做爲爲兩者輔助的功能模塊

import importlib
import six

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        self.df = load_object(self.dupefilter_cls).from_spider(spider)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0

這個文件重寫了scheduler類,用來代替scrapy.core.scheduler的原有調度器。其實對原有調度器的邏輯沒有很大的改變,主要是使用了redis做爲數據存儲的媒介,以達到各個爬蟲之間的統一調度。scheduler負責調度各個spiderrequest請求scheduler初始化時,經過settings文件讀取queuedupefilters的類型(通常就用上邊默認的),配置queuedupefilters使用的key(通常就是spider name加上queue或者dupefilters,這樣對於同一種spider的不一樣實例,就會使用相同的數據塊了)。每當一個request要被調度時,enqueue_request被調用,scheduler使用dupefilters來判斷這個url是否重複,若是不重複,就添加到queue的容器中(先進先出,先進後出和優先級均可以,能夠在settings中配置)。當調度完成時,next_request被調用,scheduler就經過queue容器的接口,取出一個request,把他發送給相應的spider,讓spider進行爬取工做。

spiders.py

設計的這個spider從redis中讀取要爬的url,而後執行爬取,若爬取過程當中返回更多的url,那麼繼續進行直至全部的request完成。以後繼續從redis中讀取url,循環這個過程。

分析:在這個spider中經過signals.spider_idle(空閒)信號實現對crawler狀態的監視。當idle時,返回新的make_requests_from_url(url)給引擎,進而交給調度器調度。

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieve using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

spider的改動也不是很大,主要是經過connect接口,給spider綁定了spider_idle信號,spider初始化時,經過setup_redis函數初始化和redis的鏈接,以後經過next_requests函數從redis中取出strat url,使用的keysettingsREDIS_START_URLS_AS_SET定義的(注意了這裏的初始化url池和咱們上邊的queueurl池不是一個東西,queue的池是用於調度的,初始化url池是存放入口url的,他們都存在redis中,可是使用不一樣的key來區分,就當成是不一樣的表吧),spider使用少許的start url,能夠發展出不少新的url,這些url會進入scheduler進行判重和調度。直到spider跑到調度池內沒有url的時候,會觸發spider_idle信號,從而觸發spidernext_requests函數,再次從redisstart url池中讀取一些url

utils.py

py2和py3字符串兼容

import six


def bytes_to_str(s, encoding='utf-8'):
    """Returns a str if a bytes object is given."""
    if six.PY3 and isinstance(s, bytes):
        return s.decode(encoding)
    return s

總結

這個工程經過重寫schedulerspider類,實現了調度spider啓動redis的交互。實現新的dupefilterqueue類,達到了判重調度容器redis的交互,由於每一個主機上的爬蟲進程都訪問同一個redis數據庫,因此調度和判重都統一進行統一管理,達到了分佈式爬蟲的目的。 當spider被初始化時,同時會初始化一個對應的scheduler對象,這個調度器對象經過讀取settings,配置好本身的調度容器queue和判重工具dupefilter。每當一個spider產出一個request的時候,scrapy內核會把這個reuqest遞交給這個spider對應的scheduler對象進行調度,scheduler對象經過訪問redisrequest進行判重,若是不重複就把他添加進redis中的調度池。當調度條件知足時,scheduler對象就從redis的調度池中取出一個request發送給spider,讓他爬取。當spider爬取的全部暫時可用url以後,scheduler發現這個spider對應的redis的調度池空了,因而觸發信號spider_idlespider收到這個信號以後,直接鏈接redis讀取strart url池,拿去新的一批url入口,而後再次重複上邊的工做。

Scrapy-Redis調度的任務是Request對象,裏面信息量比較大(不只包含url,還有callback函數、headers等信息),可能致使的結果就是會下降爬蟲速度、並且會佔用Redis大量的存儲空間,因此若是要保證效率,那麼就須要必定硬件水平,尤爲是主機。

Bloom Filter

https://piaosanlang.gitbooks.io/spiders/09day/section9.1.html

https://pypi.org/project/pybloomfiltermmap3/#description

https://pypi.org/project/pybloom_live

scrapy-redis去重

scrapy_redis是利用set數據結構來去重的,去重的對象是requestfingerprint
去重原理說過了.

def request_seen(self, request):
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

若是要使用Bloomfilter優化,能夠修改去重函數request_seen

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    if self.bf.isContains(fp):    # 若是已經存在
        return True
    else:
        self.bf.insert(fp)
        return False

self.bf是類Bloomfilter()的實例化

# encoding=utf-8

import redis
from hashlib import md5


class SimpleHash(object):
    def __init__(self, cap, seed):
        self.cap = cap
        self.seed = seed

    def hash(self, value):
        ret = 0
        for i in range(len(value)):
            ret += self.seed * ret + ord(value[i])
        return (self.cap - 1) & ret


class BloomFilter(object):
    def __init__(self, host='localhost', port=6379, db=0, blockNum=1, key='bloomfilter'):
        """
        :param host: the host of Redis
        :param port: the port of Redis
        :param db: witch db in Redis
        :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
        :param key: the key's name in Redis
        """
        self.server = redis.Redis(host=host, port=port, db=db)
        self.bit_size = 1 << 31  # Redis的String類型最大容量爲512M,現使用256M= 2^8 *2^20 字節 = 2^28 * 2^3bit
        self.seeds = [5, 7, 11, 13, 31, 37, 61]
        self.key = key
        self.blockNum = blockNum
        self.hashfunc = []
        for seed in self.seeds:
            self.hashfunc.append(SimpleHash(self.bit_size, seed))

    def isContains(self, str_input):
        if not str_input:
            return False
        m5 = md5()
        m5.update(str_input)
        str_input = m5.hexdigest()
        ret = True
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            ret = ret & self.server.getbit(name, loc)
        return ret

    def insert(self, str_input):
        m5 = md5()
        m5.update(str_input)
        str_input = m5.hexdigest()
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            self.server.setbit(name, loc, 1)


if __name__ == '__main__':
""" 第一次運行時會顯示 not exists!,以後再運行會顯示 exists! """
    bf = BloomFilter()
    if bf.isContains('http://www.baidu.com'):   # 判斷字符串是否存在
        print 'exists!'
    else:
        print 'not exists!'
        bf.insert('http://www.baidu.com')

基於RedisBloomfilter去重,既用上了Bloomfilter的海量去重能力,又用上了Redis的可持久化能力,基於Redis也方便分佈式機器的去重

scrapyd部署scrapy

https://github.com/scrapy/scrapyd

擴展

如何防止死循環

在Scrapy的默認配置中,是根據url進行去重的。這個對付通常網站是夠的。可是有一些網站的SEO作的很變態:爲了讓爬蟲多抓,會根據request,動態的生成一些連接,致使爬蟲 在網站上抓取大量的隨機頁面,甚至是死循環。。
爲了解決這個問題,有2個方案:

(1) 在setting.py中,設定爬蟲的嵌套次數上限(全局設定,實際是經過DepthMiddleware實現的):

DEPTH_LIMIT = 20

(2) 在parse中經過讀取response來自行判斷(spider級別設定) :

def parse(self, response):
    if response.meta['depth'] > 100:
        print ('Loop?')
相關文章
相關標籤/搜索