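The previous post showed how to do Distinct Count with the API, but the APIs that compute an exact result are all fairly slow. Is there a faster, optimized solution?
Programming Pearls introduces the bitmap like this:
A bitmap is a very useful data structure. It uses a single bit to mark the Value associated with an element, while the Key is the element itself. Because data is stored in units of bits, the memory footprint is greatly reduced.
In short, one bit (0 or 1) records whether an element has appeared, and the bit's position in the bitmap corresponds to the element's index. Programming Pearls gives an example of sorting with a bitmap: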
    /* Copyright (C) 1999 Lucent Technologies */
    /* From 'Programming Pearls' by Jon Bentley */

    /* bitsort.c -- bitmap sort from Column 1
     *   Sort distinct integers in the range [0..N-1]
     */

    #include <stdio.h>

    #define BITSPERWORD 32
    #define SHIFT 5
    #define MASK 0x1F
    #define N 10000000
    int a[1 + N / BITSPERWORD];

    void set(int i) { a[i >> SHIFT] |= (1 << (i & MASK)); }
    void clr(int i) { a[i >> SHIFT] &= ~(1 << (i & MASK)); }
    int test(int i) { return a[i >> SHIFT] & (1 << (i & MASK)); }

    int main()
    {
        int i;
        for (i = 0; i < N; i++)
            clr(i);
        /* Replace above 2 lines with below 3 for word-parallel init
        int top = 1 + N/BITSPERWORD;
        for (i = 0; i < top; i++)
            a[i] = 0;
        */
        while (scanf("%d", &i) != EOF)
            set(i);
        for (i = 0; i < N; i++)
            if (test(i))
                printf("%d\n", i);
        return 0;
    }
In the code above, the bitmap is stored in an array of int. For each int to be sorted, its index in the bitmap is simply its int value.
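The same layout can count distinct values instead of sorting them. The following is a minimal sketch rather than code from the book: the names `bm_clear`, `bm_set`, `bm_test` and `bm_count` and the bound `CAPACITY` are assumptions made for illustration, and every index is assumed to lie in [0, CAPACITY).

```c
#include <stdint.h>
#include <string.h>

#define BITS_PER_WORD 32
#define SHIFT 5            /* log2(BITS_PER_WORD) */
#define MASK 0x1F          /* BITS_PER_WORD - 1 */
#define CAPACITY 1000      /* hypothetical upper bound on index values */

static uint32_t words[1 + CAPACITY / BITS_PER_WORD];

/* Reset every bit to 0. */
void bm_clear(void) { memset(words, 0, sizeof words); }

/* Mark index i as seen: word i / 32, bit i % 32. */
void bm_set(int i) { words[i >> SHIFT] |= (UINT32_C(1) << (i & MASK)); }

/* Return 1 if index i has been marked, 0 otherwise. */
int bm_test(int i) { return (int)((words[i >> SHIFT] >> (i & MASK)) & 1u); }

/* Count the 1 bits, i.e. the number of distinct indices marked so far. */
int bm_count(void) {
    int total = 0;
    for (size_t w = 0; w < sizeof words / sizeof words[0]; w++) {
        uint32_t x = words[w];
        while (x) {        /* each round clears the lowest set bit */
            x &= x - 1;
            total++;
        }
    }
    return total;
}
```

Setting the same index twice leaves the bit unchanged, so duplicates cost nothing and the final count of 1 bits is the distinct count.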
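To use a bitmap for Distinct Count, we first need the index (in the bitmap) that corresponds to each user (uid). There are two ways to obtain an index table, numbered from 1, that maps one-to-one to uids:
Comparing the two, the second method is simpler and more practical.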
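Whichever method is used, the result is a one-to-one table from uid to a 1-based index. The sketch below shows what such a table could look like. It is not taken from the original post and is not necessarily either of the two methods. It assumes uids are integers that fit in a `long`, and the helper names `build_index_table` and `index_of` are hypothetical.

```c
#include <stdlib.h>

/* Comparison callback for qsort/bsearch over long uids. */
static int cmp_uid(const void *a, const void *b) {
    long x = *(const long *)a, y = *(const long *)b;
    return (x > y) - (x < y);
}

/* Sort uids in place and drop duplicates; returns the number of distinct
 * uids m. After the call, uids[k] owns index k + 1 for every k < m. */
size_t build_index_table(long *uids, size_t n) {
    if (n == 0) return 0;
    qsort(uids, n, sizeof uids[0], cmp_uid);
    size_t m = 1;
    for (size_t k = 1; k < n; k++)
        if (uids[k] != uids[m - 1]) uids[m++] = uids[k];
    return m;
}

/* Look up the 1-based index of uid in a table of m entries; 0 if absent. */
size_t index_of(const long *table, size_t m, long uid) {
    const long *hit = bsearch(&uid, table, m, sizeof table[0], cmp_uid);
    return hit ? (size_t)(hit - table) + 1 : 0;
}
```

Dense indices keep the bitmap short: the highest bit ever set equals the number of distinct uids, no matter how large the raw uid values are.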
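Once the index has been generated, joining RDD[(uid, V)] with RDD[(uid, index)] yields the indexed RDD. EWAH is an open-source bitmap implementation that uses RLE (Run Length Encoding) compression, which largely avoids wasting storage space. Distinct Count then becomes a matter of counting the 1 bits in the bitmap: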
    import com.googlecode.javaewah.EWAHCompressedBitmap
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // distinct count for rdd (not pair); the rdd must be sorted in each partition
    def distinctCount(rdd: RDD[Int]): Int = {
      val bitmap = rdd.aggregate[EWAHCompressedBitmap](new EWAHCompressedBitmap())(
        (u: EWAHCompressedBitmap, v: Int) => {
          u.set(v)
          u
        },
        (u1: EWAHCompressedBitmap, u2: EWAHCompressedBitmap) => u1.or(u2)
      )
      bitmap.cardinality()
    }

    // the tuple's _2 is the index
    def groupCount[K: ClassTag](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
      val grouped: RDD[(K, EWAHCompressedBitmap)] = rdd.combineByKey[EWAHCompressedBitmap](
        (v: Int) => EWAHCompressedBitmap.bitmapOf(v),
        (c: EWAHCompressedBitmap, v: Int) => {
          c.set(v)
          c
        },
        (c1: EWAHCompressedBitmap, c2: EWAHCompressedBitmap) => c1.or(c2))
      grouped.map(t => (t._1, t._2.cardinality()))
    }
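However, the computation above has a constraint: the set method of EWAHCompressedBitmap requires the int values to arrive in ascending order, so the indices within each partition of the RDD must be sorted in ascending order: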
    // sort pair RDD by value
    def sortPairRDD[K](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
      rdd.mapPartitions(iter => {
        iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
      })
    }
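To see why a run-length-compressed bitmap tends to want its bits in ascending order, consider the toy structure below. It is only an illustration of the idea and is not EWAH's actual format or API: the types `struct run` and `struct rle_bitmap`, the functions `rle_init`, `rle_set` and `rle_cardinality`, and the capacity `MAX_RUNS` are all assumptions of this sketch. Bits can only be appended past the last set bit, because setting an earlier bit would mean splitting or shifting runs that are already encoded.

```c
#include <stddef.h>

#define MAX_RUNS 64   /* hypothetical fixed capacity, enough for the sketch */

/* A run of consecutive 1 bits: positions start .. start + len - 1. */
struct run { int start; int len; };

struct rle_bitmap {
    struct run runs[MAX_RUNS];
    size_t nruns;
};

void rle_init(struct rle_bitmap *b) { b->nruns = 0; }

/* Append bit i. Returns 1 on success, 0 if i is at or before the last
 * set bit (out of order or duplicate) or the run table is full. */
int rle_set(struct rle_bitmap *b, int i) {
    if (b->nruns > 0) {
        struct run *last = &b->runs[b->nruns - 1];
        int end = last->start + last->len;   /* first position after the run */
        if (i < end) return 0;               /* would require rewriting earlier runs */
        if (i == end) {                      /* adjacent: extend the current run */
            last->len++;
            return 1;
        }
    }
    if (b->nruns == MAX_RUNS) return 0;
    b->runs[b->nruns].start = i;             /* gap: open a new run */
    b->runs[b->nruns].len = 1;
    b->nruns++;
    return 1;
}

/* Number of 1 bits: the sum of the run lengths. */
int rle_cardinality(const struct rle_bitmap *b) {
    int total = 0;
    for (size_t k = 0; k < b->nruns; k++) total += b->runs[k].len;
    return total;
}
```

With sorted input, consecutive indices collapse into a single run, which is where the space saving comes from. With unsorted input, `rle_set` in this toy version simply rejects the out-of-order value.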
To avoid sorting, you can instead generate a bitmap for each uid and then OR the bitmaps together when computing the Distinct Count:
    // assumes rdd: RDD[(K, EWAHCompressedBitmap)], one bitmap per record
    rdd.reduceByKey(_ or _)
      .mapValues(_.cardinality())
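The reason OR removes the ordering requirement is that a union of bit sets is commutative and idempotent: the order of the merges and any repeated indices have no effect on the result. A minimal uncompressed sketch of this follows. The names `bitset_add`, `bitset_or` and `bitset_cardinality` and the size `WORDS` are assumptions of the example, not part of EWAH.

```c
#include <stdint.h>
#include <stddef.h>

#define WORDS 4   /* hypothetical size: 4 * 32 = 128 index positions */

/* Set the bit for index i (0 <= i < WORDS * 32). */
void bitset_add(uint32_t *bm, int i) {
    bm[i >> 5] |= (UINT32_C(1) << (i & 31));
}

/* dst |= src, word by word: the union of the two index sets. */
void bitset_or(uint32_t *dst, const uint32_t *src) {
    for (size_t w = 0; w < WORDS; w++) dst[w] |= src[w];
}

/* Count the 1 bits across all words. */
int bitset_cardinality(const uint32_t *bm) {
    int total = 0;
    for (size_t w = 0; w < WORDS; w++) {
        uint32_t x = bm[w];
        while (x) {        /* each round clears the lowest set bit */
            x &= x - 1;
            total++;
        }
    }
    return total;
}
```

The trade-off is that every record carries its own bitmap object, so this approach exchanges the per-partition sort for extra allocation and merge work.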
[1] 周海鵬 (Zhou Haipeng), Bitmap的祕密 (The Secret of Bitmap).