Hitch (順風車) Operations R&D Team, Xiong Haohan
1. Command overview
BITCOUNT key [start] [end]
Counts the number of bits set to 1 in the given string.
redis> BITCOUNT bits
(integer) 0
redis> SETBIT bits 0 1    # 0001
(integer) 0
redis> BITCOUNT bits
(integer) 1
redis> SETBIT bits 3 1    # 1001
(integer) 0
redis> BITCOUNT bits
(integer) 2
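2. Algorithm
At its core, Redis executes this command by counting the 1s in a binary number. Unlike the usual case of a single machine word, however, Redis supports counting the set bits in up to 512 MB of data. So the problem can be restated as:
How do we count the bits set to 1 in 0.5 GB of data?
There are many algorithms for this. Redis combines two different ones, which are introduced separately first:
Table lookup
The input here is 4 bytes wide (unsigned int).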
int BitCount(unsigned int n)
{
    /* table[i] holds the number of 1 bits in the 8-bit value i. */
    static const unsigned char table[256] = {
        0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
        4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
    };

    /* One lookup per byte, from the lowest byte to the highest. */
    return table[n & 0xff] +
           table[(n >> 8) & 0xff] +
           table[(n >> 16) & 0xff] +
           table[(n >> 24) & 0xff];
}
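Idea:
1. Build an array of 256 entries, where the entry at each index stores the number of 1 bits in the binary form of that index;
2. Split the input into 8-bit chunks, look each one up in the table (four lookups), and add the four results.
Take 2882400018 (binary: 10101011110011011110111100010010, i.e. 0xABCDEF12) as an example. The four lookups proceed from the lowest byte to the highest (in the original figure, red marks the 8 bits currently being looked up and green marks the zeros shifted in at the high end after the right shift):
table[00010010] = 2
table[11101111] = 7
table[11001101] = 5
table[10101011] = 5
Adding them gives 2+7+5+5=19.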
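The worked example above can be checked with a small sketch. The helper names (`build_table`, `bit_count_lookup`, `check_lookup_example`) are made up for illustration, and the table is generated at run time rather than written out, on the assumption that the recurrence popcount(i) = popcount(i / 2) + (i & 1) is easier to audit than 256 literals.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: fills table[i] with the number of 1 bits in i,
 * using popcount(i) = popcount(i >> 1) + (i & 1). */
static void build_table(unsigned char table[256]) {
    table[0] = 0;
    for (int i = 1; i < 256; i++)
        table[i] = (unsigned char)(table[i >> 1] + (i & 1));
}

/* Counts the 1 bits of a 32-bit value with four table lookups,
 * one per byte, lowest byte first. */
static int bit_count_lookup(const unsigned char table[256], uint32_t n) {
    return table[n & 0xff] + table[(n >> 8) & 0xff] +
           table[(n >> 16) & 0xff] + table[(n >> 24) & 0xff];
}

/* Reproduces the example from the text: 2882400018 == 0xABCDEF12. */
static void check_lookup_example(void) {
    unsigned char table[256];
    build_table(table);
    assert(table[0x12] == 2);   /* 00010010 */
    assert(table[0xEF] == 7);   /* 11101111 */
    assert(table[0xCD] == 5);   /* 11001101 */
    assert(table[0xAB] == 5);   /* 10101011 */
    assert(bit_count_lookup(table, 2882400018u) == 19);
}
```

Calling `check_lookup_example()` from a `main` function passes silently when the four lookups add up to 19.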
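The variable-precision SWAR algorithm
Counting the nonzero bits in a bit array is known in mathematics as the "Hamming weight". One of the most efficient software approaches is the variable-precision SWAR algorithm, which counts the set bits of a multi-byte word in a constant number of operations.
First look at the following numbers, which serve as masks in the computation:
0x55555555 = 01010101 01010101 01010101 01010101
0x33333333 = 00110011 00110011 00110011 00110011
0x0F0F0F0F = 00001111 00001111 00001111 00001111
0x01010101 = 00000001 00000001 00000001 00000001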
#include <stdint.h>

int swar(uint32_t i)
{
    /* Count the 1s in every 2-bit group. */
    i = (i & 0x55555555) + ((i >> 1) & 0x55555555);
    /* Count the 1s in every 4-bit group. */
    i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
    /* Count the 1s in every 8-bit group. */
    i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F);
    /* Add up the four per-byte counts and move the sum to the lowest 8 bits. */
    i = (i * 0x01010101) >> 24;
    return i;
}
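The steps are explained below using (0010 1011 0100 1010 0001 1111 1000 0111) as the example:
1) First count the 1s in every 2-bit group. (i & 0x55555555) picks out the bit in the odd position (the low bit) of each 2-bit group; ((i >> 1) & 0x55555555) picks out the bit in the even position (the high bit) and likewise places it in the low position. Their sum can only be 0, 1, or 2, and is the number of 1s in those two bits;
2) Merge the results of the previous step to get the number of 1s in every 4-bit group. At this point each 4-bit group of i holds two 2-bit counts. (i & 0x33333333) picks out the odd-numbered (lower) 2-bit counts and ((i >> 2) & 0x33333333) picks out the even-numbered (upper) 2-bit counts; their sum is the number of 1s in those 4 bits;
3) Merge again to get the number of 1s in every 8-bit group. At this point each 8-bit group of i holds two 4-bit counts. (i & 0x0F0F0F0F) picks out the odd-numbered (lower) 4-bit counts and ((i >> 4) & 0x0F0F0F0F) picks out the even-numbered (upper) 4-bit counts; their sum is the number of 1s in those 8 bits;
4) The 32-bit value is now split into four 8-bit groups, each holding the number of 1s in that group. All that remains is to add the four groups together (listed here from the lowest byte to the highest), that is, to compute
00000100+00000101+00000011+00000100.
Expressed as a multiplication, this is (i * 0x01010101) >> 24, which equals 0000....000000010000=16.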
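As a hedged illustration of steps 1) to 4), the sketch below splits the computation so that the intermediate per-byte counts can be inspected. It also spells out why the multiplication works: modulo 2^32, x * 0x01010101 equals x + (x << 8) + (x << 16) + (x << 24), so the top byte of the product collects the sum of all four bytes. The function names are made up for this example.

```c
#include <assert.h>
#include <stdint.h>

/* Steps 1-3: afterwards each byte of the result holds the number of
 * 1 bits in the corresponding byte of the input. */
static uint32_t swar_byte_counts(uint32_t i) {
    i = (i & 0x55555555u) + ((i >> 1) & 0x55555555u);
    i = (i & 0x33333333u) + ((i >> 2) & 0x33333333u);
    i = (i & 0x0F0F0F0Fu) + ((i >> 4) & 0x0F0F0F0Fu);
    return i;
}

/* Step 4 written as a multiplication. */
static uint32_t swar_total_mul(uint32_t i) {
    uint32_t c = swar_byte_counts(i);
    return (uint32_t)(c * 0x01010101u) >> 24;
}

/* Step 4 written as the equivalent shifts and adds. */
static uint32_t swar_total_shift(uint32_t i) {
    uint32_t c = swar_byte_counts(i);
    return (uint32_t)(c + (c << 8) + (c << 16) + (c << 24)) >> 24;
}

/* The example from the text: 0010 1011 0100 1010 0001 1111 1000 0111. */
static void check_swar_example(void) {
    uint32_t x = 0x2B4A1F87u;
    /* Per-byte counts, highest byte first: 4, 3, 5, 4. */
    assert(swar_byte_counts(x) == 0x04030504u);
    assert(swar_total_mul(x) == 16);
    assert(swar_total_shift(x) == 16);
}
```

The multiplication never overflows the top byte here, because a 32-bit word has at most 32 set bits.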
3. Redis implementation
void bitcountCommand(client *c) {
    robj *o;
    long start, end, strlen;
    unsigned char *p;
    char llbuf[LONG_STR_SIZE];

    /* Lookup, check for type, and return 0 for non existing keys. */
    /* Check whether the key exists; if it does not, reply with 0. */
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_STRING)) return;
    p = getObjectReadOnlyString(o,&strlen,llbuf);

    /* Validate the arguments. */
    if (c->argc == 4) {
        if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
            return;
        if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
            return;
        /* Convert negative indexes */
        if (start < 0 && end < 0 && start > end) {
            addReply(c,shared.czero);
            return;
        }
        if (start < 0) start = strlen+start;
        if (end < 0) end = strlen+end;
        if (start < 0) start = 0;
        if (end < 0) end = 0;
        if (end >= strlen) end = strlen-1;
    } else if (c->argc == 2) {
        /* The whole string. */
        start = 0;
        end = strlen-1;
    } else {
        /* Syntax error. */
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Precondition: end >= 0 && end < strlen, so the only condition where
     * zero can be returned is: start > end. */
    if (start > end) {
        addReply(c,shared.czero);
    } else {
        long bytes = end-start+1;

        addReplyLongLong(c,redisPopcount(p+start,bytes));
    }
}
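The index handling in bitcountCommand is easier to follow in isolation. The sketch below is a hypothetical stand-alone helper, `normalize_range`, which is not part of Redis; it copies only the start/end arithmetic shown above and reports how many bytes would be passed to redisPopcount.

```c
#include <assert.h>

/* Hypothetical helper mirroring the index handling in bitcountCommand.
 * Returns the number of bytes to count (0 for an empty range). When the
 * result is nonzero, *first receives the offset of the first byte. */
static long normalize_range(long start, long end, long len, long *first) {
    /* Two negative indexes in the wrong order: empty range. */
    if (start < 0 && end < 0 && start > end) return 0;
    /* Negative indexes count from the end of the string. */
    if (start < 0) start = len + start;
    if (end < 0) end = len + end;
    /* Clamp to the valid byte range. */
    if (start < 0) start = 0;
    if (end < 0) end = 0;
    if (end >= len) end = len - 1;
    if (start > end) return 0;
    *first = start;
    return end - start + 1;
}

/* A few cases for a 10-byte string. */
static void check_range_examples(void) {
    long first = -1;
    assert(normalize_range(-3, -1, 10, &first) == 3 && first == 7);
    assert(normalize_range(0, 100, 10, &first) == 10 && first == 0);
    assert(normalize_range(-1, -3, 10, &first) == 0);
    assert(normalize_range(5, 2, 10, &first) == 0);
}
```

For a 10-byte value, `BITCOUNT key -3 -1` would therefore cover bytes 7 through 9, and an end index past the string is clamped to the last byte.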
/* Count number of bits set in the binary array pointed by 's' and long
 * 'count' bytes. The implementation of this function is required to
 * work with a input string length up to 512 MB. */
size_t redisPopcount(void *s, long count) {
    size_t bits = 0;
    unsigned char *p = s;
    uint32_t *p4;
    /* Table prepared in advance for the table-lookup method. */
    static const unsigned char bitsinbyte[256] = {
        0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,
        1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
        1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
        2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
        1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,
        2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
        2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,
        3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8
    };

    /* Count initial bytes not aligned to 32 bit. */
    /* Align to 4 bytes: leading bytes that do not start on a 32-bit
     * boundary are counted by table lookup, so that the loop below can
     * process 28 bytes at a time. */
    while((unsigned long)p & 3 && count) {
        bits += bitsinbyte[*p++]; /* still one byte at a time */
        count--;
    }

    /* Count bits 28 bytes at a time */
    p4 = (uint32_t*)p; /* 32 bits = 4 bytes */
    /* Use the variable-precision SWAR algorithm to count the 1s,
     * 28 bytes per iteration. */
    while(count>=28) {
        uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;

        aux1 = *p4++;
        aux2 = *p4++;
        aux3 = *p4++;
        aux4 = *p4++;
        aux5 = *p4++;
        aux6 = *p4++;
        aux7 = *p4++;
        count -= 28;

        aux1 = aux1 - ((aux1 >> 1) & 0x55555555); /* step 1 */
        aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333); /* step 2 */
        aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
        aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
        aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
        aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
        aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
        aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
        aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
        aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
        aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
        aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
        aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
        aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
        /* steps 3 and 4 */
        bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
                  ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
                  ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
                  ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
                  ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
                  ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
                  ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;
    }
    /* Count the remaining bytes. */
    /* Finish the last few bytes with the table-lookup method. */
    p = (unsigned char*)p4;
    while(count--) bits += bitsinbyte[*p++];
    return bits;
}
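The three phases of redisPopcount (unaligned head, word-at-a-time body, byte-wise tail) can be sketched in a reduced form and compared against a naive bit loop. This is a simplification under stated assumptions, not the Redis code: it handles one 32-bit word per iteration instead of seven, it counts head and tail bytes with a plain loop instead of the bitsinbyte table, and it reads words with memcpy to sidestep aliasing concerns. All function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Reference implementation: test every bit of every byte. */
static size_t popcount_naive(const unsigned char *p, size_t n) {
    size_t bits = 0;
    for (size_t i = 0; i < n; i++)
        for (unsigned char b = p[i]; b; b >>= 1) bits += b & 1;
    return bits;
}

/* SWAR count for one 32-bit word. */
static uint32_t swar32(uint32_t i) {
    i = (i & 0x55555555u) + ((i >> 1) & 0x55555555u);
    i = (i & 0x33333333u) + ((i >> 2) & 0x33333333u);
    i = (i & 0x0F0F0F0Fu) + ((i >> 4) & 0x0F0F0F0Fu);
    return (uint32_t)(i * 0x01010101u) >> 24;
}

/* Head / body / tail structure, reduced to one word per iteration. */
static size_t popcount_three_phase(const unsigned char *p, size_t n) {
    size_t bits = 0;
    /* Phase 1: bytes before the first 4-byte-aligned address. */
    while (((uintptr_t)p & 3) && n) {
        bits += popcount_naive(p, 1);
        p++;
        n--;
    }
    /* Phase 2: whole 32-bit words. */
    while (n >= 4) {
        uint32_t w;
        memcpy(&w, p, sizeof w);
        bits += swar32(w);
        p += 4;
        n -= 4;
    }
    /* Phase 3: the remaining 0 to 3 bytes. */
    bits += popcount_naive(p, n);
    return bits;
}

/* Both implementations must agree for every starting alignment. */
static void check_three_phase(void) {
    unsigned char buf[64];
    for (int i = 0; i < 64; i++) buf[i] = (unsigned char)(i * 37 + 11);
    for (size_t off = 0; off < 4; off++)
        assert(popcount_three_phase(buf + off, 64 - off) ==
               popcount_naive(buf + off, 64 - off));
}
```

Starting the scan at `buf + 1`, `buf + 2`, and `buf + 3` exercises the head phase, which is the case the alignment loop in redisPopcount exists for.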
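Q&A
Q1: Why align to 4 bytes?
A1: Because the processing that follows reads the data through p4, a pointer to 4-byte words, handling 4*7=28 bytes per iteration. If the word size here were 8 bytes instead of 4, the alignment loop before it would also have to change to 8-byte alignment to stay consistent.
Q2: Why process 28 bytes per batch? Would 16 bytes work? Would 48 bytes work?
A2: Other sizes work in principle. In Redis 3.0, each iteration processed only 16 bytes. The batch size has to be a multiple of 32 bits (4 bytes), because the data is read one uint32_t at a time. The loop as written also imposes an upper bound: the per-byte counts of the whole batch are added together before the single multiplication by 0x01010101, and the final total must fit in one byte (at most 255). Seven words give at most 7*32=224, so 28 bytes is the largest batch that is safe in this form; a 48-byte batch would have to be accumulated in more than one group.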
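To make the batch-size question concrete, here is a small sketch of the batched form used in the SWAR loop, generalized to n words. The helper names are hypothetical. It shows that when the per-byte counts of all words are summed before one multiplication, the total must stay below 256: seven all-ones words give 224, while eight all-ones words (256 set bits) wrap around to 0.

```c
#include <assert.h>
#include <stdint.h>

/* Steps 1 and 2 in the subtractive form used by redisPopcount:
 * afterwards each 4-bit group holds the count of 1s in that group. */
static uint32_t nibble_counts(uint32_t w) {
    w = w - ((w >> 1) & 0x55555555u);
    w = (w & 0x33333333u) + ((w >> 2) & 0x33333333u);
    return w;
}

/* Sums the per-byte counts of n words, then applies a single
 * multiplication, as the 28-byte loop does for n == 7. */
static uint32_t batch_popcount(const uint32_t *words, int n) {
    uint32_t sum = 0;
    for (int k = 0; k < n; k++) {
        uint32_t w = nibble_counts(words[k]);
        sum += (w + (w >> 4)) & 0x0F0F0F0Fu;   /* step 3 */
    }
    return (uint32_t)(sum * 0x01010101u) >> 24; /* step 4 */
}

static void check_batch_limit(void) {
    uint32_t ones[8];
    for (int k = 0; k < 8; k++) ones[k] = 0xFFFFFFFFu;
    /* 7 words: 224 set bits, still fits in the top byte. */
    assert(batch_popcount(ones, 7) == 224);
    /* 8 words: 256 set bits, the top byte wraps around to 0. */
    assert(batch_popcount(ones, 8) == 0);
}
```

A larger batch can still be counted correctly by splitting it into groups of at most seven words and adding the group totals.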
Q3: The function limits the binary string to 512 MB. Where is that limit enforced?
A3: Not in BITCOUNT itself. The limit is enforced when SETBIT is executed.
/* This helper function used by GETBIT / SETBIT parses the bit offset argument
 * making sure an error is returned if it is negative or if it overflows
 * Redis 512 MB limit for the string value.
 *
 * If the 'hash' argument is true, and 'bits is positive, then the command
 * will also parse bit offsets prefixed by "#". In such a case the offset
 * is multiplied by 'bits'. This is useful for the BITFIELD command. */
int getBitOffsetFromArgument(client *c, robj *o, size_t *offset, int hash, int bits) {
    long long loffset;
    char *err = "bit offset is not an integer or out of range";
    char *p = o->ptr;
    size_t plen = sdslen(p);
    int usehash = 0;

    /* Handle #<offset> form. */
    if (p[0] == '#' && hash && bits > 0) usehash = 1;

    if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
        addReplyError(c,err);
        return C_ERR;
    }

    /* Adjust the offset by 'bits' for #<offset> form. */
    if (usehash) loffset *= bits;

    /* Limit offset to 512MB in bytes */
    if ((loffset < 0) || ((unsigned long long)loffset >> 3) >= (512*1024*1024))
    {
        addReplyError(c,err);
        return C_ERR;
    }

    *offset = (size_t)loffset;
    return C_OK;
}
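The limit check at the end of getBitOffsetFromArgument can be tried on its own. The sketch below isolates that one condition in a hypothetical helper, `bit_offset_ok`, which does not exist in Redis: the bit offset is shifted right by 3 to turn it into a byte index, and that index must be below 512*1024*1024. The largest accepted bit offset is therefore 2^32 - 1.

```c
#include <assert.h>

/* Hypothetical helper: returns 1 if the bit offset passes the same
 * range test as getBitOffsetFromArgument, 0 otherwise. */
static int bit_offset_ok(long long loffset) {
    if (loffset < 0) return 0;
    /* Bit offset >> 3 is the index of the byte that holds the bit. */
    return ((unsigned long long)loffset >> 3) < (512ULL * 1024 * 1024);
}

static void check_offset_limit(void) {
    assert(bit_offset_ok(0));
    assert(bit_offset_ok(4294967295LL));   /* last bit of byte 512 MB - 1 */
    assert(!bit_offset_ok(4294967296LL));  /* first bit past 512 MB */
    assert(!bit_offset_ok(-1));
}
```

Because SETBIT can never address a byte at or beyond 512 MB, the string that BITCOUNT later scans cannot grow past that size through this path.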
4. References
1.https://blog.csdn.net/u010320...