BloomFilter 簡介及在 Hadoop reduce side join 中的應用

一、BloomFilter能解決什麼問題? 
     以少許的內存空間判斷一個元素是否屬於這個集合, 代價是有必定的錯誤率 

二、工做原理 
     1. 初始化一個數組, 全部位標爲0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始爲0) 
     2. 將已知集合S中的每個數組, 按如下方式映射到A中 
          2.0  選取n個互相獨立的hash函數 h1, h2, … hk 
          2.1  將元素經過以上hash函數獲得一組索引值 h1(xi), h2(xi),…,hk(xi) 
          2.2  將集合A中的上述索引值標記爲1(若是不一樣元素有重複, 則重複覆蓋爲1, 這是一個覓等操做) 
     3.  對於一個元素x, 將其根據2.0中選取的hash函數, 進行hash, 獲得一組索引值 h1(x), h2(x), …,hk(x) 
          若是集合A中的這些索引位置上的值都是1, 表示這個元素屬於集合S, 不然則不屬於S 

舉例說明: 

創建一個容量爲500萬的Bit Array結構(Bit Array的大小和keyword的數量決定了誤判的概率),將集合中的每一個keyword經過32個hash函數分別計算出32個數字,而後對這32個數字分別用500萬取模,而後將Bit Array中對應的位置爲1,咱們將其稱爲特徵值。簡單的說就是將每一個keyword對應到Bit Array中的32個位置上,見下圖:

bloom

當須要快速查找某個keyword時,只要將其經過一樣的32個hash函數運算,而後映射到Bit Array中的對應位,若是Bit Array中的對應位所有是1,那麼說明該keyword匹配成功(會有誤判的概率)。


三、幾個前提
     1. hash函數的計算不能性能太差, 不然得不償失
     2. 任意兩個hash函數之間必須是獨立的.
          即任意兩個hash函數不存在單一相關性, 不然hash到其中一個索引上的元素也一定會hash到另外一個相關的索引上, 這樣多個hash沒有意義html


四、錯誤率
     工做原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認爲屬於這個集合(false positive)。所以,Bloom Filter不適合那些「零錯誤」的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter經過極少的錯誤換取了存儲空間的極大節省。關於具體的錯誤率,這和最優的哈希函數個數以及位數組的大小有關,而這是能夠估算求得一個最優解的:
哈希函數個數k、位數組大小m及字符串數量n之間存在相互關係。相關文獻證實了對於給定的m、n,當 k = ln(2)* m/n 時出錯的機率是最小的。  具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500


五、基本特徵
從以上對基本原理和數學基礎的分析,咱們能夠獲得Bloom filter的以下基本特徵,用於指導實際應用。
(1)存在必定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性);
(2)錯誤率是可控制的,經過改變位數組大小、hash函數個數或更低碰撞率的hash函數來調節;
(3)保持較低的錯誤率,位數組空位至少保持在一半以上;
(4)給定m和n,能夠肯定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小;
(5)給定容許的錯誤率E,能夠肯定合適的位數組大小,即m >= log2(e) * (n * log2(1/E)),繼而肯定hash函數個數k;
(6)正向錯誤率沒法徹底消除,即便不對位數組大小和hash函數個數進行限制,即沒法實現零錯誤率;
(7)空間效率高,僅保存「存在狀態」,但沒法存儲完整信息,須要其餘數據結構輔助存儲;
(8)不支持元素刪除操做,由於不能保證刪除的安全性。java


六、應用場景舉例:
(1)拼寫檢查、數據庫系統、文件系統
(2)假設要你寫一個網絡蜘蛛(web crawler)。因爲網絡間的連接錯綜複雜,蜘蛛在網絡間爬行極可能會造成「環」。爲了不造成「環」,就須要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢?
(3)網絡應用
  P2P網絡中查找資源操做,能夠對每條網絡通路保存Bloom Filter,當命中時,則選擇該通路訪問。
  廣播消息時,能夠檢測某個IP是否已發包。
  檢測廣播消息包的環路,將Bloom Filter保存在包裏,每一個節點將本身添加入Bloom Filter。
  信息隊列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾郵件地址過濾
  像網易,QQ這樣的公衆電子郵件(email)提供商,老是須要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。因爲那些發送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則須要大量的網絡服務器。若是用哈希表,每存儲一億個 email 地址,就須要1.6GB 的內存(用哈希表實現的具體辦法是將每個email 地址對應成一個八字節的信息指紋,而後將這些信息指紋存入哈希表,因爲哈希表的存儲效率通常只有50%,所以一個email 地址須要佔用十六個字節。一億個地址大約要1.6GB, 即十六億字節的內存)。所以存貯幾十億個郵件地址可能須要上百GB 的內存。而Bloom Filter只須要哈希表1/8 到1/4 的大小就能解決一樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在創建一個小的白名單,存儲那些可能別誤判的郵件地址。
(5)Bloomfilter在HBase中的做用
      HBase利用Bloomfilter來提升隨機讀(Get)的性能,對於順序讀(Scan)而言,設置Bloomfilter是沒有做用的(0.92之後,若是設置了bloomfilter爲ROWCOL,對於指定了qualifier的Scan有必定的優化,但不是那種直接過濾文件,排除在查找範圍的形式) 
      Bloomfilter在HBase中的開銷? 
Bloomfilter是一個列族(cf)級別的配置屬性,若是你在表中設置了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的數據,稱其爲MetaBlock;MetaBlock與DataBlock(真實的KeyValue數據)一塊兒由LRUBlockCache維護。因此,開啓bloomfilter會有必定的存儲及內存cache開銷。 
     Bloomfilter如何提升隨機讀(Get)的性能? 
對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照必定的順序),將結果合併返回給客戶端。若是你設置了bloomfilter,那麼在遍歷讀storefile時,就能夠利用bloomfilter,忽略某些storefile。 
     注意:hbase的bloom filter是惰性加載的,在寫壓力比較大的狀況下,會有不停的compact併產生storefile,那麼新的storefile是不會立刻將bloom filter加載到內存的,等到讀請求來的時候才加載。 
這樣問題就來了,第一,若是storefile設置的比較大,max size爲2G,這會致使bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會常常出現單個 GET請求花費3-5秒的超時現象。

七、reduce side join + BloomFilter 在hadoop中的應用舉例:
在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。最後再在reduce階段作表間join便可。
這個過程其實須要先對小表的數據作BloomFilter訓練,構造一個BloomFilter樣本文件(二進制的),放到分佈式緩存,而後在map階段被讀入用來過濾大表。而hadoop早已經支持 BloomFilter 了,咱們只需調相應的API便可,ok 下面上代碼了。mysql

01 import java.io.BufferedReader;
02 import java.io.IOException;
03 import java.io.InputStreamReader;
04 import java.util.zip.GZIPInputStream;
05  
06 import org.apache.hadoop.conf.Configuration;
07 import org.apache.hadoop.fs.FSDataOutputStream;
08 import org.apache.hadoop.fs.FileStatus;
09 import org.apache.hadoop.fs.FileSystem;
10 import org.apache.hadoop.fs.Path;
11 import org.apache.hadoop.util.bloom.BloomFilter;
12 import org.apache.hadoop.util.bloom.Key;
13 import org.apache.hadoop.util.hash.Hash;
14  
15 public class TrainingBloomfilter {
16  
17     public static int getOptimalBloomFilterSize(int numRecords,
18             float falsePosRate) {
19         int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
20                 .pow(Math.log(2), 2));
21         return size;
22     }
23  
24     public static int getOptimalK(float numMembers, float vectorSize) {
25         return (int) Math.round(vectorSize / numMembers * Math.log(2));
26     }
27  
28     public static void main(String[] args) throws IOException {
29  
30         Path inputFile = new Path("/tmp/decli/user1.txt");
31         int numMembers = Integer.parseInt("10");
32         float falsePosRate = Float.parseFloat("0.01");
33         Path bfFile = new Path("/tmp/decli/bloom.bin");
34  
35         // Calculate our vector size and optimal K value based on approximations
36         int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
37         int nbHash = getOptimalK(numMembers, vectorSize);
38  
39         // create new Bloom filter
40         BloomFilter filter = new BloomFilter(vectorSize, nbHash,
41                 Hash.MURMUR_HASH);
42  
43         // Open file for read
44  
45         System.out.println("Training Bloom filter of size " + vectorSize
46                 " with " + nbHash + " hash functions, " + numMembers
47                 " approximate number of records, and " + falsePosRate
48                 " false positive rate");
49  
50         String line = null;
51         int numRecords = 0;
52         FileSystem fs = FileSystem.get(new Configuration());
53         for (FileStatus status : fs.listStatus(inputFile)) {
54             BufferedReader rdr;
55             // if file is gzipped, wrap it in a GZIPInputStream
56             if (status.getPath().getName().endsWith(".gz")) {
57                 rdr = new BufferedReader(new InputStreamReader(
58                         new GZIPInputStream(fs.open(status.getPath()))));
59             else {
60                 rdr = new BufferedReader(new InputStreamReader(fs.open(status
61                         .getPath())));
62             }
63  
64             System.out.println("Reading " + status.getPath());
65             while ((line = rdr.readLine()) != null) {
66                 filter.add(new Key(line.getBytes()));
67                 ++numRecords;
68             }
69  
70             rdr.close();
71         }
72  
73         System.out.println("Trained Bloom filter with " + numRecords
74                 " entries.");
75  
76         System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
77         FSDataOutputStream strm = fs.create(bfFile);
78         filter.write(strm);
79  
80         strm.flush();
81         strm.close();
82  
83         System.out.println("Done training Bloom filter.");
84  
85     }
86  
87 }

001 import java.io.BufferedReader;
002 import java.io.DataInputStream;
003 import java.io.FileInputStream;
004 import java.io.IOException;
005 import java.util.StringTokenizer;
006  
007 import org.apache.hadoop.conf.Configuration;
008 import org.apache.hadoop.filecache.DistributedCache;
009 import org.apache.hadoop.fs.FileSystem;
010 import org.apache.hadoop.fs.Path;
011 import org.apache.hadoop.io.NullWritable;
012 import org.apache.hadoop.io.Text;
013 import org.apache.hadoop.mapreduce.Job;
014 import org.apache.hadoop.mapreduce.Mapper;
015 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
016 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
017 import org.apache.hadoop.util.GenericOptionsParser;
018 import org.apache.hadoop.util.bloom.BloomFilter;
019 import org.apache.hadoop.util.bloom.Key;
020  
021 public class BloomFilteringDriver {
022  
023     public static class BloomFilteringMapper extends
024             Mapper<Object, Text, Text, NullWritable> {
025  
026         private BloomFilter filter = new BloomFilter();
027  
028         @Override
029         protected void setup(Context context) throws IOException,
030                 InterruptedException {
031  
032             BufferedReader in = null;
033  
034             try {
035                 // 從當前做業中獲取要緩存的文件
036                 Path[] paths = DistributedCache.getLocalCacheFiles(context
037                         .getConfiguration());
038                 for (Path path : paths) {
039                     if (path.toString().contains("bloom.bin")) {
040                         DataInputStream strm = new DataInputStream(
041                                 new FileInputStream(path.toString()));
042                         // Read into our Bloom filter.
043                         filter.readFields(strm);
044                         strm.close();
045                     }
046                 }
047             catch (IOException e) {
048                 e.printStackTrace();
049             finally {
050                 try {
051                     if (in != null) {
052                         in.close();
053                     }
054                 catch (IOException e) {
055                     e.printStackTrace();
056                 }
057             }
058         }
059  
060         @Override
061         public void map(Object key, Text value, Context context)
062                 throws IOException, InterruptedException {
063  
064             // Get the value for the comment
065             String comment = value.toString();
066  
067             // If it is null, skip this record
068             if (comment == null || comment.isEmpty()) {
069                 return;
070             }
071  
072             StringTokenizer tokenizer = new StringTokenizer(comment);
073             // For each word in the comment
074             while (tokenizer.hasMoreTokens()) {
075  
076                 // Clean up the words
077                 String cleanWord = tokenizer.nextToken().replaceAll("'""")
078                         .replaceAll("[^a-zA-Z]"" ");
079  
080                 // If the word is in the filter, output it and break
081                 if (cleanWord.length() > 0
082                         && filter.membershipTest(new Key(cleanWord.getBytes()))) {
083                     context.write(new Text(cleanWord), NullWritable.get());
084                     // break;
085                 }
086             }
087         }
088     }
089  
090     public static void main(String[] args) throws Exception {
091  
092         Configuration conf = new Configuration();
093         String[] otherArgs = new GenericOptionsParser(conf, args)
094                 .getRemainingArgs();
095         System.out.println("================ " + otherArgs[0]);
096         if (otherArgs.length != 3) {
097             System.err.println("Usage: BloomFiltering <in> <out>");
098             System.exit(1);
099         }
100  
101         FileSystem.get(conf).delete(new Path(otherArgs[2]), true);
102  
103         Job job = new Job(conf, "TestBloomFiltering");
104         job.setJarByClass(BloomFilteringDriver.class);
105         job.setMapperClass(BloomFilteringMapper.class);
106         job.setNumReduceTasks(0);
107         job.setOutputKeyClass(Text.class);
108         job.setOutputValueClass(NullWritable.class);
109         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
110         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
111  
112         DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(),
113
相關文章
相關標籤/搜索