Scrapy爬蟲去重效率優化之Bloom Filter的算法的對接

首先回顧一下Scrapy-Redis的去重機制。Scrapy-Redis將Request的指紋存儲到了Redis集合中,每一個指紋的長度爲40,例如27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61就是一個指紋,它的每一位都是16進制數。
python

咱們計算一下用這種方式耗費的存儲空間。每一個十六進制數佔用4 b,1個指紋用40個十六進制數表示,佔用空間爲20 B,1萬個指紋即佔用空間200 KB,1億個指紋佔用2 GB。當爬取數量達到上億級別時,Redis的佔用的內存就會變得很大,並且這僅僅是指紋的存儲。Redis還存儲了爬取隊列,內存佔用會進一步提升,更別說有多個Scrapy項目同時爬取的狀況了。當爬取達到億級別規模時,Scrapy-Redis提供的集合去重已經不能知足咱們的要求。因此咱們須要使用一個更加節省內存的去重算法Bloom Filter。git

1. 瞭解Bloom Filter

Bloom Filter,中文名稱叫做布隆過濾器,是1970年由Bloom提出的,它能夠被用來檢測一個元素是否在一個集合中。Bloom Filter的空間利用效率很高,使用它能夠大大節省存儲空間。Bloom Filter使用位數組表示一個待檢測集合,並能夠快速地經過幾率算法判斷一個元素是否存在於這個集合中。利用這個算法咱們能夠實現去重效果。github

本節咱們來了解Bloom Filter的基本算法,以及Scrapy-Redis中對接Bloom Filter的方法。redis

2. Bloom Filter的算法

在Bloom Filter中使用位數組來輔助實現檢測判斷。在初始狀態下,咱們聲明一個包含m位的位數組,它的全部位都是0,以下圖所示。算法

如今咱們有了一個待檢測集合,其表示爲S={x1, x2, …, xn}。接下來須要作的就是檢測一個x是否已經存在於集合S中。在Bloom Filter算法中,首先使用k個相互獨立、隨機的散列函數來將集合S中的每一個元素x1, x2, …, xn映射到長度爲m的位數組上,散列函數獲得的結果記做位置索引,而後將位數組該位置索引的位置1。例如,咱們取k爲3,表示有三個散列函數,x1通過三個散列函數映射獲得的結果分別爲一、四、8,x2通過三個散列函數映射獲得的結果分別爲四、六、10,那麼位數組的一、四、六、八、10這五位就會置爲1,以下圖所示。數組

若是有一個新的元素x,咱們要判斷x是否屬於S集合,咱們仍然用k個散列函數對x求映射結果。若是全部結果對應的位數組位置均爲1,那麼x屬於S這個集合;若是有一個不爲1,則x不屬於S集合。架構

例如,新元素x通過三個散列函數映射的結果爲四、六、8,對應的位置均爲1,則x屬於S集合。若是結果爲四、六、7,而7對應的位置爲0,則x不屬於S集合。併發

注意,這裏m、n、k知足的關係是m>nk,也就是說位數組的長度m要比集合元素n和散列函數k的乘積還要大。app

這樣的斷定方法很高效,可是也是有代價的,它可能把不屬於這個集合的元素誤認爲屬於這個集合。咱們來估計一下這種方法的錯誤率。當集合S={x1, x2,…, xn} 的全部元素都被k個散列函數映射到m位的位數組中時,這個位數組中某一位仍是0的機率是:scrapy

散列函數是隨機的,則任意一個散列函數選中這一位的機率爲1/m,那麼1-1/m就表明散列函數從未沒有選中這一位的機率,要把S徹底映射到m位數組中,須要作kn次散列運算,最後的機率就是1-1/m的kn次方。

一個不屬於S的元素x若是誤斷定爲在S中,那麼這個機率就是k次散列運算獲得的結果對應的位數組位置都爲1,則誤判機率爲:

根據:

能夠將誤判機率轉化爲:

在給定m、n時,能夠求出使得f最小化的k值爲:

這裏將誤判機率概括以下:

表中第一列爲m/n的值,第二列爲最優k值,其後列爲不一樣k值的誤判機率。當k值肯定時,隨着m/n的增大,誤判機率逐漸變小。當m/n的值肯定時,當k越靠近最優K值,誤判機率越小。誤判機率整體來看都是極小的,在容忍此誤判機率的狀況下,大幅減少存儲空間和斷定速度是徹底值得的。

接下來,咱們將Bloom Filter算法應用到Scrapy-Redis分佈式爬蟲的去重過程當中,以解決Redis內存不足的問題。

3. 對接Scrapy-Redis

實現Bloom Filter時,首先要保證不能破壞Scrapy-Redis分佈式爬取的運行架構。咱們須要修改Scrapy-Redis的源碼,將它的去重類替換掉。同時,Bloom Filter的實現須要藉助於一個位數組,既然當前架構仍是依賴於Redis,那麼位數組的維護直接使用Redis就行了。

首先實現一個基本的散列算法,將一個值通過散列運算後映射到一個m位數組的某一位上,代碼以下:

class HashMap(object):
    def __init__(self, m, seed):
        self.m = m
        self.seed = seed
   
   def hash(self, value):        """        Hash Algorithm        :param value: Value        :return: Hash Value        """        ret = 0        for i in range(len(value)):            ret += self.seed * ret + ord(value[i])        
       return (self.m - 1) & ret

這裏新建了一個HashMap類。構造函數傳入兩個值,一個是m位數組的位數,另外一個是種子值seed。不一樣的散列函數須要有不一樣的seed,這樣能夠保證不一樣的散列函數的結果不會碰撞。

hash()方法的實現中,value是要被處理的內容。這裏遍歷了value的每一位,並利用ord()方法取到每一位的ASCII碼值,而後混淆seed進行迭代求和運算,最終獲得一個數值。這個數值的結果就由valueseed惟一肯定。咱們再將這個數值和m進行按位與運算,便可獲取到m位數組的映射結果,這樣就實現了一個由字符串和seed來肯定的散列函數。當m固定時,只要seed值相同,散列函數就是相同的,相同的value必然會映射到相同的位置。因此若是想要構造幾個不一樣的散列函數,只須要改變其seed就行了。以上內容即是一個簡易的散列函數的實現。

接下來咱們再實現Bloom Filter。Bloom Filter裏面須要用到k個散列函數,這裏要對這幾個散列函數指定相同的m值和不一樣的seed值,構造以下:

BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

class BloomFilter(object):    def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER):        """        Initialize BloomFilter        :param server: Redis Server        :param key: BloomFilter Key        :param bit: m = 2 ^ bit        :param hash_number: the number of hash function        """        # default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints        self.m = 1 << bit        self.seeds = range(hash_number)        self.maps = [HashMap(self.m, seed) for seed in self.seeds]        self.server = server        self.key = key

因爲咱們須要億級別的數據的去重,即前文介紹的算法中的n爲1億以上,散列函數的個數k大約取10左右的量級。而m>kn,這裏m值大約保底在10億,因爲這個數值比較大,因此這裏用移位操做來實現,傳入位數bit,將其定義爲30,而後作一個移位操做1<<30,至關於2的30次方,等於1073741824,量級也是剛好在10億左右,因爲是位數組,因此這個位數組佔用的大小就是2^30 b=128 MB。開頭咱們計算過Scrapy-Redis集合去重的佔用空間大約在2 GB左右,可見Bloom Filter的空間利用效率極高。

隨後咱們再傳入散列函數的個數,用它來生成幾個不一樣的seed。用不一樣的seed來定義不一樣的散列函數,這樣咱們就能夠構造一個散列函數列表。遍歷seed,構造帶有不一樣seed值的HashMap對象,而後將HashMap對象保存成變量maps供後續使用。

另外,server就是Redis鏈接對象,key就是這個m位數組的名稱。

接下來,咱們要實現比較關鍵的兩個方法:一個是斷定元素是否重複的方法exists(),另外一個是添加元素到集合中的方法insert(),實現以下:

def exists(self, value):
    """    if value exists    :param value:    :return:    """
    if not value:        
       return False    exist = 1    for map in self.maps:        offset = map.hash(value)        exist = exist & self.server.getbit(self.key, offset)    
   return exist
   
def insert(self, value):    """    add value to bloom    :param value:    :return:    """    for f in self.maps:        offset = f.hash(value)        self.server.setbit(self.key, offset, 1)

首先看下insert()方法。Bloom Filter算法會逐個調用散列函數對放入集合中的元素進行運算,獲得在m位位數組中的映射位置,而後將位數組對應的位置置1。這裏代碼中咱們遍歷了初始化好的散列函數,而後調用其hash()方法算出映射位置offset,再利用Redis的setbit()方法將該位置1。

exists()方法中,咱們要實現斷定是否重複的邏輯,方法參數value爲待判斷的元素。咱們首先定義一個變量exist,遍歷全部散列函數對value進行散列運算,獲得映射位置,用getbit()方法取得該映射位置的結果,循環進行與運算。這樣只有每次getbit()獲得的結果都爲1時,最後的exist才爲True,即表明value屬於這個集合。若是其中只要有一次getbit()獲得的結果爲0,即m位數組中有對應的0位,那麼最終的結果exist就爲False,即表明value不屬於這個集合。

Bloom Filter的實現就已經完成了,咱們能夠用一個實例來測試一下,代碼以下:

conn = StrictRedis(host='localhost', port=6379, password='foobared')
bf = BloomFilter(conn, 'testbf', 5, 6)
bf.insert('Hello')
bf.insert('World')
result = bf.exists('Hello')
print(bool(result))
result = bf.exists('Python')
print(bool(result))

這裏首先定義了一個Redis鏈接對象,而後傳遞給Bloom Filter。爲了不內存佔用過大,這裏傳的位數bit比較小,設置爲5,散列函數的個數設置爲6。

調用insert()方法插入HelloWorld兩個字符串,隨後判斷HelloPython這兩個字符串是否存在,最後輸出它的結果,運行結果以下:

True
False

很明顯,結果徹底沒有問題。這樣咱們就藉助Redis成功實現了Bloom Filter的算法。

接下來繼續修改Scrapy-Redis的源碼,將它的dupefilter邏輯替換爲Bloom Filter的邏輯。這裏主要是修改RFPDupeFilter類的request_seen()方法,實現以下:

def request_seen(self, request):
    fp = self.request_fingerprint(request)    
   if self.bf.exists(fp):        
       return True    self.bf.insert(fp)    
       return False

利用request_fingerprint()方法獲取Request的指紋,調用Bloom Filter的exists()方法斷定該指紋是否存在。若是存在,則說明該Request是重複的,返回True,不然調用Bloom Filter的insert()方法將該指紋添加並返回False。這樣就成功利用Bloom Filter替換了Scrapy-Redis的集合去重。

對於Bloom Filter的初始化定義,咱們能夠將__init__()方法修改成以下內容:

def __init__(self, server, key, debug, bit, hash_number):
    self.server = server
    self.key = key
    self.debug = debug
    self.bit = bit
    self.hash_number = hash_number
    self.logdupes = True
    self.bf = BloomFilter(server, self.key, bit, hash_number)

其中bithash_number須要使用from_settings()方法傳遞,修改以下:

@classmethod
def from_settings(cls, settings):
   server = get_redis_from_settings(settings)    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}    debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)    bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)    hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)    
   return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)

其中,常量DUPEFILTER_DEBUGBLOOMFILTER_BIT統必定義在defaults.py中,默認以下:

BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

如今,咱們成功實現了Bloom Filter和Scrapy-Redis的對接。

4. 本節代碼

本節代碼地址爲:https://github.com/Python3WebSpider/ScrapyRedisBloomFilter。

5. 使用

爲了方便使用,本節的代碼已經打包成一個Python包併發布到PyPi,連接爲https://pypi.python.org/pypi/scrapy-redis-bloomfilter,能夠直接使用ScrapyRedisBloomFilter,不須要本身實現一遍。

咱們能夠直接使用pip來安裝,命令以下:

pip3 install scrapy-redis-bloomfilter

使用的方法和Scrapy-Redis基本類似,在這裏說明幾個關鍵配置。

# 去重類,要使用Bloom Filter請替換DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 散列函數的個數,默認爲6,能夠自行修改
BLOOMFILTER_HASH_NUMBER = 6
# Bloom Filter的bit參數,默認30,佔用128MB空間,去重量級1億
BLOOMFILTER_BIT = 30
  • DUPEFILTER_CLASS是去重類,若是要使用Bloom Filter,則DUPEFILTER_CLASS須要修改成該包的去重類。

  • BLOOMFILTER_HASH_NUMBER是Bloom Filter使用的散列函數的個數,默認爲6,能夠根據去重量級自行修改。

  • BLOOMFILTER_BIT即前文所介紹的BloomFilter類的bit參數,它決定了位數組的位數。若是BLOOMFILTER_BIT爲30,那麼位數組位數爲2的30次方,這將佔用Redis 128 MB的存儲空間,去重量級在1億左右,即對應爬取量級1億左右。若是爬取量級在10億、20億甚至100億,請務必將此參數對應調高。

6. 測試

源代碼附有一個測試項目,放在tests文件夾,該項目使用了ScrapyRedisBloomFilter來去重,Spider的實現以下:

from scrapy import Request, Spider

class TestSpider(Spider):    name = 'test'    base_url = 'https://www.baidu.com/s?wd='    def start_requests(self):        for i in range(10):            url = self.base_url + str(i)            
           yield Request(url, callback=self.parse)
       
       # Here contains 10 duplicated Requests            for i in range(100):            url = self.base_url + str(i)            
           yield Request(url, callback=self.parse)    

   def parse(self, response):        self.logger.debug('Response of ' + response.url)

start_requests()方法首先循環10次,構造參數爲0~9的URL,而後從新循環了100次,構造了參數爲0~99的URL。那麼這裏就會包含10個重複的Request,咱們運行項目測試一下:

scrapy crawl test

最後的輸出結果以下:

{'bloomfilter/filtered': 10, 
'downloader/request_bytes': 34021,
'downloader/request_count': 100,
'downloader/request_method_count/GET': 100,
'downloader/response_bytes': 72943,
'downloader/response_count': 100,
'downloader/response_status_count/200': 100,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2017, 8, 11, 9, 34, 30, 419597),
'log_count/DEBUG': 202,
'log_count/INFO': 7,
'memusage/max': 54153216,
'memusage/startup': 54153216,
'response_received_count': 100,
'scheduler/dequeued/redis': 100,
'scheduler/enqueued/redis': 100,
'start_time': datetime.datetime(2017, 8, 11, 9, 34, 26, 495018)}

最後統計的第一行的結果:

'bloomfilter/filtered': 10,

這就是Bloom Filter過濾後的統計結果,它的過濾個數爲10個,也就是它成功將重複的10個Reqeust識別出來了,測試經過。

7. 結語

以上內容即是Bloom Filter的原理及對接實現,Bloom Filter的使用能夠大大節省Redis內存。在數據量大的狀況下推薦此方案。

相關文章
相關標籤/搜索