當須要快速查找某個keyword時,只要將其經過一樣的32個hash函數運算,而後映射到Bit Array中的對應位,若是Bit Array中的對應位所有是1,那麼說明該keyword匹配成功(會有誤判的概率)。
三、幾個前提
1. hash函數的計算不能性能太差, 不然得不償失
2. 任意兩個hash函數之間必須是獨立的.
即任意兩個hash函數不存在單一相關性, 不然hash到其中一個索引上的元素也一定會hash到另外一個相關的索引上, 這樣多個hash沒有意義 html
四、錯誤率
工做原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認爲屬於這個集合(false positive)。所以,Bloom Filter不適合那些「零錯誤」的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter經過極少的錯誤換取了存儲空間的極大節省。關於具體的錯誤率,這和最優的哈希函數個數以及位數組的大小有關,而這是能夠估算求得一個最優解的:
哈希函數個數k、位數組大小m及字符串數量n之間存在相互關係。相關文獻證實了對於給定的m、n,當 k = ln(2)* m/n 時出錯的機率是最小的。 具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500
五、基本特徵
從以上對基本原理和數學基礎的分析,咱們能夠獲得Bloom filter的以下基本特徵,用於指導實際應用。
(1)存在必定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性);
(2)錯誤率是可控制的,經過改變位數組大小、hash函數個數或更低碰撞率的hash函數來調節;
(3)保持較低的錯誤率,位數組空位至少保持在一半以上;
(4)給定m和n,能夠肯定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小;
(5)給定容許的錯誤率E,能夠肯定合適的位數組大小,即m >= log2(e) * (n * log2(1/E)),繼而肯定hash函數個數k;
(6)正向錯誤率沒法徹底消除,即便不對位數組大小和hash函數個數進行限制,即沒法實現零錯誤率;
(7)空間效率高,僅保存「存在狀態」,但沒法存儲完整信息,須要其餘數據結構輔助存儲;
(8)不支持元素刪除操做,由於不能保證刪除的安全性。 java
六、應用場景舉例:
(1)拼寫檢查、數據庫系統、文件系統
(2)假設要你寫一個網絡蜘蛛(web crawler)。因爲網絡間的連接錯綜複雜,蜘蛛在網絡間爬行極可能會造成「環」。爲了不造成「環」,就須要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢?
(3)網絡應用
P2P網絡中查找資源操做,能夠對每條網絡通路保存Bloom Filter,當命中時,則選擇該通路訪問。
廣播消息時,能夠檢測某個IP是否已發包。
檢測廣播消息包的環路,將Bloom Filter保存在包裏,每一個節點將本身添加入Bloom Filter。
信息隊列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾郵件地址過濾
像網易,QQ這樣的公衆電子郵件(email)提供商,老是須要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。因爲那些發送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則須要大量的網絡服務器。若是用哈希表,每存儲一億個 email 地址,就須要1.6GB 的內存(用哈希表實現的具體辦法是將每個email 地址對應成一個八字節的信息指紋,而後將這些信息指紋存入哈希表,因爲哈希表的存儲效率通常只有50%,所以一個email 地址須要佔用十六個字節。一億個地址大約要1.6GB, 即十六億字節的內存)。所以存貯幾十億個郵件地址可能須要上百GB 的內存。而Bloom Filter只須要哈希表1/8 到1/4 的大小就能解決一樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在創建一個小的白名單,存儲那些可能別誤判的郵件地址。
(5)Bloomfilter在HBase中的做用
HBase利用Bloomfilter來提升隨機讀(Get)的性能,對於順序讀(Scan)而言,設置Bloomfilter是沒有做用的(0.92之後,若是設置了bloomfilter爲ROWCOL,對於指定了qualifier的Scan有必定的優化,但不是那種直接過濾文件,排除在查找範圍的形式)
Bloomfilter在HBase中的開銷?
Bloomfilter是一個列族(cf)級別的配置屬性,若是你在表中設置了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的數據,稱其爲MetaBlock;MetaBlock與DataBlock(真實的KeyValue數據)一塊兒由LRUBlockCache維護。因此,開啓bloomfilter會有必定的存儲及內存cache開銷。
Bloomfilter如何提升隨機讀(Get)的性能?
對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照必定的順序),將結果合併返回給客戶端。若是你設置了bloomfilter,那麼在遍歷讀storefile時,就能夠利用bloomfilter,忽略某些storefile。
注意:hbase的bloom filter是惰性加載的,在寫壓力比較大的狀況下,會有不停的compact併產生storefile,那麼新的storefile是不會立刻將bloom filter加載到內存的,等到讀請求來的時候才加載。
這樣問題就來了,第一,若是storefile設置的比較大,max size爲2G,這會致使bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會常常出現單個 GET請求花費3-5秒的超時現象。
七、reduce side join + BloomFilter 在hadoop中的應用舉例:
在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。最後再在reduce階段作表間join便可。
這個過程其實須要先對小表的數據作BloomFilter訓練,構造一個BloomFilter樣本文件(二進制的),放到分佈式緩存,而後在map階段被讀入用來過濾大表。而hadoop早已經支持 BloomFilter 了,咱們只需調相應的API便可,ok 下面上代碼了。 mysql
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; public class TrainingBloomfilter { public static int getOptimalBloomFilterSize(int numRecords, float falsePosRate) { int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math .pow(Math.log(2), 2)); return size; } public static int getOptimalK(float numMembers, float vectorSize) { return (int) Math.round(vectorSize / numMembers * Math.log(2)); } public static void main(String[] args) throws IOException { Path inputFile = new Path("/tmp/decli/user1.txt"); int numMembers = Integer.parseInt("10"); float falsePosRate = Float.parseFloat("0.01"); Path bfFile = new Path("/tmp/decli/bloom.bin"); // Calculate our vector size and optimal K value based on approximations int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate); int nbHash = getOptimalK(numMembers, vectorSize); // create new Bloom filter BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); // Open file for read System.out.println("Training Bloom filter of size " + vectorSize + " with " + nbHash + " hash functions, " + numMembers + " approximate number of records, and " + falsePosRate + " false positive rate"); String line = null; int numRecords = 0; FileSystem fs = FileSystem.get(new Configuration()); for (FileStatus status : fs.listStatus(inputFile)) { BufferedReader rdr; // if file is gzipped, wrap it in a GZIPInputStream if (status.getPath().getName().endsWith(".gz")) { rdr = new BufferedReader(new InputStreamReader( new GZIPInputStream(fs.open(status.getPath())))); } else { rdr = new BufferedReader(new InputStreamReader(fs.open(status .getPath()))); } System.out.println("Reading " + status.getPath()); while ((line = rdr.readLine()) != null) { filter.add(new Key(line.getBytes())); ++numRecords; } rdr.close(); } System.out.println("Trained Bloom filter with " + numRecords + " entries."); System.out.println("Serializing Bloom filter to HDFS at " + bfFile); FSDataOutputStream strm = fs.create(bfFile); filter.write(strm); strm.flush(); strm.close(); System.out.println("Done training Bloom filter."); } }
import java.io.BufferedReader; import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; public class BloomFilteringDriver { public static class BloomFilteringMapper extends Mapper<Object, Text, Text, NullWritable> { private BloomFilter filter = new BloomFilter(); @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 從當前做業中獲取要緩存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context .getConfiguration()); for (Path path : paths) { if (path.toString().contains("bloom.bin")) { DataInputStream strm = new DataInputStream( new FileInputStream(path.toString())); // Read into our Bloom filter. filter.readFields(strm); strm.close(); } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { e.printStackTrace(); } } } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // Get the value for the comment String comment = value.toString(); // If it is null, skip this record if (comment == null || comment.isEmpty()) { return; } StringTokenizer tokenizer = new StringTokenizer(comment); // For each word in the comment while (tokenizer.hasMoreTokens()) { // Clean up the words String cleanWord = tokenizer.nextToken().replaceAll("'", "") .replaceAll("[^a-zA-Z]", " "); // If the word is in the filter, output it and break if (cleanWord.length() > 0 && filter.membershipTest(new Key(cleanWord.getBytes()))) { context.write(new Text(cleanWord), NullWritable.get()); // break; } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); System.out.println("================ " + otherArgs[0]); if (otherArgs.length != 3) { System.err.println("Usage: BloomFiltering <in> <out>"); System.exit(1); } FileSystem.get(conf).delete(new Path(otherArgs[2]), true); Job job = new Job(conf, "TestBloomFiltering"); job.setJarByClass(BloomFilteringDriver.class); job.setMapperClass(BloomFilteringMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(), job.getConfiguration()); System.exit(job.waitForCompletion(true) ? 0 : 1); } }測試文件:
user1.txt git
test
xiaowang
xiao
wang
test
user2.txt github
test xiaowang
xiao wang test
test1 2xiaowang
1xiao wa2ng atest
運行命令: web
hadoop jar trainbloom.jar TrainingBloomfilter
hadoop jar bloom.jar BloomFilteringDriver /tmp/decli/user2.txt /tmp/decli/result sql
結果: 數據庫
root@master 192.168.120.236 ~/lijun06 >
hadoop fs -cat /tmp/decli/result/p*
test
xiaowang
xiao
wang
test
root@master 192.168.120.236 ~/lijun06 > apache
八、關於 hadoop mapreduce join 的幾種方式,請參考: 數組
http://my.oschina.net/leejun2005/blog/95186
http://my.oschina.net/leejun2005/blog/111963
九、本文參考 or 推薦閱讀:
http://www.jiacheo.org/blog/304
http://blog.csdn.net/jiaomeng/article/details/1495500
http://www.iteye.com/blogs/tag/BloomFilter
http://www.cnblogs.com/dong008259/archive/2012/01/04/2311332.html
http://blog.csdn.net/liuben/article/details/6602683
http://ourmysql.com/archives/510?f=wb
https://zh.wikipedia.org/wiki/%E5%B8%83%E9%9A%86%E8%BF%87%E6%BB%A4%E5%99%A8
http://www.oratea.net/?p=1248
http://zjushch.iteye.com/blog/1530143
https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/appendixA/BloomFilterDriver.java
https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch3
https://github.com/alexholmes/hadoop-book/tree/master/src/main/java/com/manning/hip/ch7/bloom
bloom filter能夠看作是對bit-map的擴展,只是 bitmap 通常只用了一個hash作映射,
具體能夠參考:
http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html
http://kb.cnblogs.com/page/77440/