【Redis學習筆記】bitcount分析

順風車運營研發團隊 熊浩含
1、命令簡介
BITCOUNT key [start] [end]html

redis計算給定字符串中,被設置爲 1 的比特位的數量。redis

redis> BITCOUNT bits
(integer) 0
redis> SETBIT bits 0 1          # 0001
(integer) 0
redis> BITCOUNT bits
(integer) 1
redis> SETBIT bits 3 1          # 1001
(integer) 0
redis> BITCOUNT bits
(integer) 2

2、算法思路
redis執行這一命令的過程,核心是求二進制數中「1」的個數。但不一樣於處理通常數據,redis中支持計算最多512M數據中被設置爲 1 的比特位的數。因此問題不妨轉化爲:算法

如何計算0.5個G數據中,被設置爲 1 的比特位的數量?數組

相關的算法有不少,redis在處理過程當中,綜合了二種不一樣的方法,先單獨介紹:函數

查表法
此處入參的大小是4字節(unsigned int)ui

int BitCount(unsigned int n)
{
    unsigned int table[256] =
    {
        0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
    };
 
    return table[n &0xff] +
        table[(n >>8) &0xff] +
        table[(n >>16) &0xff] +
        table[(n >>24) &0xff] ;
}

思路:this

一、建立一大小爲256的數組,相應位置上存放對應2進制數的「1」的個數;spa

二、將入參按8bit分開,查4次表,並將4次結果結果相加。.net

以2882400018(二進制:10101011110011011110111100010010)爲例,四次查表過程以下:紅色表示當前8bit,綠色表示右移後高位補零。code

clipboard.png

相加可得2+7+5+5=19。

clipboard.png

variable-precision SWAR算法
統計一個位數組中非0位的數量,數學上稱做:」Hanmming Weight「(漢明重量)。目前效率最高的是variable-precision SWAR算法,能夠在常數時間內計算出多個字節的非0數目。

先觀察如下幾個數,以後這幾個數將做爲掩碼參與計算。

clipboard.png

int swar(uint32_t i)
{
    //計算每兩位二進制數中1的個數
    i = ( i & 0x55555555) + ((i >> 1) & 0x55555555);
    //計算每四位二進制數中1的個數
    i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
    //計算每八位二進制數中1的個數
    i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F);
    //將每八位二進制數中1的個數和相加,並移至最低位八位
    i = (i * 0x01010101) >> 24);
    return i;
}

下面以(0010 1011 0100 1010 0001 1111 1000 0111)爲例逐步說明:

1)首先計算每兩位二進制數中1的個數,( i & 0x55555555)篩出了每兩位二進制數中奇數位的「1」,並把「1」置於低位;((i >> 1) & 0x55555555)篩出了每兩位二進制數中偶數位,一樣把「1」置於低位;相加後的值,只多是0,1,2,表明了這兩位上「1」的個數;

clipboard.png

2)對上一步的結果做「歸併」處理,計算每四位上「1」的個數,此時i的一個4bit,存放着兩個2bit的「1」的個數和。(i & 0x33333333)篩出了奇數序列上的4bit,((i >> 2) & 0x33333333)篩出了偶數序列上的2bit;相加後的值,表明了這4bit上「1」的個數;

clipboard.png

3)繼續對上一步結果做「歸併處理」,計算每八位上「1」的個數,此時i的一個8bit,存放着兩個4bit的「1」的個數和。(i &0x0F0F0F0F)篩出了奇數序列上的4bit,((i >> 2) & 0x0F0F0F0F)篩出了偶數序列上的4bit;相加後的值,表明了這8bit上「1」的個數;

clipboard.png

4)此時對於32bit的二進制數據,咱們已經按8bit*4分好了組,每8bit存放着的是該組「1」的個數,如今把這四組數加起來便可,即實現

00000100+00000101+00000011+00000100。

體如今乘法上,便是(i * 0x01010101)>>24,等於0000....000000010000=16。

3、redis實現

void bitcountCommand(client *c) {
    robj *o;
    long start, end, strlen;
    unsigned char *p;
    char llbuf[LONG_STR_SIZE];
 
    /* Lookup, check for type, and return 0 for non existing keys. */
    /*檢查key是否存在,若是不存在,則返回0*/
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_STRING)) return;
    p = getObjectReadOnlyString(o,&strlen,llbuf);
 
    /* 檢查參數是否有誤 */
    if (c->argc == 4) {
        if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
            return;
        if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
            return;
        /* Convert negative indexes */
        if (start < 0 && end < 0 && start > end) {
            addReply(c,shared.czero);
            return;
        }
        if (start < 0) start = strlen+start;
        if (end < 0) end = strlen+end;
        if (start < 0) start = 0;
        if (end < 0) end = 0;
        if (end >= strlen) end = strlen-1;
    } else if (c->argc == 2) {
        /* The whole string. */
        start = 0;
        end = strlen-1;
    } else {
        /* Syntax error. */
        addReply(c,shared.syntaxerr);
        return;
    }
 
    /* Precondition: end >= 0 && end < strlen, so the only condition where
     * zero can be returned is: start > end. */
    if (start > end) {
        addReply(c,shared.czero);
    } else {
        long bytes = end-start+1;
 
        addReplyLongLong(c,redisPopcount(p+start,bytes));
    }
}
* Count number of bits set in the binary array pointed by 's' and long
 * 'count' bytes. The implementation of this function is required to
 * work with a input string length up to 512 MB. */
size_t redisPopcount(void *s, long count) {
    size_t bits = 0;
    unsigned char *p = s;
    uint32_t *p4;
    /*爲查表法預先準備好的表*/
    static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};
 
    /* Count initial bytes not aligned to 32 bit. */
    /*四字節對齊,不是32整數倍的用查表處理。方便接下來按每次28字節處理*/
    while((unsigned long)p & 3 && count) {
        bits += bitsinbyte[*p++];//一次仍是處理一字節
        count--;
    }
 
    /* Count bits 28 bytes at a time */
    p4 = (uint32_t*)p;//32bit 4字節
    /*開始用variable-precision SWAR算法計算「1」的個數,每次算28字節*/
    while(count>=28) {
        uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;
 
        aux1 = *p4++;
        aux2 = *p4++;
        aux3 = *p4++;
        aux4 = *p4++;
        aux5 = *p4++;
        aux6 = *p4++;
        aux7 = *p4++;
        count -= 28;
 
        aux1 = aux1 - ((aux1 >> 1) & 0x55555555);//步驟一
        aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);/步驟二
        aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
        aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
        aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
        aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
        aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
        aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
        aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
        aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
        aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
        aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
        aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
        aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
        bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
                    ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
                    ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
                    ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
                    ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
                    ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
                    ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;//步驟三及步驟四
    }
    /* Count the remaining bytes. */
    /*用查表法收尾剩餘幾個字節中「1」的個數*/
    p = (unsigned char*)p4;
    while(count--) bits += bitsinbyte[*p++];
    return bits;
}

自問自答
Q1:爲何要4字節對齊?

A1:由於接下來處理時,p4是按4字節處理的,一次處理4*7=28字節的內容。若是這裏不是4字節,而是8字節,則前面也須要改爲8字節對齊,保持一致。

Q2:爲何一次批量處理28字節,處理16字節行不行,處理48字節行不行?

A2:其實能夠,在redis3.0中,一次就只處理了16字節,只須要保證每次處理的大小是32bit(一字節)的倍數就能夠。

Q3:函數限制了二進制串的大小是512M,是在哪限制的?

A3:這跟bitcount無關,是在setbit時限制的。

/* This helper function used by GETBIT / SETBIT parses the bit offset argument
 * making sure an error is returned if it is negative or if it overflows
 * Redis 512 MB limit for the string value.
 *
 * If the 'hash' argument is true, and 'bits is positive, then the command
 * will also parse bit offsets prefixed by "#". In such a case the offset
 * is multiplied by 'bits'. This is useful for the BITFIELD command. */
int getBitOffsetFromArgument(client *c, robj *o, size_t *offset, int hash, int bits) {
    long long loffset;
    char *err = "bit offset is not an integer or out of range";
    char *p = o->ptr;
    size_t plen = sdslen(p);
    int usehash = 0;
 
    /* Handle #<offset> form. */
    if (p[0] == '#' && hash && bits > 0) usehash = 1;
 
    if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
        addReplyError(c,err);
        return C_ERR;
    }
 
    /* Adjust the offset by 'bits' for #<offset> form. */
    if (usehash) loffset *= bits;
 
    /* Limit offset to 512MB in bytes */
    if ((loffset < 0) || ((unsigned long long)loffset >> 3) >= (512*1024*1024))
    {
        addReplyError(c,err);
        return C_ERR;
    }
 
    *offset = (size_t)loffset;
    return C_OK;
}

4、參考資料
1.https://blog.csdn.net/u010320...

2.http://www.cnblogs.com/graphi...

3.https://blog.csdn.net/qq_3478...

相關文章
相關標籤/搜索