今天是中國傳統佳節「猿宵節」,是程序猿通宵趕代碼的佳節。
AI考拉的技術小夥伴志在打破傳統,以「咱們不加班」爲口號,以「咱們提前下班」爲指導中心,在這裏安利技術知識給你們,祝你們節日快樂,提早下班,過真正的元宵節!git
在金融業務系統裏面,判斷用戶是不是黑名單,這種場景應該很常見。github
假設咱們系統裏面有一百萬個黑名單用戶,用手機號表示,如今有一我的想借款,咱們要判斷他是否在黑名單中,怎麼作?算法
最直接的方法,是在數據庫中查詢,目前數據庫上實現的索引,雖然能夠作到 O(logn) 或者理論O(1) 的時間複雜度,但畢竟是磁盤操做,跟內存操做不是一個數量級的。數據庫
因而,咱們能夠把黑名單中的手機號緩存到內存中,用一個數組儲存起來,這種方法有兩個問題,一是查找時間複雜度是 O(n),很是慢,二是佔用大量內存。數組
查找速度上能夠再優化,將數組變成Set,內部實現能夠選擇平衡二叉樹或者哈希,這樣子插入和查找的時間複雜度能作到 O(logn)或者理論O(1),可是帶來的是空間上的災難,比使用數組會更佔用空間。緩存
如今來看一下代碼,對比一下這兩種方法:bash
import random
import sys
def generate_random_phone():
""" 隨機生成11位的字符串 """
phone = ''
for j in range(0, 11):
phone += str(random.randint(0, 9))
return phone
# 10萬個黑名單用戶
black_list = []
for i in range(0, 100000):
black_list.append(generate_random_phone())
# 轉成集合
black_set = set(black_list)
print(len(black_list), len(black_set))
# 看一下兩種數據結構的空間佔用
print("size of black_list: %f M" % (sys.getsizeof(black_list) / 1024 / 1024))
print("size of black_set: %f M" % (sys.getsizeof(black_set) / 1024 / 1024))
def brute_force_find():
""" 直接列表線性查找,隨機查一個存在或者不存在的元素, O(n) """
if random.randint(0, 10) % 2:
target = black_list[random.randint(0, len(black_list))]
return __brute_force_find(target)
else:
return __brute_force_find(generate_random_phone())
def __brute_force_find(target):
for i in range(0, len(black_list)):
if target == black_list[i]:
return True
return False
def set_find():
""" 集合查找,隨機查一個存在或者不存在的元素, O(1) """
if random.randint(0, 10) % 2:
target = black_list[random.randint(0, len(black_list))]
return __set_find(target)
else:
return __set_find(generate_random_phone())
def __set_find(target):
return target in black_set
print(brute_force_find())
print(set_find())
複製代碼
能夠看到,數組和集合的長度相等,說明元素都是惟一的。列表的空間佔用爲0.78M,而集合的空間佔用爲4M,主要是由於哈希表的數據結構須要較多指針鏈接衝突的元素,空間佔用大概是列表的5倍。這是10w個手機號,若是有1億個手機號,將須要佔用3.9G的空間。數據結構
下面來看一下性能測試:app
import timeit
print(timeit.repeat('brute_force_find()', number=100, setup="from __main__ import brute_force_find"))
print(timeit.repeat('set_find()', number=100, setup="from __main__ import set_find"))
複製代碼
[0.0016423738561570644, 0.0013590981252491474, 0.0014535998925566673]
複製代碼
能夠看到,直接線性查詢大概須要0.85s, 而集合的查詢僅須要0.0016s,速度上是質的提高,可是空間佔用太多了!dom
答案是布隆過濾器,只是它有誤判的可能性,當一個手機號通過布隆過濾器的查找,返回屬於黑名單時,有必定機率,這個手機號實際上並不屬於黑名單。 回到咱們的業務中來,若是一個借款人有0.001%的機率被咱們認爲是黑名單而不借錢給他,實際上是能夠接受的,用風控的一句話說: 寧肯錯殺一百,也不放過一個。說明,利用布隆過濾器來解決這個問題是合適的。
原理很是簡單,維護一個很是大的位圖,設長度爲m,選取k個哈希函數。
初始時,這個位圖,全部元素都置爲0。 對於黑名單中的每個手機號,用k個哈希函數計算出來k個索引值,把位圖中這k個位置都置爲1。 當查詢某個元素時,用k個哈希函數計算出來k個索引值,若是位圖中k個位置的值都爲1,說明這個元素可能存在,若是有一個位置不爲1,則必定不存在。
這裏的查詢,說的可能存在,是由於哈希函數可能會出現衝突,一個不存在的元素,經過k個哈希函數計算出來索引,可能跟另一個存在的元素相同,這個時間就出現了誤判。因此,要下降誤判率,明顯是經過增大位圖的長度和哈希函數的個數來實現的。
來看一下代碼:
from bitarray import bitarray
import mmh3
class BloomFilter:
def __init__(self, arr):
# 位圖長度暫定爲20倍黑名單庫的大小
self.SIZE = 20 * len(arr)
self.bit_array = bitarray(self.SIZE)
self.bit_array.setall(0)
for item in arr:
for pos in self.get_positions(item):
self.bit_array[pos] = 1
def get_positions(self, val):
# 使用10個哈希函數,murmurhash算法,返回索引值
return [mmh3.hash(val, i) % self.SIZE for i in range(40, 50)]
def find(self, val):
for pos in self.get_positions(val):
if self.bit_array[pos] == 0:
return False
return True
bloomFilter = BloomFilter(black_list)
print("size of bloomFilter's bit_array: %f M" % (sys.getsizeof(bloomFilter.bit_array) / 1024 / 1024))
def get_error_rate():
# 用1w個隨機手機號,測試布隆過濾器的錯誤率
size = 10000
error_count = 0
for i in range(0, size):
phone = generate_random_phone()
bloom_filter_result = bloomFilter.find(phone)
set_result = __set_find(phone)
if bloom_filter_result != set_result:
error_count += 1
return error_count / size
print(get_error_rate())
複製代碼
size of bloomFilter's bit_array: 0.000092 M 0.0001 複製代碼
能夠看到,雖然位圖的長度是原數據的20倍,可是佔用的空間卻很小,這是由於位圖的8個元素才佔用1個字節,而原數據列表中1個元素就佔用了將近11個字節。
錯誤率大約爲0.0001,能夠嘗試不一樣的位圖長度,好比改爲30倍,錯誤率就會下降到0。
最後來看一下3種算法的性能測試:
def bloom_filter_find():
if random.randint(0, 10) % 2:
target = black_list[random.randint(0, len(black_list))]
return bloomFilter.find(target)
else:
return bloomFilter.find(generate_random_phone())
print(timeit.repeat('brute_force_find()', number=100, setup="from __main__ import brute_force_find"))
print(timeit.repeat('set_find()', number=100, setup="from __main__ import set_find"))
print(timeit.repeat('bloom_filter_find()', number=100, setup="from __main__ import bloom_filter_find"))
複製代碼
[0.70748823415488, 0.7686979519203305, 0.7785645266994834]
[0.001686999574303627, 0.002007704693824053, 0.0013333242386579514]
[0.001962156966328621, 0.0018132571130990982, 0.0023592300713062286]
複製代碼
能夠看到,布隆過濾器的查找速度接近集合的查找速度,有時候甚至更快,在很低的誤判率能夠接受的狀況下,選用布隆過濾器是即省時間又省空間的,是最佳的選擇。
著做權歸本文做者全部,未經受權,請勿轉載,謝謝。