redis我的源碼分析1----hyperloglog(golang實現)

1 基數統計

HLL算法用來進行基數統計。golang

什麼是基數統計:例如給你一個數組[1,2,2,3,3,5,5] ,這個數組的基數是4(一共有4個不重複的元素)。 好了如今知道什麼是基數統計了。
對於這個問題,最容易想到的辦法固然是使用bitmap來實現,每一個bit位表示一個數字是否出現過,好比要表示上面這串數字使用下面的bitmap來表示:redis

011101算法

優勢:相對省空間,且合併操做簡單,好比上面的應用場景1, 若是想統計某2天有多少個ip地址訪問,只須要把兩天的bitmap結構拿出來作「或」操做便可。
缺點: 空間複雜度仍是太大,1個byte只有8個bit,也就是1個byte只能惟一表示8個IP地址(8個不一樣的客戶)那麼:
1k才能表示 1024 * 8 = 8192
1M才能表示 1024 * 1024 * 8 = 8388608 (約800多萬)
若是商品連接不少,還須要統計天天的數據等等,每一個商品天天的連接須要1M以上的內存,太大,內存扛不住。數組

相反: 使用HLL ,對於精度要求不是特別高的時候,只須要12k的內存,很神奇!!!緩存

2 HLL算法的原理

舉一個咱們最熟悉的拋硬幣例子,出現正反面的機率都是1/2,一直拋硬幣直到出現正面,記錄下投擲次數k,將這種拋硬幣屢次直到出現正面的過程記爲一次伯努利過程,對於n次伯努利過程,咱們會獲得n個出現正面的投擲次數值 $ k_1, k_2 ... k_n $, 其中這裏的最大值是k_max
看到這裏可能有點懵逼,咱們把問題縷一縷,聯繫咱們的問題,在作基數統計的時候,實際上是這麼個問題:bash

如今的目標是來了一組投擲次數的數據($ k_1, k_2 ... k_n $),想要預測出,一共作了多少次伯努利過程(假設爲n),才能獲得這樣的數據的啊?app

根據一頓數學推導---直接給結論: 用 $2^{k_ max}$ 來做爲n的估計值。ide

image

例如:作了一組伯努利過程,發現連續出現5次反面後,出現了1次正面,請問出現這種狀況,大概須要作多少次伯努利過程呢? 答案是: $ 2^6 = 64 $ 次實驗。函數

這個其實就是咱們進行HLL基數統計的基礎。oop

回到基數統計的問題,咱們須要統計一組數據中不重複元素的個數,集合中每一個元素的通過hash函數後能夠表示成0和1構成的二進制數串,一個二進制串能夠類比爲一次拋硬幣實驗,1是拋到正面,0是反面。二進制串中從低位開始第一個1出現的位置能夠理解爲拋硬幣試驗中第一次出現正面的拋擲次數kk,那麼基於上面的結論,咱們能夠經過屢次拋硬幣實驗的最大拋到正面的次數來預估總共進行了多少次實驗,一樣能夠能夠經過第一個1出現位置的最大值$ k_{max}k ​max ​​ 來預估總共有多少個不一樣的數字(總體基數)。

因此HLL的基本思想是利用集合中數字的比特串第一個1出現位置的最大值來預估總體基數,可是這種預估方法存在較大偏差,爲了改善偏差狀況,HLL中引入分桶平均的概念。

例如 一樣舉拋硬幣的例子,若是隻有一組拋硬幣實驗,運氣較好,第一次實驗過程就拋了10次才第一次拋到正面,顯然根據公式推導獲得的實驗次數的估計偏差較大;若是100個組同時進行拋硬幣實驗,同時運氣這麼好的機率就很低了,每組分別進行屢次拋硬幣實驗,並上報各自實驗過程當中拋到正面的拋擲次數的最大值,就能根據100組的平均值預估總體的實驗次數了。

redis裏面就是使用了分桶的原理,具體的實現原理以下: 首先來了一個redis object(字符串), 通過hash後,生成了一個8字節的hash值。

graph LR
A[redis object]-->|hash function| B(64 bit)
複製代碼

而後將 64個bit位的前14位做爲桶的下標,這樣桶大小就是$ 2^{14} = 16348 $
後面50個bit位,至關因而隨機的那個伯努利過程,咱們找到1第一次出現的位置count,若是當前count比桶裏面的oldcount大, 則更新oldcount=count。

1.2 基數統計的應用場景

    1. 統計一個網站一天有多少個ip地址訪問;
    1. 統計某個商品連接天天被多少個不一樣客戶訪問;
  • ...

還有不少應用,大體就是統計類的需求其實都很明確, 不須要很準確的值,只須要一個相似的估計值便可,同時不用set來存儲,由於set其實很消耗內存,但願這個統計的結構越節約內存越好。 其實均可以用這HLL算法,節約內存。

1.4 HyperLogLog的算法流程

1.4.1 建立一個HLL對象

HLL的頭結構體定義:

struct hllhdr {
    char magic[4];      /* "HYLL" 魔數,前面4個字節表示這是一個hll對象*/
    uint8_t encoding;   /* 存儲方式,後面會講到,分爲HLL_DENSE or HLL_SPARSE兩種存儲方式 */
    uint8_t notused[3]; /*保留字段,由於redis是天然字節對齊的,因此空着也是空着,不如定義一下 Reserved for future use, must be zero. */
    uint8_t card[8];    /*緩存的當前hll對象的基數值 Cached cardinality, little endian. */
    uint8_t registers[]; /* Data bytes. 對於dense存儲方式,這裏就是一個12k的連續數組,對於sparse存儲方式,這裏長度是不定的,後面會講到*/
};


複製代碼

建立一個hll對象:

/* Create an HLL object. We always create the HLL using sparse encoding.
 * This will be upgraded to the dense representation as needed.
 這裏英文註釋其實已經寫的很清楚了,默認hll對象使用sparse的編碼方式,這樣比較節約內存,可是sparse方式存儲其實比較難以理解,代碼實現也比較複雜,可是對於理解來講,其實就是對於裏面hll桶的存儲方式的不一樣,HLL算法自己邏輯上沒有區別
 */
robj *createHLLObject(void) {
    robj *o;
    struct hllhdr *hdr;
    sds s;
    uint8_t *p;
    int sparselen = HLL_HDR_SIZE +
                    (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
                     HLL_SPARSE_XZERO_MAX_LEN)*2);  
    //頭長度+(16384 + (16384-1) / 16384 * 2),也就是2個字節,默認由於基數統計裏面全部的桶都是0,用spase方式存儲,只須要2個字節
    int aux;

    /* Populate the sparse representation with as many XZERO opcodes as
     * needed to represent all the registers. */
    aux = HLL_REGISTERS;
    s = sdsnewlen(NULL,sparselen);
    p = (uint8_t*)s + HLL_HDR_SIZE;
    while(aux) {
        int xzero = HLL_SPARSE_XZERO_MAX_LEN;
        if (xzero > aux) xzero = aux;
        HLL_SPARSE_XZERO_SET(p,xzero);
        p += 2;
        aux -= xzero;
    }
    serverAssert((p-(uint8_t*)s) == sparselen);

    /* Create the actual object. */
    o = createObject(OBJ_STRING,s);
    hdr = o->ptr;
    memcpy(hdr->magic,"HYLL",4);
    hdr->encoding = HLL_SPARSE;
    return o;
}



複製代碼

3 pfadd 流程

image

3.1 使用dense方式存儲

來一個byte流,傳入 是一個void * 指針和一個長度len, 經過MurmurHash64A 函數 計算一個64位的hash值。64位的前14位(這個值是能夠修改的)做爲index,後面做爲50位做爲bit流。 2 ^ 14 == 16384 也就是一共有16384個桶。每一個桶使用6個bit存儲。

後面的50位bit流,以下樣子:
00001000....11000 其中第一次出現1的位置咱們記爲count, 因此count最大值是50, 用6個bit位就夠表示了。 2 ^ 6 = 64

故一個HLL對象實際用來存儲的空間是16384(個桶) * ( 每一個桶6個bit) / 8 = 12288 byte。 也就是使用了約12k的內存。這個其實redis比較牛逼的地方,其實用一個字節來存的話,其實也就是16k的內存,可是爲了能省4k的內存,搞出一堆。這個只是dense方式存儲,相對是浪費空間的,下面講的sparse方式存儲更加節約空間。

計算出index(桶的下標), count(後面50個bit中第一次出現1的位置)後,下一步就是更新桶的操做。 根據index找到桶,而後看當前的count 是否大於oldcount,大於則更新下oldcount = count。此時爲了性能考慮,是不會去統計當前的基數的,而是將HLL的頭裏面的一個標誌位置爲1,表示下次進行pfcount操做的時候,當前的緩存值已經失效了,須要從新統計緩存值。在後面pfcount流程的時候,發現這個標記爲失效,就會去從新統計新的基數,放入基數緩存。

/* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
    struct hllhdr *hdr = o->ptr;
    switch(hdr->encoding) {
    case HLL_DENSE: return hllDenseAdd(hdr->registers,ele,elesize);
    case HLL_SPARSE: return hllSparseAdd(o,ele,elesize);//sparse
    default: return -1; /* Invalid representation. */
    }
}



/* "Add" the element in the dense hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
 *
 * This is just a wrapper to hllDenseSet(), performing the hashing of the
 * element in order to retrieve the index and zero-run count. */
int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
    long index;
    uint8_t count = hllPatLen(ele,elesize,&index);//index就是桶的下標, count則是後面50個bit位中1第一次出現的位置
    /* Update the register if this element produced a longer run of zeroes. */
    return hllDenseSet(registers,index,count);
}


/* ================== Dense representation implementation  ================== */

/* Low level function to set the dense HLL register at 'index' to the
 * specified value if the current value is smaller than 'count'.
 *
 * 'registers' is expected to have room for HLL_REGISTERS plus an
 * additional byte on the right. This requirement is met by sds strings
 * automatically since they are implicitly null terminated.
 *
 * The function always succeed, however if as a result of the operation
 * the approximated cardinality changed, 1 is returned. Otherwise 0
 * is returned. */
int hllDenseSet(uint8_t *registers, long index, uint8_t count) {
    uint8_t oldcount;
    //找到對應的index獲取其中的值
    HLL_DENSE_GET_REGISTER(oldcount,registers,index);
    if (count > oldcount) { //若是新的值比老的大,就更新來的
        HLL_DENSE_SET_REGISTER(registers,index,count);
        return 1;
    } else {
        return 0;
    }
}



/* Given a string element to add to the HyperLogLog, returns the length
 * of the pattern 000..1 of the element hash. As a side effect 'regp' is
 * set to the register index this element hashes to. */
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
    uint64_t hash, bit, index;
    int count;

    /* Count the number of zeroes starting from bit HLL_REGISTERS
     * (that is a power of two corresponding to the first bit we don't use * as index). The max run can be 64-P+1 = Q+1 bits. * * Note that the final "1" ending the sequence of zeroes must be * included in the count, so if we find "001" the count is 3, and * the smallest count possible is no zeroes at all, just a 1 bit * at the first position, that is a count of 1. * * This may sound like inefficient, but actually in the average case * there are high probabilities to find a 1 after a few iterations. */ hash = MurmurHash64A(ele,elesize,0xadc83b19ULL); index = hash & HLL_P_MASK; /* Register index. */ hash >>= HLL_P; /* Remove bits used to address the register. */ hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates and count will be <= Q+1. */ bit = 1; count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */ while((hash & bit) == 0) { count++; bit <<= 1; } *regp = (int) index; serverLog(LL_NOTICE,"pf hash idx=%d, count=%d", index, count); return count; } 複製代碼

3.2 pfcount流程

統計基數流程,就若是cache標誌位是有效的,直接返回緩存值,不然從新計算HLL的全部16384個桶,而後進行統計修正,具體的修正的原理,涉及不少的數學知識和論文,這裏就不說起了。

/* Return the approximated cardinality of the set based on the harmonic
 * mean of the registers values. 'hdr' points to the start of the SDS
 * representing the String object holding the HLL representation.
 *
 * If the sparse representation of the HLL object is not valid, the integer
 * pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
 *
 * hllCount() supports a special internal-only encoding of HLL_RAW, that
 * is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
 * This is useful in order to speedup PFCOUNT when called against multiple
 * keys (no need to work with 6-bit integers encoding). */
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
    double m = HLL_REGISTERS;
    double E;
    int j;
    int reghisto[HLL_Q+2] = {0};

    /* Compute register histogram */
    if (hdr->encoding == HLL_DENSE) {
        hllDenseRegHisto(hdr->registers,reghisto);
    } else if (hdr->encoding == HLL_SPARSE) {
        hllSparseRegHisto(hdr->registers,
                         sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
    } else if (hdr->encoding == HLL_RAW) {
        hllRawRegHisto(hdr->registers,reghisto);
    } else {
        serverPanic("Unknown HyperLogLog encoding in hllCount()");
    }

    /* Estimate cardinality form register histogram. See:
     * "New cardinality estimation algorithms for HyperLogLog sketches"
     * Otmar Ertl, arXiv:1702.01284 */
    //這裏具體的修正流程,要去看論文,就照着抄過來實現就能夠了。 
    double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
    for (j = HLL_Q; j >= 1; --j) {
        z += reghisto[j];
        z *= 0.5;
    }
    z += m * hllSigma(reghisto[0]/(double)m);
    E = llroundl(HLL_ALPHA_INF*m*m/z);

    return (uint64_t) E;
}

複製代碼

4. 後記

其實原理是很簡單的,並且裏面涉及到不少的數學知識,也是不能所有看懂,不得不感慨,redis對內存的節約是真的很變態的。對於sparse模式,節約的內存更加恐怖,由於這個其實對於hll算法的原理理解其實影響不大,本文就不作詳細介紹了。

最後貼上我用golang模仿寫的一個hyperloglog代碼:

package goRedis

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

const seed = 0xadc83b19

const hll_dense = 1
const hll_sparse = 2

const hll_p = 14
const hll_q = 64 - hll_p
const hll_registers = 1 << hll_p
const hll_p_mask = hll_registers - 1
const hll_bits = 6
const hll_sparse_val_max_value = 32
const hll_alpha_inf = 0.721347520444481703680

type hllhdr struct {
	magic      string
	encoding   uint8
	notused    [3]uint8
	card       [8]uint64
	registers  []byte //實際存儲的,由於後面若是encoding方式採用sparse的話,長度會變化,因此使用slice比較好
	vaildCache bool
}

func initHLL(encoding uint8) *hllhdr {
	hdr := new(hllhdr)
	hdr.magic = "HYLL"
	hdr.encoding = encoding

	if encoding == hll_dense {
		hdr.registers = make([]byte, hll_registers*1) // 先簡單實現下 用一個字節存6個bit
	} else {
		panic("HLL SPARSE encoding format doesn't support.")
	}
	return hdr
}

func hllDenseSet(hllObj *hllhdr, index uint64, count int) bool {
	if count > int(hllObj.registers[index]) {
		hllObj.registers[index] = byte(count)
		return true
	}
	return false
}

func PfAddCommand(hllObj *hllhdr, val []byte) {
	index, count := hllPartLen(val)
	if hllObj.encoding == hll_dense {
		hllDenseSet(hllObj, index, count)
		hllObj.vaildCache = false
	} else {
		panic("HLL SPARSE encoding format doesn't support.")
	}
}

func hllTau(x float64) float64 {
	if x == 0. || x == 1. {
		return 0.
	}
	var zPrime float64
	y := 1.0
	z := 1 - x
	for {
		x = math.Sqrt(x)
		zPrime = z
		y *= 0.5
		z -= math.Pow(1-x, 2) * y
		if zPrime == z {
			break
		}
	}
	return z / 3

}

func hllDenseRegHisto(hllObj *hllhdr, reghisto *[hll_q + 2]int) {
	for i := 0; i < hll_registers; i++ {
		reg := hllObj.registers[i]
		reghisto[reg]++
	}
}

func hllSigma(x float64) float64 {
	if x == 1. {
		return math.MaxInt64
	}
	var zPrime float64
	y := float64(1)
	z := x
	for {
		x *= x
		zPrime = z
		z += x * y
		y += y
		if zPrime == z {
			break
		}
	}
	return z
}

func hllCount(hllObj *hllhdr) int {
	m := float64(hll_registers)
	var reghisto [hll_q + 2]int
	if hllObj.encoding == hll_dense {
		hllDenseRegHisto(hllObj, &reghisto)
	} else {
		panic("impliment me..")
	}

	z := m * hllTau((m - (float64(reghisto[hll_q+1]))/m))
	for j := hll_q; j >= 1; j-- {
		z += float64(reghisto[j])
		z *= 0.5
	}
	z += m * hllSigma(float64(reghisto[0])/m)
	E := math.Round(hll_alpha_inf * m * m / z)

	return int(E)

}

func PfCountCommand(hllObj *hllhdr) int {
	var ret int
	if hllObj.vaildCache {
		return 0
	} else {
		ret = hllCount(hllObj)
	}

	return ret
}

func CreateHLLObject() *hllhdr {
	hdr := initHLL(hll_dense)
	return hdr
}

func Murmurhash(buff []byte, seed uint32) uint64 {
	buffLen := uint64(len(buff))
	m := uint64(0xc6a4a7935bd1e995)
	r := uint32(47)
	h := uint64(seed) ^ (buffLen * m)

	for i := uint64(0); i < buffLen-(buffLen&7); {
		var k uint64
		bBuffer := bytes.NewBuffer(buff[i : i+8])
		binary.Read(bBuffer, binary.LittleEndian, &k)

		k *= m
		k ^= k >> r
		k *= m
		h ^= k
		h *= m

		binary.Write(bBuffer, binary.LittleEndian, &k)
		i += 8
	}
	switch buffLen & 7 {
	case 7:
		h ^= uint64(buff[6]) << 48
		fallthrough
	case 6:
		h ^= uint64(buff[5]) << 40
		fallthrough
	case 5:
		h ^= uint64(buff[4]) << 32
		fallthrough
	case 4:
		h ^= uint64(buff[3]) << 24
		fallthrough
	case 3:
		h ^= uint64(buff[2]) << 16
		fallthrough
	case 2:
		h ^= uint64(buff[1]) << 8
		fallthrough
	case 1:
		h ^= uint64(buff[0])
		fallthrough
	default:
		h *= m
	}

	h ^= h >> r
	h *= m
	h ^= h >> r
	return h
}

func hllPartLen(buff []byte) (index uint64, count int) {
	hash := Murmurhash(buff, seed)
	index = hash & uint64(hll_p_mask) //這裏就是取出後14個bit,做爲index
	hash >>= hll_p                    //右移把後面14個bit清理掉,注意這裏的bit流實際上是倒序的
	hash |= uint64(1) << hll_q        //當前的最高位設置1,實際上是一個哨兵,避免count爲0
	bit := uint64(1)
	count = 1
	for (hash & bit) == 0 {
		count++
		bit <<= 1
	}
	fmt.Printf("pf hash idx=%d, count=%d\n", index, count)

	return index, count
}

//func hllSparseSet(o, index int64, count int64) {
//	if count > hll_sparse_val_max_value {
//		goto promote
//	}
//
//promote:
//}

複製代碼

測試代碼:

func TestAll(t *testing.T) {
	hllObj := CreateHLLObject()
	test1 := []string{"apple", "apple", "orange", "ttt", "aaa"}

	for _, str := range test1 {
		PfAddCommand(hllObj, []byte(str))
	}

	println(PfCountCommand(hllObj))
}
複製代碼
相關文章
相關標籤/搜索