BloomFilter 簡介及在 Hadoop reduce side join 中的應用

一、BloomFilter能解決什麼問題?
     以少許的內存空間判斷一個元素是否屬於這個集合, 代價是有必定的錯誤率

二、工做原理
     1. 初始化一個數組, 全部位標爲0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始爲0)
     2. 將已知集合S中的每個數組, 按如下方式映射到A中
          2.0  選取n個互相獨立的hash函數 h1, h2, … hk
          2.1  將元素經過以上hash函數獲得一組索引值 h1(xi), h2(xi),…,hk(xi)
          2.2  將集合A中的上述索引值標記爲1(若是不一樣元素有重複, 則重複覆蓋爲1, 這是一個覓等操做)
     3.  對於一個元素x, 將其根據2.0中選取的hash函數, 進行hash, 獲得一組索引值 h1(x), h2(x), …,hk(x)
          若是集合A中的這些索引位置上的值都是1, 表示這個元素屬於集合S, 不然則不屬於S

舉例說明:

創建一個容量爲500萬的Bit Array結構(Bit Array的大小和keyword的數量決定了誤判的概率),將集合中的每一個keyword經過32個hash函數分別計算出32個數字,而後對這32個數字分別用500萬取模,而後將Bit Array中對應的位置爲1,咱們將其稱爲特徵值。簡單的說就是將每一個keyword對應到Bit Array中的32個位置上,見下圖:

bloom

當須要快速查找某個keyword時,只要將其經過一樣的32個hash函數運算,而後映射到Bit Array中的對應位,若是Bit Array中的對應位所有是1,那麼說明該keyword匹配成功(會有誤判的概率)。


三、幾個前提
     1. hash函數的計算不能性能太差, 不然得不償失
     2. 任意兩個hash函數之間必須是獨立的.
          即任意兩個hash函數不存在單一相關性, 不然hash到其中一個索引上的元素也一定會hash到另外一個相關的索引上, 這樣多個hash沒有意義 html


四、錯誤率
     工做原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認爲屬於這個集合(false positive)。所以,Bloom Filter不適合那些「零錯誤」的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter經過極少的錯誤換取了存儲空間的極大節省。關於具體的錯誤率,這和最優的哈希函數個數以及位數組的大小有關,而這是能夠估算求得一個最優解的:
哈希函數個數k、位數組大小m及字符串數量n之間存在相互關係。相關文獻證實了對於給定的m、n,當 k = ln(2)* m/n 時出錯的機率是最小的。  具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500


五、基本特徵
從以上對基本原理和數學基礎的分析,咱們能夠獲得Bloom filter的以下基本特徵,用於指導實際應用。
(1)存在必定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性);
(2)錯誤率是可控制的,經過改變位數組大小、hash函數個數或更低碰撞率的hash函數來調節;
(3)保持較低的錯誤率,位數組空位至少保持在一半以上;
(4)給定m和n,能夠肯定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小;
(5)給定容許的錯誤率E,能夠肯定合適的位數組大小,即m >= log2(e) * (n * log2(1/E)),繼而肯定hash函數個數k;
(6)正向錯誤率沒法徹底消除,即便不對位數組大小和hash函數個數進行限制,即沒法實現零錯誤率;
(7)空間效率高,僅保存「存在狀態」,但沒法存儲完整信息,須要其餘數據結構輔助存儲;
(8)不支持元素刪除操做,由於不能保證刪除的安全性。 java


六、應用場景舉例:
(1)拼寫檢查、數據庫系統、文件系統
(2)假設要你寫一個網絡蜘蛛(web crawler)。因爲網絡間的連接錯綜複雜,蜘蛛在網絡間爬行極可能會造成「環」。爲了不造成「環」,就須要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢?
(3)網絡應用
  P2P網絡中查找資源操做,能夠對每條網絡通路保存Bloom Filter,當命中時,則選擇該通路訪問。
  廣播消息時,能夠檢測某個IP是否已發包。
  檢測廣播消息包的環路,將Bloom Filter保存在包裏,每一個節點將本身添加入Bloom Filter。
  信息隊列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾郵件地址過濾
  像網易,QQ這樣的公衆電子郵件(email)提供商,老是須要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。因爲那些發送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則須要大量的網絡服務器。若是用哈希表,每存儲一億個 email 地址,就須要1.6GB 的內存(用哈希表實現的具體辦法是將每個email 地址對應成一個八字節的信息指紋,而後將這些信息指紋存入哈希表,因爲哈希表的存儲效率通常只有50%,所以一個email 地址須要佔用十六個字節。一億個地址大約要1.6GB, 即十六億字節的內存)。所以存貯幾十億個郵件地址可能須要上百GB 的內存。而Bloom Filter只須要哈希表1/8 到1/4 的大小就能解決一樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在創建一個小的白名單,存儲那些可能別誤判的郵件地址。
(5)Bloomfilter在HBase中的做用
      HBase利用Bloomfilter來提升隨機讀(Get)的性能,對於順序讀(Scan)而言,設置Bloomfilter是沒有做用的(0.92之後,若是設置了bloomfilter爲ROWCOL,對於指定了qualifier的Scan有必定的優化,但不是那種直接過濾文件,排除在查找範圍的形式) 
      Bloomfilter在HBase中的開銷? 
Bloomfilter是一個列族(cf)級別的配置屬性,若是你在表中設置了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的數據,稱其爲MetaBlock;MetaBlock與DataBlock(真實的KeyValue數據)一塊兒由LRUBlockCache維護。因此,開啓bloomfilter會有必定的存儲及內存cache開銷。 
     Bloomfilter如何提升隨機讀(Get)的性能? 
對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照必定的順序),將結果合併返回給客戶端。若是你設置了bloomfilter,那麼在遍歷讀storefile時,就能夠利用bloomfilter,忽略某些storefile。 
     注意:hbase的bloom filter是惰性加載的,在寫壓力比較大的狀況下,會有不停的compact併產生storefile,那麼新的storefile是不會立刻將bloom filter加載到內存的,等到讀請求來的時候才加載。 
這樣問題就來了,第一,若是storefile設置的比較大,max size爲2G,這會致使bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會常常出現單個 GET請求花費3-5秒的超時現象。

七、reduce side join + BloomFilter 在hadoop中的應用舉例:
在某些狀況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可使用BloomFiler以節省空間。將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄必定不會過濾掉),這不要緊,只不過增長了少許的網絡IO而已。最後再在reduce階段作表間join便可。
這個過程其實須要先對小表的數據作BloomFilter訓練,構造一個BloomFilter樣本文件(二進制的),放到分佈式緩存,而後在map階段被讀入用來過濾大表。而hadoop早已經支持 BloomFilter 了,咱們只需調相應的API便可,ok 下面上代碼了。 mysql

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class TrainingBloomfilter {

	public static int getOptimalBloomFilterSize(int numRecords,
			float falsePosRate) {
		int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
				.pow(Math.log(2), 2));
		return size;
	}

	public static int getOptimalK(float numMembers, float vectorSize) {
		return (int) Math.round(vectorSize / numMembers * Math.log(2));
	}

	public static void main(String[] args) throws IOException {

		Path inputFile = new Path("/tmp/decli/user1.txt");
		int numMembers = Integer.parseInt("10");
		float falsePosRate = Float.parseFloat("0.01");
		Path bfFile = new Path("/tmp/decli/bloom.bin");

		// Calculate our vector size and optimal K value based on approximations
		int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
		int nbHash = getOptimalK(numMembers, vectorSize);

		// create new Bloom filter
		BloomFilter filter = new BloomFilter(vectorSize, nbHash,
				Hash.MURMUR_HASH);

		// Open file for read

		System.out.println("Training Bloom filter of size " + vectorSize
				+ " with " + nbHash + " hash functions, " + numMembers
				+ " approximate number of records, and " + falsePosRate
				+ " false positive rate");

		String line = null;
		int numRecords = 0;
		FileSystem fs = FileSystem.get(new Configuration());
		for (FileStatus status : fs.listStatus(inputFile)) {
			BufferedReader rdr;
			// if file is gzipped, wrap it in a GZIPInputStream
			if (status.getPath().getName().endsWith(".gz")) {
				rdr = new BufferedReader(new InputStreamReader(
						new GZIPInputStream(fs.open(status.getPath()))));
			} else {
				rdr = new BufferedReader(new InputStreamReader(fs.open(status
						.getPath())));
			}

			System.out.println("Reading " + status.getPath());
			while ((line = rdr.readLine()) != null) {
				filter.add(new Key(line.getBytes()));
				++numRecords;
			}

			rdr.close();
		}

		System.out.println("Trained Bloom filter with " + numRecords
				+ " entries.");

		System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
		FSDataOutputStream strm = fs.create(bfFile);
		filter.write(strm);

		strm.flush();
		strm.close();

		System.out.println("Done training Bloom filter.");

	}

}

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class BloomFilteringDriver {

	public static class BloomFilteringMapper extends
			Mapper<Object, Text, Text, NullWritable> {

		private BloomFilter filter = new BloomFilter();

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {

			BufferedReader in = null;

			try {
				// 從當前做業中獲取要緩存的文件
				Path[] paths = DistributedCache.getLocalCacheFiles(context
						.getConfiguration());
				for (Path path : paths) {
					if (path.toString().contains("bloom.bin")) {
						DataInputStream strm = new DataInputStream(
								new FileInputStream(path.toString()));
						// Read into our Bloom filter.
						filter.readFields(strm);
						strm.close();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				try {
					if (in != null) {
						in.close();
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			// Get the value for the comment
			String comment = value.toString();

			// If it is null, skip this record
			if (comment == null || comment.isEmpty()) {
				return;
			}

			StringTokenizer tokenizer = new StringTokenizer(comment);
			// For each word in the comment
			while (tokenizer.hasMoreTokens()) {

				// Clean up the words
				String cleanWord = tokenizer.nextToken().replaceAll("'", "")
						.replaceAll("[^a-zA-Z]", " ");

				// If the word is in the filter, output it and break
				if (cleanWord.length() > 0
						&& filter.membershipTest(new Key(cleanWord.getBytes()))) {
					context.write(new Text(cleanWord), NullWritable.get());
					// break;
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		System.out.println("================ " + otherArgs[0]);
		if (otherArgs.length != 3) {
			System.err.println("Usage: BloomFiltering <in> <out>");
			System.exit(1);
		}

		FileSystem.get(conf).delete(new Path(otherArgs[2]), true);

		Job job = new Job(conf, "TestBloomFiltering");
		job.setJarByClass(BloomFilteringDriver.class);
		job.setMapperClass(BloomFilteringMapper.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

		DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(),
				job.getConfiguration());

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
測試文件:

user1.txt git

test
xiaowang
xiao
wang
test

user2.txt github

test xiaowang
xiao wang test
test1 2xiaowang
1xiao wa2ng atest


運行命令: web

hadoop jar trainbloom.jar TrainingBloomfilter 
hadoop jar bloom.jar BloomFilteringDriver /tmp/decli/user2.txt /tmp/decli/result sql

結果: 數據庫

root@master 192.168.120.236 ~/lijun06 >
hadoop fs -cat /tmp/decli/result/p*
test
xiaowang
xiao
wang
test
root@master 192.168.120.236 ~/lijun06 > apache

八、關於 hadoop mapreduce join 的幾種方式,請參考: 數組

http://my.oschina.net/leejun2005/blog/95186

http://my.oschina.net/leejun2005/blog/111963


九、本文參考 or 推薦閱讀:

http://www.jiacheo.org/blog/304
http://blog.csdn.net/jiaomeng/article/details/1495500
http://www.iteye.com/blogs/tag/BloomFilter


http://www.cnblogs.com/dong008259/archive/2012/01/04/2311332.html
http://blog.csdn.net/liuben/article/details/6602683
http://ourmysql.com/archives/510?f=wb
https://zh.wikipedia.org/wiki/%E5%B8%83%E9%9A%86%E8%BF%87%E6%BB%A4%E5%99%A8
http://www.oratea.net/?p=1248
http://zjushch.iteye.com/blog/1530143


https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/appendixA/BloomFilterDriver.java
https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch3


https://github.com/alexholmes/hadoop-book/tree/master/src/main/java/com/manning/hip/ch7/bloom


bloom filter能夠看作是對bit-map的擴展,只是 bitmap 通常只用了一個hash作映射,

具體能夠參考:

http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html

http://kb.cnblogs.com/page/77440/

http://hongweiyi.com/2012/03/data-structure-bitmap/

http://blog.csdn.net/hit_kongquan/article/details/6255673

相關文章
相關標籤/搜索