本文做者:盧瑋,掌閱資深後端工程師python
咱們的業務中常常會遇到穿庫的問題,一般能夠經過緩存解決。 若是數據維度比較多,結果數據集合比較大時,緩存的效果就不明顯了。 所以爲了解決穿庫的問題,咱們引入Bloom Filter。c++
開源項目地址:github.com/luw2007/blo…git
咱們先看看通常業務緩存流程
: github
先查詢緩存,緩存不命中再查詢數據庫。 而後將查詢結果放在緩存中即便數據不存在,也須要建立一個緩存,用來防止穿庫。這裏須要區分一下數據是否存在。 若是數據不存在,緩存時間能夠設置相對較短,防止由於主從同步等問題,致使問題被放大。redis
這個流程中存在薄弱的問題是,當用戶量太大時,咱們會緩存大量數據空數據,而且一旦來一波冷用戶,會形成雪崩效應。 對於這種狀況,咱們產生第二個版本流程:redis過濾冷用戶緩存流程
算法
咱們將數據庫裏面中命中的用戶放在redis的set類型中,設置不過時。 這樣至關把redis看成數據庫的索引,只要查詢redis,就能夠知道是否數據存在。 redis中不存在就能夠直接返回結果。 若是存在就按照上面提到通常業務緩存流程
處理。sql
聰明的你確定會想到更多的問題:chrome
問題1須要區分業務場景,結果數據少,咱們是能夠直接使用redis做爲緩存,直接返回數據。 結果比較大就不太適合用redis存放了。好比ugc內容,一個評論裏面可能存在上萬字,業務字段多。shell
redis使用有不少技巧。bigkey 危害比較大,不管是擴容或縮容帶來的內存申請釋放, 仍是查詢命令使用不當致使大量數據返回,都會影響redis的穩定。這裏就不細談緣由及危害了。 解決bigkey 方法很簡單。咱們可使用hash函數來分桶,將數據分散到多個key中。 減小單個key的大小,同時不影響查詢效率。數據庫
問題3是redis存儲佔用內存太大。所以咱們須要減小內存使用。 從新思考一下引入redis的目的。 redis像一個集合,整個業務就是驗證請求的參數是否在集合中。
大部分的編程語言都內置了filter。 拿python
舉例,filter函數用於過濾序列, 過濾掉不符合條件的元素,返回由符合條件元素組成的列表。
咱們看個例子:
$ python2
Python 2.7.10 (default, Oct 6 2017, 22:29:07)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.31)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> s = {2, 4}
>>> filter(lambda x:x in s, [0, 1, 2])
[2]
複製代碼
集合s中存在 2,4兩個數字,咱們須要查詢 0,1,2 那些在集合s中。 lambda x:x in s
構造一個匿名函數,判斷入參x是否在集合s中。 過濾器filter依次對列表中的數字執行匿名函數。最終返回列表[2]
。
redis中實現set用了兩種結構:intset和hash table。 非數字或者大量數字時都會退化成hash table。 那麼是否好的算法能夠節省hash table的大小呢?
其實早在1970年由Burton Howard Bloom
提出的布隆過濾器(英語:Bloom Filter)。 它其實是一個很長的二進制向量和一系列隨機映射函數。 布隆過濾器能夠用於檢索一個元素是否在一個集合中。 它的優勢是空間效率和查詢時間都遠遠超過通常的算法, 缺點是有必定的誤識別率和刪除困難。
咱們常見的將業務字段拼接以後md5,放在一個集合中。 md5生成一個固定長度的128bit的串。 若是咱們用bitmap來表示,則須要
2**128 = 340282366920938463463374607431768211456 bit
複製代碼
判斷一個值在不在,就變成在這個bitmap中判斷所在位是否爲1。 可是咱們全世界的機器存儲空間也沒法存儲下載。 所以咱們只能分配有限的空間來存儲。 好比:
import crc32
def BloomFilter(sample, size, hash_size=1):
# 構造一個hash函數,將輸入數據散列到size一個位置上
hash = lambda x:crc32(str(x).encode())%size
collision, s = 0, set()
for i in range(sample):
k = set()
for j in range(hash_size):
k.add(hash(i+j*size/hash_size))
# 只有全部散列結果k都在s中,才認爲i重複
if not k - s:
collision += 1
continue
# 將散列結果k更新到集合s中
s |= k
return collision
複製代碼
當只有一個hash函數時:很容易發生衝突。
能夠看到上面1和2的hash結果都是7,發生衝突。 若是增長hash函數,會發生什麼狀況?
咱們使用更多的hash函數和更大的數據集合來測試。獲得下面這張表
由此能夠看到當增長hash方法可以有效的下降碰撞機率。 比較好的數據以下:
可是增長了hash方法以後,會下降空間的使用效率。當集合佔用整體空間達到25%的時候, 增長hash 的效果已經不明顯
上面的使用多個hash方法來下降碰撞就是BloomFilter的核心思想。
數據庫防止穿庫 Google Bigtable,Apache HBase和Apache Cassandra以及Postgresql 使用BloomFilter來減小不存在的行或列的磁盤查找。避免代價高昂的磁盤查找會大大提升數據庫查詢操做的性能。 如同一開始的業務場景。若是數據量較大,不方便放在緩存中。須要對請求作攔截防止穿庫。
緩存宕機 緩存宕機的場景,使用布隆過濾器會形成必定程度的誤判。緣由是除了Bloom Filter 自己有誤判率,宕機以前的緩存不必定能覆蓋到全部DB中的數據,當宕機後用戶請求了一個之前從未請求的數據,這個時候就會產生誤判。固然,緩存宕機時使用布隆過濾器做爲應急的方式,這種狀況應該也是能夠忍受的。
WEB攔截器 相同請求攔截防止被攻擊。用戶第一次請求,將請求參數放入BloomFilter中,當第二次請求時,先判斷請求參數是否被BloomFilter命中。能夠提升緩存命中率
惡意地址檢測 chrome 瀏覽器檢查是不是惡意地址。 首先針對本地BloomFilter檢查任何URL,而且僅當BloomFilter返回確定結果時纔對所執行的URL進行全面檢查(而且用戶警告,若是它也返回確定結果)。
比特幣加速 bitcoin 使用BloomFilter來加速錢包同步。
對於BloomFilter的優勢來講,缺點均可以忽略。畢竟只須要kN的存儲空間就能存儲N個元素。空間效率十分優秀。
BloomFilter 須要一個大的bitmap來存儲。鑑於目前公司現狀,最好的存儲容器是redis。 從github topics: bloom-filter中通過簡單的調研。
redis集成BloomFilter方案:
原生python 方法太慢,lua腳本和module 部署比較麻煩。因而咱們推薦使用pyreBloom,底層使用。
pyreBloom:master λ ls
Makefile bloom.h bloom.pxd murmur.c pyreBloom.pyx
bloom.c bloom.o main.c pyreBloom.c
複製代碼
從文件命名上能夠看到bloom 使用c編寫。pyreBloom 使用cython編寫。
bloom.h 裏面實現BloomFilter的核心邏輯,完成與redis server的交互;hash函數;添加,檢查和刪除方法的實現。
int init_pyrebloom(pyrebloomctxt * ctxt, char * key, uint32_t capacity, double error, char* host, uint32_t port, char* password, uint32_t db);
int free_pyrebloom(pyrebloomctxt * ctxt);
int add(pyrebloomctxt * ctxt, const char * data, uint32_t len);
int add_complete(pyrebloomctxt * ctxt, uint32_t count);
int check(pyrebloomctxt * ctxt, const char * data, uint32_t len);
int check_next(pyrebloomctxt * ctxt);
int delete(pyrebloomctxt * ctxt);
複製代碼
pyreBloom.pyx
import math
import random
cimport bloom
class pyreBloomException(Exception):
'''Some sort of exception has happened internally'''
pass
cdef class pyreBloom(object):
cdef bloom.pyrebloomctxt context
cdef bytes key
property bits:
def __get__(self):
return self.context.bits
property hashes:
def __get__(self):
return self.context.hashes
def __cinit__(self, key, capacity, error, host='127.0.0.1', port=6379, password='', db=0):
self.key = key
if bloom.init_pyrebloom(&self.context, self.key, capacity,
error, host, port, password, db):
raise pyreBloomException(self.context.ctxt.errstr)
def __dealloc__(self):
bloom.free_pyrebloom(&self.context)
def delete(self):
bloom.delete(&self.context)
def put(self, value):
if getattr(value, '__iter__', False):
r = [bloom.add(&self.context, v, len(v)) for v in value]
r = bloom.add_complete(&self.context, len(value))
else:
bloom.add(&self.context, value, len(value))
r = bloom.add_complete(&self.context, 1)
if r < 0:
raise pyreBloomException(self.context.ctxt.errstr)
return r
def add(self, value):
return self.put(value)
def extend(self, values):
return self.put(values)
def contains(self, value):
# If the object is 'iterable'...
if getattr(value, '__iter__', False):
r = [bloom.check(&self.context, v, len(v)) for v in value]
r = [bloom.check_next(&self.context) for i in range(len(value))]
if (min(r) < 0):
raise pyreBloomException(self.context.ctxt.errstr)
return [v for v, included in zip(value, r) if included]
else:
bloom.check(&self.context, value, len(value))
r = bloom.check_next(&self.context)
if (r < 0):
raise pyreBloomException(self.context.ctxt.errstr)
return bool(r)
def __contains__(self, value):
return self.contains(value)
def keys(self):
'''Return a list of the keys used in this bloom filter'''
return [self.context.keys[i] for i in range(self.context.num_keys)]
複製代碼
原生pyreBloom方法:
cdef class pyreBloom(object):
cdef bloom.pyrebloomctxt context
cdef bytes
property bits:
property hashes:
// 使用的hash方法數
def delete(self):
// 刪除,會在redis中刪除
def put(self, value):
// 添加 底層方法, 不建議直接調用
def add(self, value):
// 添加單個元素,調用put方法
def extend(self, values):
// 添加一組元素,調用put方法
def contains(self, value):
// 檢查是否存在,當`value`能夠迭代時,返回`[value]`, 不然返回`bool`
def keys(self):
// 在redis中存儲的key列表
複製代碼
因爲pyreBloom使用hiredis庫,自己沒有重連等邏輯,因而錯了簡單的封裝。
# coding=utf-8
''' bloom filter 基礎模塊 可用方法: extend, keys, contains, add, put, hashes, bits, delete 使用方法: >>> class TestModel(BaseModel): ... PREFIX = "bf:test" >>> t = TestModel() >>> t.add('hello') 1 >>> t.extend(['hi', 'world']) 2 >>> t.contains('hi') True >>> t.delete() '''
import logging
from six import PY3 as IS_PY3
from pyreBloom import pyreBloom, pyreBloomException
from BloomFilter.utils import force_utf8
class BaseModel(object):
''' bloom filter 基礎模塊 參數: SLOT: 可用方法類型 PREFIX: redis前綴 BF_SIZE: 存儲最大值 BF_ERROR: 容許的出錯率 RETRIES: 鏈接重試次數 host: redis 服務器IP port: redis 服務器端口 db: redis 服務器DB _bf_conn: 內部保存`pyreBloom`實例 '''
SLOT = {'add', 'contains', 'extend', 'keys', 'put', 'delete',
'bits', 'hashes'}
PREFIX = ""
BF_SIZE = 100000
BF_ERROR = 0.01
RETRIES = 2
def __init__(self, redis=None):
''' 初始化redis配置 :param redis: redis 配置 '''
# 這裏初始化防止類靜態變量多個繼承類複用,致使數據被污染
self._bf_conn = None
self._conf = {
'host': '127.0.0.1', 'password': '',
'port': 6379, 'db': 0
}
if redis:
for k, v in redis.items():
if k in self._conf:
self._conf[k] = redis[k]
self._conf = force_utf8(self._conf)
@property
def bf_conn(self):
''' 初始化pyreBloom '''
if not self._bf_conn:
prefix = force_utf8(self.PREFIX)
logging.debug(
'pyreBloom connect: redis://%s:%s/%s, (%s %s %s)',
self._conf['host'], self._conf['port'], self._conf['db'],
prefix, self.BF_SIZE, self.BF_ERROR,
)
self._bf_conn = pyreBloom(
prefix, self.BF_SIZE, self.BF_ERROR, **self._conf)
return self._bf_conn
def __getattr__(self, method):
'''調用pyrebloom方法 沒有枚舉的方法將從`pyreBloom`中獲取 :param method: :return: pyreBloom.{method} '''
# 只提供內部方法
if method not in self.SLOT:
raise NotImplementedError()
# 捕獲`pyreBloom`的異常, 打印必要的日誌
def catch_error(*a, **kwargs):
'''屢次重試服務'''
args = force_utf8(a)
kwargs = force_utf8(kwargs)
for _ in range(self.RETRIES):
try:
func = getattr(self.bf_conn, method)
res = func(*args, **kwargs)
# python3 返回值和python2返回值不相同,
# 手工處理返回類型
if method == 'contains' and IS_PY3:
if isinstance(res, list):
return [i.decode('utf8') for i in res]
return res
except pyreBloomException as error:
logging.warn(
'pyreBloom Error: %s %s', method, str(error))
self.reconnect()
if _ == self.RETRIES:
logging.error('pyreBloom Error')
raise error
return catch_error
def __contains__(self, item):
'''跳轉__contains__方法 :param item: 查詢元素列表/單個元素 :type item: list/basestring :return: [bool...]/bool '''
return self.contains(item)
def reconnect(self):
''' 從新鏈接bloom `pyreBloom` 鏈接使用c driver,沒有提供timeout參數,使用了內置的timeout 同時爲了保證服務的可靠性,增長了屢次重試機制。 struct timeval timeout = { 1, 5000 }; ctxt->ctxt = redisConnectWithTimeout(host, port, timeout); del self._bf_conn 會調用`pyreBloom`內置的C的del方法,會關閉redis鏈接 '''
if self._bf_conn:
logging.debug('pyreBloom reconnect')
del self._bf_conn
self._bf_conn = None
_ = self.bf_conn
複製代碼
提供了一種在BloomFilter上實現刪除操做的方法,而無需從新從新建立過濾器。在計數濾波器中,陣列位置(桶)從單個位擴展爲n位計數器。實際上,常規布隆過濾器能夠被視爲計數過濾器,其桶大小爲一位。
插入操做被擴展爲遞增桶的值,而且查找操做檢查每一個所需的桶是否爲非零。而後,刪除操做包括遞減每一個桶的值。
存儲桶的算術溢出是一個問題,而且存儲桶應該足夠大以使這種狀況不多見。若是確實發生,則增量和減量操做必須將存儲區設置爲最大可能值,以便保留BloomFilter的屬性。
計數器的大小一般爲3或4位。所以,計算布隆過濾器的空間比靜態布隆過濾器多3到4倍。相比之下, Pagh,Pagh和Rao(2005)以及Fan等人的數據結構。(2014)也容許刪除但使用比靜態BloomFilter更少的空間。
計數過濾器的另外一個問題是可擴展性有限。因爲沒法擴展計數布隆過濾器表,所以必須事先知道要同時存儲在過濾器中的最大鍵數。一旦超過表的設計容量,隨着插入更多密鑰,誤報率將迅速增加。
Bonomi等人。(2006)引入了一種基於d-left散列的數據結構,它在功能上是等效的,但使用的空間大約是計算BloomFilter的一半。此數據結構中不會出現可伸縮性問題。一旦超出設計容量,就能夠將密鑰從新插入到雙倍大小的新哈希表中。
Putze,Sanders和Singler(2007)的節省空間的變體也可用於經過支持插入和刪除來實現計數過濾器。
Rottenstreich,Kanizo和Keslassy(2012)引入了一種基於變量增量的新通用方法,該方法顯着提升了計算布隆過濾器及其變體的誤報機率,同時仍支持刪除。與計數布隆過濾器不一樣,在每一個元素插入時,散列計數器以散列變量增量而不是單位增量遞增。要查詢元素,須要考慮計數器的確切值,而不只僅是它們的正面性。若是由計數器值表示的總和不能由查詢元素的相應變量增量組成,則能夠將否認答案返回給查詢。
開源項目地址:github.com/luw2007/blo…