一、BloomFilter能解決什麼問題?
以少許的內存空間判斷一個元素是否屬於這個集合, 代價是有必定的錯誤率
二、工做原理
1. 初始化一個數組, 全部位標爲0, A={x1, x2, x3,…,xm} (x1, x2, x3,…,xm 初始爲0)
2. 將已知集合S中的每個數組, 按如下方式映射到A中
2.0 選取n個互相獨立的hash函數 h1, h2, … hk
2.1 將元素經過以上hash函數獲得一組索引值 h1(xi), h2(xi),…,hk(xi)
2.2 將集合A中的上述索引值標記爲1(若是不一樣元素有重複, 則重複覆蓋爲1, 這是一個覓等操做)
3. 對於一個元素x, 將其根據2.0中選取的hash函數, 進行hash, 獲得一組索引值 h1(x), h2(x), …,hk(x)
若是集合A中的這些索引位置上的值都是1, 表示這個元素屬於集合S, 不然則不屬於S
舉例說明:
創建一個容量爲500萬的Bit Array結構(Bit Array的大小和keyword的數量決定了誤判的概率),將集合中的每一個keyword經過32個hash函數分別計算出32個數字,而後對這32個數字分別用500萬取模,而後將Bit Array中對應的位置爲1,咱們將其稱爲特徵值。簡單的說就是將每一個keyword對應到Bit Array中的32個位置上,見下圖:
當須要快速查找某個keyword時,只要將其經過一樣的32個hash函數運算,而後映射到Bit Array中的對應位,若是Bit Array中的對應位所有是1,那麼說明該keyword匹配成功(會有誤判的概率)。
三、幾個前提
1. hash函數的計算不能性能太差, 不然得不償失
2. 任意兩個hash函數之間必須是獨立的.
即任意兩個hash函數不存在單一相關性, 不然hash到其中一個索引上的元素也一定會hash到另外一個相關的索引上, 這樣多個hash沒有意義html
四、錯誤率
工做原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認爲屬於這個集合(false positive)。所以,Bloom Filter不適合那些「零錯誤」的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter經過極少的錯誤換取了存儲空間的極大節省。關於具體的錯誤率,這和最優的哈希函數個數以及位數組的大小有關,而這是能夠估算求得一個最優解的:
哈希函數個數k、位數組大小m及字符串數量n之間存在相互關係。相關文獻證實了對於給定的m、n,當 k = ln(2)* m/n 時出錯的機率是最小的。 具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500
五、基本特徵
從以上對基本原理和數學基礎的分析,咱們能夠獲得Bloom filter的以下基本特徵,用於指導實際應用。
(1)存在必定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性);
(2)錯誤率是可控制的,經過改變位數組大小、hash函數個數或更低碰撞率的hash函數來調節;
(3)保持較低的錯誤率,位數組空位至少保持在一半以上;
(4)給定m和n,能夠肯定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小;
(5)給定容許的錯誤率E,能夠肯定合適的位數組大小,即m >= log2(e) * (n * log2(1/E)),繼而肯定hash函數個數k;
(6)正向錯誤率沒法徹底消除,即便不對位數組大小和hash函數個數進行限制,即沒法實現零錯誤率;
(7)空間效率高,僅保存「存在狀態」,但沒法存儲完整信息,須要其餘數據結構輔助存儲;
(8)不支持元素刪除操做,由於不能保證刪除的安全性。java
六、應用場景舉例:
(1)拼寫檢查、數據庫系統、文件系統
(2)假設要你寫一個網絡蜘蛛(web crawler)。因爲網絡間的連接錯綜複雜,蜘蛛在網絡間爬行極可能會造成「環」。爲了不造成「環」,就須要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢?
(3)網絡應用
P2P網絡中查找資源操做,能夠對每條網絡通路保存Bloom Filter,當命中時,則選擇該通路訪問。
廣播消息時,能夠檢測某個IP是否已發包。
檢測廣播消息包的環路,將Bloom Filter保存在包裏,每一個節點將本身添加入Bloom Filter。
信息隊列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾郵件地址過濾
像網易,QQ這樣的公衆電子郵件(email)提供商,老是須要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。因爲那些發送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則須要大量的網絡服務器。若是用哈希表,每存儲一億個 email 地址,就須要1.6GB 的內存(用哈希表實現的具體辦法是將每個email 地址對應成一個八字節的信息指紋,而後將這些信息指紋存入哈希表,因爲哈希表的存儲效率通常只有50%,所以一個email 地址須要佔用十六個字節。一億個地址大約要1.6GB, 即十六億字節的內存)。所以存貯幾十億個郵件地址可能須要上百GB 的內存。而Bloom Filter只須要哈希表1/8 到1/4 的大小就能解決一樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在創建一個小的白名單,存儲那些可能別誤判的郵件地址。
(5)Bloomfilter在HBase中的做用
HBase利用Bloomfilter來提升隨機讀(Get)的性能,對於順序讀(Scan)而言,設置Bloomfilter是沒有做用的(0.92之後,若是設置了bloomfilter爲ROWCOL,對於指定了qualifier的Scan有必定的優化,但不是那種直接過濾文件,排除在查找範圍的形式)
Bloomfilter在HBase中的開銷?
Bloomfilter是一個列族(cf)級別的配置屬性,若是你在表中設置了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的數據,稱其爲MetaBlock;MetaBlock與DataBlock(真實的KeyValue數據)一塊兒由LRUBlockCache維護。因此,開啓bloomfilter會有必定的存儲及內存cache開銷。
Bloomfilter如何提升隨機讀(Get)的性能?
對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照必定的順序),將結果合併返回給客戶端。若是你設置了bloomfilter,那麼在遍歷讀storefile時,就能夠利用bloomfilter,忽略某些storefile。
注意:hbase的bloom filter是惰性加載的,在寫壓力比較大的狀況下,會有不停的compact併產生storefile,那麼新的storefile是不會立刻將bloom filter加載到內存的,等到讀請求來的時候才加載。
這樣問題就來了,第一,若是storefile設置的比較大,max size爲2G,這會致使bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會常常出現單個 GET請求花費3-5秒的超時現象。
七、reduce side join + BloomFilter 在hadoop中的應用舉例:
在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。最後再在reduce階段作表間join便可。
這個過程其實須要先對小表的數據作BloomFilter訓練,構造一個BloomFilter樣本文件(二進制的),放到分佈式緩存,而後在map階段被讀入用來過濾大表。而hadoop早已經支持 BloomFilter 了,咱們只需調相應的API便可,ok 下面上代碼了。mysql
01 |
import java.io.BufferedReader; |
02 |
import java.io.IOException; |
03 |
import java.io.InputStreamReader; |
04 |
import java.util.zip.GZIPInputStream; |
06 |
import org.apache.hadoop.conf.Configuration; |
07 |
import org.apache.hadoop.fs.FSDataOutputStream; |
08 |
import org.apache.hadoop.fs.FileStatus; |
09 |
import org.apache.hadoop.fs.FileSystem; |
10 |
import org.apache.hadoop.fs.Path; |
11 |
import org.apache.hadoop.util.bloom.BloomFilter; |
12 |
import org.apache.hadoop.util.bloom.Key; |
13 |
import org.apache.hadoop.util.hash.Hash; |
15 |
public class TrainingBloomfilter { |
17 |
public static int getOptimalBloomFilterSize( int numRecords, |
19 |
int size = ( int ) (-numRecords * ( float ) Math.log(falsePosRate) / Math |
20 |
.pow(Math.log( 2 ), 2 )); |
24 |
public static int getOptimalK( float numMembers, float vectorSize) { |
25 |
return ( int ) Math.round(vectorSize / numMembers * Math.log( 2 )); |
28 |
public static void main(String[] args) throws IOException { |
30 |
Path inputFile = new Path( "/tmp/decli/user1.txt" ); |
31 |
int numMembers = Integer.parseInt( "10" ); |
32 |
float falsePosRate = Float.parseFloat( "0.01" ); |
33 |
Path bfFile = new Path( "/tmp/decli/bloom.bin" ); |
36 |
int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate); |
37 |
int nbHash = getOptimalK(numMembers, vectorSize); |
40 |
BloomFilter filter = new BloomFilter(vectorSize, nbHash, |
45 |
System.out.println( "Training Bloom filter of size " + vectorSize |
46 |
+ " with " + nbHash + " hash functions, " + numMembers |
47 |
+ " approximate number of records, and " + falsePosRate |
48 |
+ " false positive rate" ); |
52 |
FileSystem fs = FileSystem.get( new Configuration()); |
53 |
for (FileStatus status : fs.listStatus(inputFile)) { |
56 |
if (status.getPath().getName().endsWith( ".gz" )) { |
57 |
rdr = new BufferedReader( new InputStreamReader( |
58 |
new GZIPInputStream(fs.open(status.getPath())))); |
60 |
rdr = new BufferedReader( new InputStreamReader(fs.open(status |
64 |
System.out.println( "Reading " + status.getPath()); |
65 |
while ((line = rdr.readLine()) != null ) { |
66 |
filter.add( new Key(line.getBytes())); |
73 |
System.out.println( "Trained Bloom filter with " + numRecords |
76 |
System.out.println( "Serializing Bloom filter to HDFS at " + bfFile); |
77 |
FSDataOutputStream strm = fs.create(bfFile); |
83 |
System.out.println( "Done training Bloom filter." ); |
001 |
import java.io.BufferedReader; |
002 |
import java.io.DataInputStream; |
003 |
import java.io.FileInputStream; |
004 |
import java.io.IOException; |
005 |
import java.util.StringTokenizer; |
007 |
import org.apache.hadoop.conf.Configuration; |
008 |
import org.apache.hadoop.filecache.DistributedCache; |
009 |
import org.apache.hadoop.fs.FileSystem; |
010 |
import org.apache.hadoop.fs.Path; |
011 |
import org.apache.hadoop.io.NullWritable; |
012 |
import org.apache.hadoop.io.Text; |
013 |
import org.apache.hadoop.mapreduce.Job; |
014 |
import org.apache.hadoop.mapreduce.Mapper; |
015 |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
016 |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
017 |
import org.apache.hadoop.util.GenericOptionsParser; |
018 |
import org.apache.hadoop.util.bloom.BloomFilter; |
019 |
import org.apache.hadoop.util.bloom.Key; |
021 |
public class BloomFilteringDriver { |
023 |
public static class BloomFilteringMapper extends |
024 |
Mapper<Object, Text, Text, NullWritable> { |
026 |
private BloomFilter filter = new BloomFilter(); |
029 |
protected void setup(Context context) throws IOException, |
030 |
InterruptedException { |
032 |
BufferedReader in = null ; |
036 |
Path[] paths = DistributedCache.getLocalCacheFiles(context |
037 |
.getConfiguration()); |
038 |
for (Path path : paths) { |
039 |
if (path.toString().contains( "bloom.bin" )) { |
040 |
DataInputStream strm = new DataInputStream( |
041 |
new FileInputStream(path.toString())); |
043 |
filter.readFields(strm); |
047 |
} catch (IOException e) { |
054 |
} catch (IOException e) { |
061 |
public void map(Object key, Text value, Context context) |
062 |
throws IOException, InterruptedException { |
065 |
String comment = value.toString(); |
068 |
if (comment == null || comment.isEmpty()) { |
072 |
StringTokenizer tokenizer = new StringTokenizer(comment); |
074 |
while (tokenizer.hasMoreTokens()) { |
077 |
String cleanWord = tokenizer.nextToken().replaceAll( "'" , "" ) |
078 |
.replaceAll( "[^a-zA-Z]" , " " ); |
081 |
if (cleanWord.length() > 0 |
082 |
&& filter.membershipTest( new Key(cleanWord.getBytes()))) { |
083 |
context.write( new Text(cleanWord), NullWritable.get()); |
090 |
public static void main(String[] args) throws Exception { |
092 |
Configuration conf = new Configuration(); |
093 |
String[] otherArgs = new GenericOptionsParser(conf, args) |
095 |
System.out.println( "================ " + otherArgs[ 0 ]); |
096 |
if (otherArgs.length != 3 ) { |
097 |
System.err.println( "Usage: BloomFiltering <in> <out>" ); |
101 |
FileSystem.get(conf).delete( new Path(otherArgs[ 2 ]), true ); |
103 |
Job job = new Job(conf, "TestBloomFiltering" ); |
104 |
job.setJarByClass(BloomFilteringDriver. class ); |
105 |
job.setMapperClass(BloomFilteringMapper. class ); |
106 |
job.setNumReduceTasks( 0 ); |
107 |
job.setOutputKeyClass(Text. class ); |
108 |
job.setOutputValueClass(NullWritable. class ); |
109 |
FileInputFormat.addInputPath(job, new Path(otherArgs[ 1 ])); |
110 |
FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 2 ])); |
112 |
DistributedCache.addCacheFile( new Path( "/tmp/decli/bloom.bin" ).toUri(), |