[輪子系列]Google Guava之BloomFilter源碼分析及基於Redis的重構

本文源地址: http://www.fullstackyang.com/...,轉發請註明該地址或segmentfault地址,謝謝!

1、背景知識

在網上已經有不少關於布隆過濾器的介紹了,這裏就再也不贅述,下面簡單地提煉幾個要點:java

  1. 布隆過濾器是用來判斷一個元素是否出如今給定集合中的重要工具,具備快速,比哈希表更節省空間等優勢,而缺點在於有必定的誤識別率(false-positive,假陽性),亦即,它可能會把不是集合內的元素斷定爲存在於集合內,不過這樣的機率至關小,在大部分的生產環境中是能夠接受的;
  2. 其原理比較簡單,以下圖所示,S集合中有n個元素,利用k個哈希函數,將S中的每一個元素映射到一個長度爲m的位(bit)數組B中不一樣的位置上,這些位置上的二進制數均置爲1,若是待檢測的元素通過這k個哈希函數的映射後,發現其k個位置上的二進制數不全是1,那麼這個元素必定不在集合S中,反之,該元素多是S中的某一個元素(參考1);bloom-filter.jpg
  3. 綜上描述,那麼到底須要多少個哈希函數,以及建立長度爲多少的bit數組比較合適,爲了估算出k和m的值,在構造一個布隆過濾器時,須要傳入兩個參數,便可以接受的誤判率fpp和元素總個數n(不必定徹底精確)。至於參數估計的方法,有興趣的同窗能夠參考維基英文頁面,下面直接給出公式:圖片描述
  4. 哈希函數的要求儘可能知足平均分佈,這樣既下降誤判發生的機率,又能夠充分利用bit數組的空間;
  5. 根據論文《Less Hashing, Same Performance: Building a Better Bloom Filter》提出的一個技巧,能夠用2個哈希函數來模擬k個哈希函數,即gi(x) = h1(x) + ih2(x) ,其中0<=i<=k-1;
  6. 在吳軍博士的《數學之美》一書中展現了不一樣狀況下的誤判率,例如,假定一個元素用16位比特,8個哈希函數,那麼假陽性的機率是萬分之五,這已經至關小了。

目前已經有相應實現的開源類庫,如Google的Guava類庫,Twitter的Algebird類庫,和ScalaNLP breeze等等,其中Guava 11.0版本中增長了BloomFilter類,它使用了Funnel和Sink的設計,加強了泛化的能力,使其能夠支持任何數據類型,其利用murmur3 hash來作哈希映射函數,不過它底層並無使用傳統的java.util.BitSet來作bit數組,而是用long型數組進行了從新封裝,大部分操做均基於位的運算,所以能達到一個很是好的性能;下面咱們就Guava類庫中實現布隆過濾器的源碼做詳細分析,最後出於靈活性和解耦等因素的考慮,咱們想要把布隆過濾器從JVM中拿出來,因而利用了Redis自帶的Bitmaps做爲底層的bit數組進行重構,另外隨着插入的元素愈來愈多,當實際數量遠遠大於建立時設置的預計數量時,布隆過濾器的誤判率會愈來愈高,所以在重構的過程當中增長了自動擴容的特性,最後經過測試驗證其正確性。python

2、布隆過濾器在Guava中的實現

Guava中,布隆過濾器的實現主要涉及到2個類,BloomFilter和BloomFilterStrategies,首先來看一下BloomFilter:git

/** The bit set of the BloomFilter (not necessarily power of 2!) */
  private final BitArray bits;

  /** Number of hashes per element */
  private final int numHashFunctions;

  /** The funnel to translate Ts to bytes */
  private final Funnel<? super T> funnel;

  /**
   * The strategy we employ to map an element T to {@code numHashFunctions} bit indexes.
   */
  private final Strategy strategy;

這是它的4個成員變量:github

  • BitArrays是定義在BloomFilterStrategies中的內部類,封裝了布隆過濾器底層bit數組的操做,後文詳述;
  • numHashFunctions表示哈希函數的個數,即上文提到的k;
  • Funnel,這是Guava中定義的一個接口,它和PrimitiveSink配套使用,主要是把任意類型的數據轉化成Java基本數據類型(primitive value,如char,byte,int……),默認用java.nio.ByteBuffer實現,最終均轉化爲byte數組;
  • Strategy是定義在BloomFilter類內部的接口,代碼以下,有3個方法,put(插入元素),mightContain(斷定元素是否存在)和ordinal方法(能夠理解爲枚舉類中那個默認方法)
interface Strategy extends java.io.Serializable {

    /**
     * Sets {@code numHashFunctions} bits of the given bit array, by hashing a user element.
     *
     * <p>Returns whether any bits changed as a result of this operation.
     */
    <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);

    /**
     * Queries {@code numHashFunctions} bits of the given bit array, by hashing a user element;
     * returns {@code true} if and only if all selected bits are set.
     */
    <T> boolean mightContain(
        T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);

    /**
     * Identifier used to encode this strategy, when marshalled as part of a BloomFilter. Only
     * values in the [-128, 127] range are valid for the compact serial form. Non-negative values
     * are reserved for enums defined in BloomFilterStrategies; negative values are reserved for any
     * custom, stateful strategy we may define (e.g. any kind of strategy that would depend on user
     * input).
     */
    int ordinal();
  }

對於建立布隆過濾器,BloomFilter並無公有的構造函數,只有一個私有構造函數,而對外它提供了5個重載的create方法,在缺省狀況下誤判率設定爲3%,採用BloomFilterStrategies.MURMUR128_MITZ_64的實現。其中4個create方法最終都調用了同一個create方法,由它來負責調用私有構造函數,其源碼以下:redis

static <T> BloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
    checkNotNull(funnel);
    checkArgument(
        expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
    checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
    checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
    checkNotNull(strategy);

    if (expectedInsertions == 0) {
      expectedInsertions = 1;
    }
    /*
     * TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
     * is proportional to -log(p), but there is not much of a point after all, e.g.
     * optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
     */
    long numBits = optimalNumOfBits(expectedInsertions, fpp);
    int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
    try {
      return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
    }
  }

在create中接受了4個參數,funnel(輸入的數據),expectedInsertions(預計插入的元素總數),fpp(指望誤判率),strategy(實現Strategy的實例),而後它計算了bit數組的長度以及哈希函數的個數(公式參考前文),最後用numBits建立了BitArray,並調用了構造函數完成賦值操做。算法

static long optimalNumOfBits(long n, double p) {
    if (p == 0) {
      p = Double.MIN_VALUE;
    }
    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

接着再來看一下BloomFilterStrategies類,首先它是實現了BloomFilter.Strategy 接口的一個枚舉類,其次它有兩個2枚舉值,MURMUR128_MITZ_32和MURMUR128_MITZ_64,分別對應了32位哈希映射函數,和64位哈希映射函數,後者使用了murmur3 hash生成的全部128位,具備更大的空間,不過原理是相通的,咱們選擇默認的MURMUR128_MITZ_64來分析:數據庫

MURMUR128_MITZ_64() {
    @Override
    public <T> boolean put(
        T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
      long bitSize = bits.bitSize();
      byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
      long hash1 = lowerEight(bytes);
      long hash2 = upperEight(bytes);

      boolean bitsChanged = false;
      long combinedHash = hash1;
      for (int i = 0; i < numHashFunctions; i++) {
        // Make the combined hash positive and indexable
        bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
        combinedHash += hash2;
      }
      return bitsChanged;
    }

    @Override
    public <T> boolean mightContain(
        T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
      long bitSize = bits.bitSize();
      byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
      long hash1 = lowerEight(bytes);
      long hash2 = upperEight(bytes);

      long combinedHash = hash1;
      for (int i = 0; i < numHashFunctions; i++) {
        // Make the combined hash positive and indexable
        if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
          return false;
        }
        combinedHash += hash2;
      }
      return true;
    }

抽象來看,put是寫,mightContain是讀,兩個方法的代碼有一點類似,都是先利用murmur3 hash對輸入的funnel計算獲得128位的字節數組,而後高低分別取8個字節(64位)建立2個long型整數hash1,hash2做爲哈希值。循環體內採用了2個函數模擬其餘函數的思想,即上文提到的gi(x) = h1(x) + ih2(x) ,這至關於每次累加hash2,而後經過基於bitSize取模的方式在bit數組中索引。segmentfault

這裏之因此要和Long.MAX_VALUE進行按位與的操做,是由於在除數和被除數符號不一致的狀況下計算所得的結果是有差異的,在程序語言裏,「%」準確來講是取餘運算(C,C++和Java均如此,python是取模),如-5%3=-2,而取模的數學定義是x
mod y=x-y[x/y](向下取整),因此-5 mod 3=
-5-3*(-2)=1,所以當哈希值爲負數的時候,其取餘的結果爲負(bitSize始終爲正數),這樣就不方便在bit數組中取值,所以經過Long.MAX_VALUE(二進制爲0111…1111),直接將開頭的符號位去掉,從而轉變爲正數。固然也能夠取絕對值,在另外一個MURMUR128_MITZ_32的實現中就是這麼作的。

在put方法中,先是將索引位置上的二進制置爲1,而後用bitsChanged記錄插入結果,若是返回true代表沒有重複插入成功,而mightContain方法則是將索引位置上的數值取出,並判斷是否爲0,只要其中出現一個0,那麼當即判斷爲不存在。數組

最後再說一下底層bit數組的實現,主要代碼以下:數據結構

static final class BitArray {
    final long[] data;
    long bitCount;

    BitArray(long bits) {
      this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]);
    }

    // Used by serialization
    BitArray(long[] data) {
      checkArgument(data.length > 0, "data length is zero!");
      this.data = data;
      long bitCount = 0;
      for (long value : data) {
        bitCount += Long.bitCount(value);
      }
      this.bitCount = bitCount;
    }

    /** Returns true if the bit changed value. */
    boolean set(long index) {
      if (!get(index)) {
        data[(int) (index >>> 6)] |= (1L << index);
        bitCount++;
        return true;
      }
      return false;
    }

    boolean get(long index) {
      return (data[(int) (index >>> 6)] & (1L << index)) != 0;
    }

    /** Number of bits */
    long bitSize() {
      return (long) data.length * Long.SIZE;
    }
...
}

以前也提到了Guava沒有使用java.util.BitSet,而是封裝了一個long型的數組,另外還有一個long型整數,用來統計數組中已經佔用(置爲1)的數量,在第一個構造函數中,它把傳入的long型整數按長度64分段(例如129分爲3段),段數做爲數組的長度,你能夠想象成由若干個64位數組拼接成一個超長的數組,它的長度就是64乘以段數,即bitSize,在第二個構造函數中利用Long.bitCount方法來統計對應二進制編碼中的1個數,這個方法在JDK1.5中就有了,其算法設計得很是精妙,有精力的同窗能夠自行研究。

另外兩個重要的方法是set和get,在get方法中,參考put和mightContain方法,傳入的參數index是通過bitSize取模的,所以必定能落在這個超長數組的範圍以內,爲了獲取index對應索引位置上的值,首先將其無符號右移6位,而且強制轉換成int型,這至關於除以64向下取整的操做,也就是換算成段數,獲得該段上的數值以後,又將1左移index位,最後進行按位與的操做,若是結果等於0,那麼返回false,從而在mightContain中判斷爲不存在。在set方法中,首先調用了get方法判斷是否已經存在,若是不存在,則用一樣的邏輯取出data數組中對應索引位置的數值,而後按位或並賦值回去。

到這裏,對Guava中布隆過濾器的實現就基本討論完了,簡單總結一下:

  1. BloomFilter類的做用在於接收輸入,利用公式完成對參數的估算,最後初始化Strategy接口的實例;
  2. BloomFilterStrategies是一個枚舉類,具備兩個實現了Strategy接口的成員,分別爲MURMUR128_MITZ_32和MURMUR128_MITZ_64,另外封裝了long型的數組做爲布隆過濾器底層的bit數組,其中在get和set方法中完成核心的位運算。

3、利用Redis Bitmaps進行重構

經過上面的分析,主要算法和邏輯的部分大致都是同樣的,真正須要重構的部分是底層位數組的實現,在Guava中是封裝了一個long型的數組,而對於redis來講,自己自帶了Bitmaps的「數據結構」(本質上仍是一個字符串),已經提供了位操做的接口,所以重構自己並不複雜,相對比較複雜的是,以前提到的實現自動擴容特性。

這裏實現自動擴容的思想是,在redis中記錄一個自增的遊標cursor,若是當前key對應的Bitmaps已經達到飽和狀態,則cursor自增,同時用其生成一個新的key,並建立規模同等的Bitmaps。而後在get的時候,須要判斷該元素是否存在於任意一個Bitmaps中。因而整個的邏輯就變成,一個元素在每一個Bitmaps中都不存在時,才能插入當前cursor對應key的Bitmaps中。

QQ20171224-003813.png

下面是代碼的實現部分。

首先,爲了簡化redis的操做,定義了2個函數式接口,分別執行單條命令和pipeline,另外還實現了一個簡單的工具類

@FunctionalInterface
public interface JedisExecutor<T> {
    T execute(Jedis jedis);
}

@FunctionalInterface
public interface PipelineExecutor {
    void load(Pipeline pipeline);
}

public class JedisUtils {

    private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

    private JedisPool jedisPool;

    public JedisUtils() {
        jedisPool = new JedisPool(poolConfig, "localhost", 6379);
    }

    public <T> T execute(JedisExecutor<T> jedisExecutor) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedisExecutor.execute(jedis);
        }
    }

    public List<Object> pipeline(List<PipelineExecutor> pipelineExecutors) {
        try (Jedis jedis = jedisPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            for (PipelineExecutor executor : pipelineExecutors)
                executor.load(pipeline);
            return pipeline.syncAndReturnAll();
        }
    }
}

其次在Strategy中,對put和mightContain做了一點修改,其中被註釋的部分是Guava中的實現。爲了簡化,這裏咱們只接受String對象。

這裏先把全部的隨機函數對應的索引位置收集到一個數組中,而後交由底層的RedisBitmaps處理get或set,具體過程後面會詳細說明。

bits.ensureCapacityInternal()方法,即表示自動擴容,這個函數名是從ArrayList中搬過來的。

@Override
    public boolean put(String string, int numHashFunctions, RedisBitmaps bits) {
        long bitSize = bits.bitSize();
        byte[] bytes = Hashing.murmur3_128().hashString(string, Charsets.UTF_8).asBytes();
        long hash1 = lowerEight(bytes);
        long hash2 = upperEight(bytes);

        boolean bitsChanged = false;
        long combinedHash = hash1;
//        for (int i = 0; i < numHashFunctions; i++) {
//            bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
//            combinedHash += hash2;
//        }
        long[] offsets = new long[numHashFunctions];
        for (int i = 0; i < numHashFunctions; i++) {
            offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
            combinedHash += hash2;
        }
        bitsChanged = bits.set(offsets);
        bits.ensureCapacityInternal();//自動擴容
        return bitsChanged;
    }

    @Override
    public boolean mightContain(String object, int numHashFunctions, RedisBitmaps bits) {
        long bitSize = bits.bitSize();
        byte[] bytes = Hashing.murmur3_128().hashString(object, Charsets.UTF_8).asBytes();
        long hash1 = lowerEight(bytes);
        long hash2 = upperEight(bytes);
        long combinedHash = hash1;
//        for (int i = 0; i < numHashFunctions; i++) {
//            if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
//                return false;
//            }
//            combinedHash += hash2;
//        }
//        return true;
        long[] offsets = new long[numHashFunctions];
        for (int i = 0; i < numHashFunctions; i++) {
            offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
            combinedHash += hash2;
        }
        return bits.get(offsets);
    }

最後,也是最重要的RedisBitmaps,其中bitSize用了Guava布隆過濾器中計算Long型數組長度的方法,獲得bitSize以後使用setbit命令初始化一個所有爲0的位數組。get(long offset)和set(long offset),這兩個與Guava布隆過濾器中的邏輯相似,這裏就再也不贅述了,而get(long[] offsets)方法中,全部的offset要與每個cursor對應的Bitmaps進行判斷,若所有命中,那麼這個元素就可能存在於該Bitmaps,反之若不能徹底命中,則表示該元素不存在於任何一個Bitmaps,因此當知足這個條件,在set(long[] offsets)方法中,就能夠插入到當前key的Bitmaps中了。

在ensureCapacityInternal方法,須要擴容的判斷條件是bitCount*2>bitSize,bitCount表示一個Bitmaps中「1」出現的個數,也就是當「1」出現的個數超過總數的一半時,進行擴容操做——首先使用incr命令對cursor自增,而後使用新的key建立一個新的Bitmaps。

RedisBitmapsJava

class RedisBitmaps {

    private static final String BASE_KEY = "bloomfilter";
    private static final String CURSOR = "cursor";

    private JedisUtils jedisUtils;
    private long bitSize;

    RedisBitmaps(long bits) {
        this.jedisUtils = new JedisUtils();
        this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;//位數組的長度,至關於n個long的長度
        if (bitCount() == 0) {
            jedisUtils.execute((jedis -> jedis.setbit(currentKey(), bitSize - 1, false)));
        }
    }

   boolean get(long[] offsets) {
        for (long i = 0; i < cursor() + 1; i++) {
            final long cursor = i;
            //只要有一個cursor對應的bitmap中,offsets所有命中,則表示可能存在
            boolean match = Arrays.stream(offsets).boxed()
                    .map(offset -> jedisUtils.execute(jedis -> jedis.getbit(genkey(cursor), offset)))
                    .allMatch(b -> (Boolean) b);
            if (match)
                return true;
        }
        return false;
    }

    boolean get(final long offset) {
        return jedisUtils.execute(jedis -> jedis.getbit(currentKey(), offset));
    }

    boolean set(long[] offsets) {
        if (cursor() > 0 && get(offsets)) {
            return false;
        }
        boolean bitsChanged = false;
        for (long offset : offsets)
            bitsChanged |= set(offset);
        return bitsChanged;
    }

    boolean set(long offset) {
        if (!get(offset)) {
            jedisUtils.execute(jedis -> jedis.setbit(currentKey(), offset, true));
            return true;
        }
        return false;
    }

    long bitCount() {
        return jedisUtils.execute(jedis -> jedis.bitcount(currentKey()));
    }

    long bitSize() {
        return this.bitSize;
    }

    private String currentKey() {
        return genkey(cursor());
    }

    private String genkey(long cursor) {
        return BASE_KEY + "-" + cursor;
    }

    private Long cursor() {
        String cursor = jedisUtils.execute(jedis -> jedis.get(CURSOR));
        return cursor == null ? 0 : Longs.tryParse(cursor);
    }

    void ensureCapacityInternal() {
        if (bitCount() * 2 > bitSize())
            grow();
    }

    void grow() {
        Long cursor = jedisUtils.execute(jedis -> jedis.incr(CURSOR));
        jedisUtils.execute((jedis -> jedis.setbit(genkey(cursor), bitSize - 1, false)));
    }

    void reset() {
        String[] keys = LongStream.range(0, cursor() + 1).boxed().map(this::genkey).toArray(String[]::new);
        jedisUtils.execute(jedis -> jedis.del(keys));
        jedisUtils.execute(jedis -> jedis.set(CURSOR, "0"));
        jedisUtils.execute(jedis -> jedis.setbit(currentKey(), bitSize - 1, false));
    }

    private PipelineExecutor apply(PipelineExecutor executor) {
        return executor;
    }
}

下面咱們作一個單元測試來驗證其正確性。

若是咱們插入的數量等於原預計總數,RedisBloomFilter擴容了1次,而兩個布隆過濾器的結果一致,都爲false,true,false。

若是插入的數量爲原預計總數的3倍,RedisBloomFilter擴容了3次,而且仍判斷正確,而Guava布隆過濾器則在判斷str3時出現誤判。

public class TestRedisBloomFilter {

    private static final int TOTAL = 10000;
    private static final double FPP = 0.0005;

    @Test
    public void test() {
        RedisBloomFilter redisBloomFilter = RedisBloomFilter.create(TOTAL, FPP);
        redisBloomFilter.resetBitmap();
        BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), TOTAL, FPP);

        IntStream.range(0, /* 3* */TOTAL).boxed()
                .map(i -> Hashing.md5().hashInt(i).toString())
                .collect(toList()).forEach(s -> {
            redisBloomFilter.put(s);
            bloomFilter.put(s);
        });

        String str1 = Hashing.md5().hashInt(99999).toString();
        String str2 = Hashing.md5().hashInt(9999).toString();
        String str3 = "abcdefghijklmnopqrstuvwxyz123456";
        System.out.println(redisBloomFilter.mightContain(str1) + ":" + bloomFilter.mightContain(str1));
        System.out.println(redisBloomFilter.mightContain(str2) + ":" + bloomFilter.mightContain(str2));
        System.out.println(redisBloomFilter.mightContain(str3) + ":" + bloomFilter.mightContain(str3));
    }
}
>>
grow bloomfilter-1
false:false
true:true
false:false
>>
grow bloomfilter-1
grow bloomfilter-2
grow bloomfilter-3
false:false
true:true
false:true

綜上,本文利用了Guava布隆過濾器的思想,並結合Redis中的Bitmaps等特性實現了支持動態擴容的布隆過濾器,它將布隆過濾器底層的位數據裝載到了Redis數據庫中,這樣的好處在於能夠部署在更復雜的多應用或分佈式系統中,還能夠利用Redis完成持久化,定時過時等功能。

4、參考文獻

  1. 吳軍. 數學之美[M]. 人民郵電出版社, 2012.
  2. Kirsch A, Mitzenmacher M. Less hashing, same performance: building a better bloom filter[C]//ESA. 2006, 6: 456-467.
  3. Bloom Filters for the Perplexed, https://sagi.io/2017/07/bloom...
  4. Google Guava, https://github.com/google/guava
相關文章
相關標籤/搜索