首先回顧一下Scrapy-Redis的去重機制。Scrapy-Redis將Request的指紋存儲到了Redis集合中,每一個指紋的長度爲40,例如27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61就是一個指紋,它的每一位都是16進制數。
python
咱們計算一下用這種方式耗費的存儲空間。每一個十六進制數佔用4 b,1個指紋用40個十六進制數表示,佔用空間爲20 B,1萬個指紋即佔用空間200 KB,1億個指紋佔用2 GB。當爬取數量達到上億級別時,Redis的佔用的內存就會變得很大,並且這僅僅是指紋的存儲。Redis還存儲了爬取隊列,內存佔用會進一步提升,更別說有多個Scrapy項目同時爬取的狀況了。當爬取達到億級別規模時,Scrapy-Redis提供的集合去重已經不能知足咱們的要求。因此咱們須要使用一個更加節省內存的去重算法Bloom Filter。git
Bloom Filter,中文名稱叫做布隆過濾器,是1970年由Bloom提出的,它能夠被用來檢測一個元素是否在一個集合中。Bloom Filter的空間利用效率很高,使用它能夠大大節省存儲空間。Bloom Filter使用位數組表示一個待檢測集合,並能夠快速地經過幾率算法判斷一個元素是否存在於這個集合中。利用這個算法咱們能夠實現去重效果。github
本節咱們來了解Bloom Filter的基本算法,以及Scrapy-Redis中對接Bloom Filter的方法。redis
在Bloom Filter中使用位數組來輔助實現檢測判斷。在初始狀態下,咱們聲明一個包含m位的位數組,它的全部位都是0,以下圖所示。算法
如今咱們有了一個待檢測集合,其表示爲S={x1, x2, …, xn}。接下來須要作的就是檢測一個x是否已經存在於集合S中。在Bloom Filter算法中,首先使用k個相互獨立、隨機的散列函數來將集合S中的每一個元素x1, x2, …, xn映射到長度爲m的位數組上,散列函數獲得的結果記做位置索引,而後將位數組該位置索引的位置1。例如,咱們取k爲3,表示有三個散列函數,x1通過三個散列函數映射獲得的結果分別爲一、四、8,x2通過三個散列函數映射獲得的結果分別爲四、六、10,那麼位數組的一、四、六、八、10這五位就會置爲1,以下圖所示。數組
若是有一個新的元素x,咱們要判斷x是否屬於S集合,咱們仍然用k個散列函數對x求映射結果。若是全部結果對應的位數組位置均爲1,那麼x屬於S這個集合;若是有一個不爲1,則x不屬於S集合。架構
例如,新元素x通過三個散列函數映射的結果爲四、六、8,對應的位置均爲1,則x屬於S集合。若是結果爲四、六、7,而7對應的位置爲0,則x不屬於S集合。併發
注意,這裏m、n、k知足的關係是m>nk,也就是說位數組的長度m要比集合元素n和散列函數k的乘積還要大。app
這樣的斷定方法很高效,可是也是有代價的,它可能把不屬於這個集合的元素誤認爲屬於這個集合。咱們來估計一下這種方法的錯誤率。當集合S={x1, x2,…, xn} 的全部元素都被k個散列函數映射到m位的位數組中時,這個位數組中某一位仍是0的機率是:scrapy
散列函數是隨機的,則任意一個散列函數選中這一位的機率爲1/m,那麼1-1/m就表明散列函數從未沒有選中這一位的機率,要把S徹底映射到m位數組中,須要作kn次散列運算,最後的機率就是1-1/m的kn次方。
一個不屬於S的元素x若是誤斷定爲在S中,那麼這個機率就是k次散列運算獲得的結果對應的位數組位置都爲1,則誤判機率爲:
根據:
能夠將誤判機率轉化爲:
在給定m、n時,能夠求出使得f最小化的k值爲:
這裏將誤判機率概括以下:
表中第一列爲m/n的值,第二列爲最優k值,其後列爲不一樣k值的誤判機率。當k值肯定時,隨着m/n的增大,誤判機率逐漸變小。當m/n的值肯定時,當k越靠近最優K值,誤判機率越小。誤判機率整體來看都是極小的,在容忍此誤判機率的狀況下,大幅減少存儲空間和斷定速度是徹底值得的。
接下來,咱們將Bloom Filter算法應用到Scrapy-Redis分佈式爬蟲的去重過程當中,以解決Redis內存不足的問題。
實現Bloom Filter時,首先要保證不能破壞Scrapy-Redis分佈式爬取的運行架構。咱們須要修改Scrapy-Redis的源碼,將它的去重類替換掉。同時,Bloom Filter的實現須要藉助於一個位數組,既然當前架構仍是依賴於Redis,那麼位數組的維護直接使用Redis就行了。
首先實現一個基本的散列算法,將一個值通過散列運算後映射到一個m位數組的某一位上,代碼以下:
class HashMap(object): def __init__(self, m, seed): self.m = m self.seed = seed
def hash(self, value): """ Hash Algorithm :param value: Value :return: Hash Value """ ret = 0 for i in range(len(value)): ret += self.seed * ret + ord(value[i])
return (self.m - 1) & ret
這裏新建了一個HashMap
類。構造函數傳入兩個值,一個是m位數組的位數,另外一個是種子值seed
。不一樣的散列函數須要有不一樣的seed
,這樣能夠保證不一樣的散列函數的結果不會碰撞。
在hash()
方法的實現中,value
是要被處理的內容。這裏遍歷了value
的每一位,並利用ord()
方法取到每一位的ASCII碼值,而後混淆seed
進行迭代求和運算,最終獲得一個數值。這個數值的結果就由value
和seed
惟一肯定。咱們再將這個數值和m進行按位與運算,便可獲取到m位數組的映射結果,這樣就實現了一個由字符串和seed
來肯定的散列函數。當m固定時,只要seed
值相同,散列函數就是相同的,相同的value
必然會映射到相同的位置。因此若是想要構造幾個不一樣的散列函數,只須要改變其seed
就行了。以上內容即是一個簡易的散列函數的實現。
接下來咱們再實現Bloom Filter。Bloom Filter裏面須要用到k個散列函數,這裏要對這幾個散列函數指定相同的m值和不一樣的seed
值,構造以下:
BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30
class BloomFilter(object): def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER): """ Initialize BloomFilter :param server: Redis Server :param key: BloomFilter Key :param bit: m = 2 ^ bit :param hash_number: the number of hash function """ # default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints self.m = 1 << bit self.seeds = range(hash_number) self.maps = [HashMap(self.m, seed) for seed in self.seeds] self.server = server self.key = key
因爲咱們須要億級別的數據的去重,即前文介紹的算法中的n爲1億以上,散列函數的個數k大約取10左右的量級。而m>kn,這裏m值大約保底在10億,因爲這個數值比較大,因此這裏用移位操做來實現,傳入位數bit,將其定義爲30,而後作一個移位操做1<<30
,至關於2的30次方,等於1073741824,量級也是剛好在10億左右,因爲是位數組,因此這個位數組佔用的大小就是2^30 b=128 MB。開頭咱們計算過Scrapy-Redis集合去重的佔用空間大約在2 GB左右,可見Bloom Filter的空間利用效率極高。
隨後咱們再傳入散列函數的個數,用它來生成幾個不一樣的seed
。用不一樣的seed
來定義不一樣的散列函數,這樣咱們就能夠構造一個散列函數列表。遍歷seed
,構造帶有不一樣seed
值的HashMap
對象,而後將HashMap
對象保存成變量maps
供後續使用。
另外,server
就是Redis鏈接對象,key
就是這個m位數組的名稱。
接下來,咱們要實現比較關鍵的兩個方法:一個是斷定元素是否重複的方法exists()
,另外一個是添加元素到集合中的方法insert()
,實現以下:
def exists(self, value): """ if value exists :param value: :return: """ if not value:
return False exist = 1 for map in self.maps: offset = map.hash(value) exist = exist & self.server.getbit(self.key, offset)
return exist
def insert(self, value): """ add value to bloom :param value: :return: """ for f in self.maps: offset = f.hash(value) self.server.setbit(self.key, offset, 1)
首先看下insert()
方法。Bloom Filter算法會逐個調用散列函數對放入集合中的元素進行運算,獲得在m位位數組中的映射位置,而後將位數組對應的位置置1。這裏代碼中咱們遍歷了初始化好的散列函數,而後調用其hash()
方法算出映射位置offset
,再利用Redis的setbit()
方法將該位置1。
在exists()
方法中,咱們要實現斷定是否重複的邏輯,方法參數value
爲待判斷的元素。咱們首先定義一個變量exist
,遍歷全部散列函數對value
進行散列運算,獲得映射位置,用getbit()
方法取得該映射位置的結果,循環進行與運算。這樣只有每次getbit()
獲得的結果都爲1時,最後的exist
才爲True
,即表明value
屬於這個集合。若是其中只要有一次getbit()
獲得的結果爲0,即m位數組中有對應的0位,那麼最終的結果exist
就爲False
,即表明value
不屬於這個集合。
Bloom Filter的實現就已經完成了,咱們能夠用一個實例來測試一下,代碼以下:
conn = StrictRedis(host='localhost', port=6379, password='foobared') bf = BloomFilter(conn, 'testbf', 5, 6) bf.insert('Hello') bf.insert('World') result = bf.exists('Hello') print(bool(result)) result = bf.exists('Python') print(bool(result))
這裏首先定義了一個Redis鏈接對象,而後傳遞給Bloom Filter。爲了不內存佔用過大,這裏傳的位數bit比較小,設置爲5,散列函數的個數設置爲6。
調用insert()
方法插入Hello
和World
兩個字符串,隨後判斷Hello
和Python
這兩個字符串是否存在,最後輸出它的結果,運行結果以下:
True
False
很明顯,結果徹底沒有問題。這樣咱們就藉助Redis成功實現了Bloom Filter的算法。
接下來繼續修改Scrapy-Redis的源碼,將它的dupefilter邏輯替換爲Bloom Filter的邏輯。這裏主要是修改RFPDupeFilter
類的request_seen()
方法,實現以下:
def request_seen(self, request): fp = self.request_fingerprint(request)
if self.bf.exists(fp):
return True self.bf.insert(fp)
return False
利用request_fingerprint()
方法獲取Request的指紋,調用Bloom Filter的exists()
方法斷定該指紋是否存在。若是存在,則說明該Request是重複的,返回True
,不然調用Bloom Filter的insert()
方法將該指紋添加並返回False
。這樣就成功利用Bloom Filter替換了Scrapy-Redis的集合去重。
對於Bloom Filter的初始化定義,咱們能夠將__init__()
方法修改成以下內容:
def __init__(self, server, key, debug, bit, hash_number): self.server = server self.key = key self.debug = debug self.bit = bit self.hash_number = hash_number self.logdupes = True self.bf = BloomFilter(server, self.key, bit, hash_number)
其中bit
和hash_number
須要使用from_settings()
方法傳遞,修改以下:
@classmethod
def from_settings(cls, settings): server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG) bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT) hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)
其中,常量DUPEFILTER_DEBUG
和BLOOMFILTER_BIT
統必定義在defaults.py中,默認以下:
BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30
如今,咱們成功實現了Bloom Filter和Scrapy-Redis的對接。
本節代碼地址爲:https://github.com/Python3WebSpider/ScrapyRedisBloomFilter。
爲了方便使用,本節的代碼已經打包成一個Python包併發布到PyPi,連接爲https://pypi.python.org/pypi/scrapy-redis-bloomfilter,能夠直接使用ScrapyRedisBloomFilter,不須要本身實現一遍。
咱們能夠直接使用pip來安裝,命令以下:
pip3 install scrapy-redis-bloomfilter
使用的方法和Scrapy-Redis基本類似,在這裏說明幾個關鍵配置。
# 去重類,要使用Bloom Filter請替換DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 散列函數的個數,默認爲6,能夠自行修改
BLOOMFILTER_HASH_NUMBER = 6
# Bloom Filter的bit參數,默認30,佔用128MB空間,去重量級1億
BLOOMFILTER_BIT = 30
DUPEFILTER_CLASS
是去重類,若是要使用Bloom Filter,則DUPEFILTER_CLASS
須要修改成該包的去重類。
BLOOMFILTER_HASH_NUMBER
是Bloom Filter使用的散列函數的個數,默認爲6,能夠根據去重量級自行修改。
BLOOMFILTER_BIT
即前文所介紹的BloomFilter
類的bit
參數,它決定了位數組的位數。若是BLOOMFILTER_BIT
爲30,那麼位數組位數爲2的30次方,這將佔用Redis 128 MB的存儲空間,去重量級在1億左右,即對應爬取量級1億左右。若是爬取量級在10億、20億甚至100億,請務必將此參數對應調高。
源代碼附有一個測試項目,放在tests文件夾,該項目使用了ScrapyRedisBloomFilter
來去重,Spider的實現以下:
from scrapy import Request, Spider
class TestSpider(Spider): name = 'test' base_url = 'https://www.baidu.com/s?wd=' def start_requests(self): for i in range(10): url = self.base_url + str(i)
yield Request(url, callback=self.parse)
# Here contains 10 duplicated Requests for i in range(100): url = self.base_url + str(i)
yield Request(url, callback=self.parse)
def parse(self, response): self.logger.debug('Response of ' + response.url)
start_requests()
方法首先循環10次,構造參數爲0~9的URL,而後從新循環了100次,構造了參數爲0~99的URL。那麼這裏就會包含10個重複的Request,咱們運行項目測試一下:
scrapy crawl test
最後的輸出結果以下:
{'bloomfilter/filtered': 10,
'downloader/request_bytes': 34021,
'downloader/request_count': 100,
'downloader/request_method_count/GET': 100,
'downloader/response_bytes': 72943,
'downloader/response_count': 100,
'downloader/response_status_count/200': 100,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2017, 8, 11, 9, 34, 30, 419597),
'log_count/DEBUG': 202,
'log_count/INFO': 7,
'memusage/max': 54153216,
'memusage/startup': 54153216,
'response_received_count': 100,
'scheduler/dequeued/redis': 100,
'scheduler/enqueued/redis': 100,
'start_time': datetime.datetime(2017, 8, 11, 9, 34, 26, 495018)}
最後統計的第一行的結果:
'bloomfilter/filtered': 10,
這就是Bloom Filter過濾後的統計結果,它的過濾個數爲10個,也就是它成功將重複的10個Reqeust識別出來了,測試經過。
以上內容即是Bloom Filter的原理及對接實現,Bloom Filter的使用能夠大大節省Redis內存。在數據量大的狀況下推薦此方案。