大數據下的Distinct Count(二):Bitmap篇

前一篇中介紹了使用API作Distinct Count,可是精確計算的API都較慢,那有沒有能更快的優化解決方案呢?html

1. Bitmap介紹

《編程珠璣》上是這樣介紹bitmap的:java

Bitmap是一個十分有用的數據結構。所謂的Bitmap就是用一個bit位來標記某個元素對應的Value,而Key便是該元素。因爲採用了Bit爲單位來存儲數據,所以在內存佔用方面,能夠大大節省。git

簡而言之——用一個bit(0或1)表示某元素是否出現過,其在bitmap的位置對應於其index。《編程珠璣》給出了一個用bitmap作排序的例子:github

/* Copyright (C) 1999 Lucent Technologies */
/* From 'Programming Pearls' by Jon Bentley */
/* bitsort.c -- bitmap sort from Column 1
* Sort distinct integers in the range [0..N-1]
*/
#include <stdio.h>

#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000
int a[1 + N / BITSPERWORD];

void set(int i) { a[i >> SHIFT] |= (1 << (i & MASK)); }

void clr(int i) { a[i >> SHIFT] &= ~(1 << (i & MASK)); }

int test(int i) { return a[i >> SHIFT] & (1 << (i & MASK)); }

int main() {
    int i;
    for (i = 0; i < N; i++)
        clr(i);
    /* Replace above 2 lines with below 3 for word-parallel init
    int top = 1 + N/BITSPERWORD;
    for (i = 0; i < top; i++)
    a[i] = 0;
    */
    while (scanf("%d", &i) != EOF)
        set(i);
    for (i = 0; i < N; i++)
        if (test(i))
            printf("%d\n", i);
    return 0;
}

上面代碼中,用int的數組存儲bitmap,對於每個待排序的int數,其對應的index爲其int值。編程

2. Distinct Count優化

index生成

爲了使用bitmap作Distinct Count,首先需獲得每一個用戶(uid)對應(在bitmap中)的index。有兩種辦法能夠獲得從1開始編號index表(與uid一一對應):數組

  • hash,可是要找到無碰撞且hash值均勻分佈[1, +∞)區間的hash函數是很是困難的;
  • 維護一張uid與index之間的映射表,並增量更新

比較兩種方法,第二種方法更爲簡單可行。數據結構

UV計算

在index生成完成後,RDD[(uid, V)]RDD[(uid, index)]join獲得index化的RDD。bitmap的開源實現有EWAH,採用RLE(Run Length Encoding)壓縮,很好地解決了存儲空間的浪費。Distinct Count計算轉變成了求bitmap中1的個數:函數

// distinct count for rdd(not pair) and the rdd must be sorted in each partition
def distinctCount(rdd: RDD[Int]): Int = {
    val bitmap = rdd.aggregate[EWAHCompressedBitmap](new EWAHCompressedBitmap())(
      (u: EWAHCompressedBitmap, v: Int) => {
        u.set(v)
        u
      },
      (u1: EWAHCompressedBitmap, u2: EWAHCompressedBitmap) => u1.or(u2)
    )
    bitmap.cardinality()
}

// the tuple_2 is the index
def groupCount[K: ClassTag](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
    val grouped: RDD[(K, EWAHCompressedBitmap)] = rdd.combineByKey[EWAHCompressedBitmap](
      (v: Int) => EWAHCompressedBitmap.bitmapOf(v),
      (c: EWAHCompressedBitmap, v: Int) => {
        c.set(v)
        c
      },
      (c1: EWAHCompressedBitmap, c2: EWAHCompressedBitmap) => c1.or(c2))
    grouped.map(t => (t._1, t._2.cardinality()))
}

可是,在上述計算中,因爲EWAHCompressedBitmap的set方法要求int值是升序的,也就是說RDD的每個partition的index應是升序排列:優化

// sort pair RDD by value
def sortPairRDD[K](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
    rdd.mapPartitions(iter => {
      iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
    })
}

爲了不排序,能夠爲每個uid生成一個bitmap,而後在Distinct Count時將bitmap進行or運算亦可:ui

rdd.reduceByKey(_ or _)
    .mapValues(_._2.cardinality())

3. 參考資料

[1] 周海鵬, Bitmap的祕密.

相關文章
相關標籤/搜索