Redis 數據結構-字符串源碼分析

相關文章

Redis 初探-安裝與使用javascript

本文將從如下幾個部分進行介紹

1.前言java

2.經常使用命令數據庫

3.字符串結構數組

4.字符串實現安全

5.命令是若是操做字符串的bash

前言

平時在使用 Redis 的時候,只會使用簡單的 set,get,並不明白其中的道理,爲了探個究竟,搞個明白,就看了下其底層的實現,本人的C言語水平只停留在大學上課堂上,因此看起來仍是有點吃力,好在一些關鍵流程,數據結構仍是看得懂 ^ ^。數據結構

Redis 的字符串是 Redis 中最基本的一種數據結構,全部的 key 都用字符串表示,且它是二進制安全的;它在內部使用一種稱爲動態字符串的結構來表示,能夠動態的進行擴展,能夠在 O(1) 的時間內獲取字符串的長度等,此外,一個字符串的長度最多不能超過 512M。app

經常使用命令

字符串的一些經常使用命令以下:curl

set key value - 設置值函數

get key -獲取值

append key value - 追加值

decr key - 原子減1

incr key - 原子加1

.......

動態字符串(SDS)結構定義

在解析動態字符串以前,先來看看 Redis 中 Object 的定義,源碼在 object.c 中,在該Object的中,定義了建立對象的一些方法,如建立字符串,建立list,建立set等,以外,還指定了對象的編碼方法;接下來看下和字符串相關的方法:

指定對象的編碼方式:

# object.c

char *strEncoding(int encoding) {
    switch(encoding) {
    case OBJ_ENCODING_RAW: return "raw";
    case OBJ_ENCODING_INT: return "int";
    case OBJ_ENCODING_HT: return "hashtable";
    case OBJ_ENCODING_QUICKLIST: return "quicklist";
    case OBJ_ENCODING_ZIPLIST: return "ziplist";
    case OBJ_ENCODING_INTSET: return "intset";
    case OBJ_ENCODING_SKIPLIST: return "skiplist";
    case OBJ_ENCODING_EMBSTR: return "embstr";
    default: return "unknown";
    }
}

字符串存儲主要的編碼方式主要有兩種 : embstr raw

這兩種方式有什麼區別呢?在什麼狀況下使用 embstr,在什麼狀況下使用 raw 呢?

當存儲的字符串很短的時候,會使用 embstr 進入編碼,當存儲的字符串超過 44 個字符的時候,會使用 raw 進行編碼;可使用 debug object key 來查看編碼方式,看如下的實驗:

embstr 編碼的存儲方式爲 將 RedisObject 對象頭和 SDS 對象連續存在一塊兒,使用 malloc 方法一次分配內存,而 raw 它須要兩次 malloc 分配內存,兩個對象頭在內存地址上通常是不連續的,它們的結構以下所示:

在 object.c 源碼中看下兩種字符串的建立方式:

# object.c

robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o)); // 只是申請對象頭的空間,會把指針指向 SDS
    o->type = type;
    o->encoding = OBJ_ENCODING_RAW; // 編碼格式爲 raw,爲默認的編碼方式
    o->ptr = ptr; //  指針指向 SDS
    o->refcount = 1;

    // 可忽略
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        o->lru = LRU_CLOCK();
    }
    return o;
}

robj *createEmbeddedStringObject(const char *ptr, size_t len) {
    // 把對象頭和SDS 連續存在一塊兒
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
    struct sdshdr8 *sh = (void*)(o+1);

    o->type = OBJ_STRING;
    o->encoding = OBJ_ENCODING_EMBSTR; // 爲 embstr 編碼方式
    o->ptr = sh+1;  
    o->refcount = 1;
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        o->lru = LRU_CLOCK();
    }

    sh->len = len;
    sh->alloc = len;
    sh->flags = SDS_TYPE_8;
    if (ptr == SDS_NOINIT)
        sh->buf[len] = '\0';
    else if (ptr) {
        memcpy(sh->buf,ptr,len);
        sh->buf[len] = '\0';
    } else {
        memset(sh->buf,0,len+1);
    }
    return o;
}

SDS 定義

接下來看下動態字符串(SDS)的結構定義,該定義是在 sds.h 文件中,

// 
typedef char *sds;

// 該結構體再也不使用
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

SDS 結構定義了五種不一樣 header 類型,不一樣字符串的長度使用不一樣的 header 類型,從而能夠節省內存。

每種 header 類型包含如下幾個屬性:

1. len : 字符串的長度

2. alloc : 表示字符串的最大容量,不包含 header 和空的終止符

3. flags : header的類型

4. buf: 存放字符串的數組

這幾種類型使用以下的宏表示:

#define SDS_TYPE_5  0
#define SDS_TYPE_8  1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4

在該定義文件中,還定義了一些方法,以下:

獲取 sds 的長度,其中, SDS_HDR 表示獲取 header 指針。

static inline size_t sdslen(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return SDS_TYPE_5_LEN(flags);
        case SDS_TYPE_8:
            return SDS_HDR(8,s)->len;
        case SDS_TYPE_16:
            return SDS_HDR(16,s)->len; 
        case SDS_TYPE_32:
            return SDS_HDR(32,s)->len;
        case SDS_TYPE_64:
            return SDS_HDR(64,s)->len;
    }
    return 0;
}

獲取存儲字符串的數組中還剩多少容量,用最大容量 - 字符串的長度

static inline size_t sdsavail(const sds s) {
    unsigned char flags = s[-1];
    switch(flags&SDS_TYPE_MASK) {
        case SDS_TYPE_5: {
            return 0;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_16: {
            SDS_HDR_VAR(16,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_32: {
            SDS_HDR_VAR(32,s);
            return sh->alloc - sh->len;
        }
        case SDS_TYPE_64: {
            SDS_HDR_VAR(64,s);
            return sh->alloc - sh->len;
        }
    }
    return 0;
}

 

接下來,看一下字符串的一些經常使用的方法,這些方法主要是在 sds.c 文件中:

1. 根據字符串的大小來獲取請求的 header 類型

static inline char sdsReqType(size_t string_size) {
    if (string_size < 1<<5) // 32
        return SDS_TYPE_5;
    if (string_size < 1<<8) // 256
        return SDS_TYPE_8;
    if (string_size < 1<<16) // 65536
        return SDS_TYPE_16;
#if (LONG_MAX == LLONG_MAX)
    if (string_size < 1ll<<32)
        return SDS_TYPE_32;
    return SDS_TYPE_64;
#else
    return SDS_TYPE_32;
#endif
}

2. 根據類型獲取結構體大小

static inline int sdsHdrSize(char type) {
    switch(type&SDS_TYPE_MASK) {
        case SDS_TYPE_5:
            return sizeof(struct sdshdr5);
        case SDS_TYPE_8:
            return sizeof(struct sdshdr8);
        case SDS_TYPE_16:
            return sizeof(struct sdshdr16);
        case SDS_TYPE_32:
            return sizeof(struct sdshdr32);
        case SDS_TYPE_64:
            return sizeof(struct sdshdr64);
    }
    return 0;
}

3. 建立SDS

sds sdsnewlen(const void *init, size_t initlen) {
    void *sh;
    sds s;
    // 根據字符串長度獲取請求類型
    char type = sdsReqType(initlen);    
    if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
    // 根據類型獲取結構體大小
    int hdrlen = sdsHdrSize(type);
    unsigned char *fp; /* flags pointer. */
    // 申請內存,+1 是爲了處理最後一位 \0 的狀況
    sh = s_malloc(hdrlen+initlen+1);
    if (init==SDS_NOINIT)
        init = NULL;
    else if (!init)
        memset(sh, 0, hdrlen+initlen+1); //若是建立的sds爲空,則以0填充
    if (sh == NULL) return NULL;
    s = (char*)sh+hdrlen; // 指向 buf
    fp = ((unsigned char*)s)-1;
    switch(type) { //根據type不一樣對sdshdr結構體進行賦值,len和alloc設置爲initlen
        case SDS_TYPE_5: {
            *fp = type | (initlen << SDS_TYPE_BITS);
            break;
        }
        case SDS_TYPE_8: {
            SDS_HDR_VAR(8,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_16: {
            SDS_HDR_VAR(16,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_32: {
            SDS_HDR_VAR(32,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
        case SDS_TYPE_64: {
            SDS_HDR_VAR(64,s);
            sh->len = initlen;
            sh->alloc = initlen;
            *fp = type;
            break;
        }
    }
    if (initlen && init)
        memcpy(s, init, initlen); // copy 數據
    s[initlen] = '\0';
    return s;
}

4. 對SDS的空間進行擴容,當進行字符串追加的時候,須要判斷剩餘容量是否足夠

sds sdsMakeRoomFor(sds s, size_t addlen) {
    void *sh, *newsh;
    // 剩餘容量
    size_t avail = sdsavail(s);
    size_t len, newlen;
    char type, oldtype = s[-1] & SDS_TYPE_MASK;
    int hdrlen;

    // 若是當前容量足夠,則不須要擴容
    if (avail >= addlen) return s;
    // 當前字符串的長度
    len = sdslen(s);

    sh = (char*)s-sdsHdrSize(oldtype);
    // 擴容後的長度
    newlen = (len+addlen);
    // 若是擴容後的容量小於 1M 則擴大2倍,
    if (newlen < SDS_MAX_PREALLOC) // #define SDS_MAX_PREALLOC (1024*1024)
        newlen *= 2;
    else
        newlen += SDS_MAX_PREALLOC; // 不然等於當前容量 + 1M

    // 獲取新容量的結構體類型
    type = sdsReqType(newlen);

    if (type == SDS_TYPE_5) type = SDS_TYPE_8;
    // 根據類型獲取結構體大小
    hdrlen = sdsHdrSize(type);
    // 若是擴容後類型相等則,直接使用s_realloc擴容,內容不變
    if (oldtype==type) {
        newsh = s_realloc(sh, hdrlen+newlen+1);
        if (newsh == NULL) return NULL;
        s = (char*)newsh+hdrlen;
    } else {
        // 若是類型已經改變,就須要使用 s_malloc 申請內存,使用 memcpy 填充數據
        newsh = s_malloc(hdrlen+newlen+1);
        if (newsh == NULL) return NULL;
        memcpy((char*)newsh+hdrlen, s, len+1);
        s_free(sh); // 釋放舊內存
        s = (char*)newsh+hdrlen;
        s[-1] = type; // 設定新的類型
        sdssetlen(s, len);
    }
    sdssetalloc(s, newlen);// 更新容量
    return s;
}

5. 字符串鏈接函數

sds sdscat(sds s, const char *t) {
    return sdscatlen(s, t, strlen(t));
}

sds sdscatlen(sds s, const void *t, size_t len) {
    當前字符串的長度
    size_t curlen = sdslen(s);
    對容量進行擴容
    s = sdsMakeRoomFor(s,len);
    if (s == NULL) return NULL;
    鏈接字符串
    memcpy(s+curlen, t, len);
    // 設置鏈接後的字符串長度
    sdssetlen(s, curlen+len);
    s[curlen+len] = '\0';
    return s;
}

 

SDS 的實現

接下來看下動態字符串(SDS)的實現類,命令的操做都是調用實現類的方法,如set, get 等,SDS 的實現類在 t.string.c 文件中:

1. 字符串的長度檢查

static int checkStringLength(client *c, long long size) {
    if (size > 512*1024*1024) { // 512M
        addReplyError(c,"string exceeds maximum allowed size (512MB)");
        return C_ERR;
    }
    return C_OK;
}

從以上方法能夠看到,若是字符串的大小超過 512M,則會拋出異常

當咱們調用 SET, SETEX, PSETEX, SETNX 命令的話,會怎麼操做的呢?

set 操做

以 SET 爲例,看下是如何操做的,分爲三步:

(1). 解析命令格式

(2). 設置 value 的編碼方式

(3). 入庫

1. 解析命令格式

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
// 解析命令格式
void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;
    // 獲取各個各個參數的值
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    // argv[1] 表示要存儲的鍵,即key
    // argv[2] 表示要存儲的值,即value

    // 對 vaue 設置編碼方式
    c->argv[2] = tryObjectEncoding(c->argv[2]);

    // 入庫操做
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

2. 設置 value 的編碼方式

robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;

    // 斷言必須爲 String 類型,不然沒必要進行編碼
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

   //只對raw或embstr 編碼的對象嘗試一些專門的編碼,若是不是 raw或embstr ,直接返回
    if (!sdsEncodedObject(o)) return o;

     // 若是該對象正在被使用,即引用次數大於1,則不會進行編碼,不安全
     if (o->refcount > 1) return o;

    len = sdslen(s);
    // 判斷是否是long類型,大於20個字符表示的纔是 String 
    // 若是是long,且String轉換爲long成功,轉爲 int 類型
    if (len <= 20 && string2l(s,len,&value)) {
        if ((server.maxmemory == 0 ||
            !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr);
            o->encoding = OBJ_ENCODING_INT; // int 類型
            o->ptr = (void*) value;
            return o;
        }
    }

    // 若是字符串的長度小於限制,即44個字符,且它是 raw 編碼的話,轉換爲 embstr 編碼
    // #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s)); // 使用 embstr 方式建立字符串
        decrRefCount(o);
        return emb;
    }
    // 若是字符串是 raw 編碼,且剩餘可用空間 > len/10, 則會進行內存空間回收
    if (o->encoding == OBJ_ENCODING_RAW &&
        sdsavail(s) > len/10)
    {
        o->ptr = sdsRemoveFreeSpace(o->ptr);
    }
    return o;
}

3. 入庫

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    // 入庫
    setKey(c->db,key,val);
    server.dirty++;
    // 處理過時時間
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    // 返回
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

get 操做

void getCommand(client *c) {
    getGenericCommand(c);
}

int getGenericCommand(client *c) {
    robj *o;
    // 從數據庫中查找
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
    // 檢查類型
    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

append 操做

void appendCommand(client *c) {
    size_t totlen;
    robj *o, *append;

    // 從數據庫中查詢對應的 key
    o = lookupKeyWrite(c->db,c->argv[1]);
    // 若是數據庫中沒有這個 key ,則會建立一個 key
    if (o == NULL) {
        // 對 value 設置編碼
        c->argv[2] = tryObjectEncoding(c->argv[2]);
        // 入庫
        dbAdd(c->db,c->argv[1],c->argv[2]);
        incrRefCount(c->argv[2]);
        // 設置總長度
        totlen = stringObjectLen(c->argv[2]);
    } else {
        // 若是在數據庫中 key 已存在
        // 檢查類型
        if (checkType(c,o,OBJ_STRING))
            return;

        // 要添加的值
        append = c->argv[2];
        totlen = stringObjectLen(o)+sdslen(append->ptr);
        // 檢查總長度是否超過 512M
        if (checkStringLength(c,totlen) != C_OK)
            return;

        // 追加值
        o = dbUnshareStringValue(c->db,c->argv[1],o);
        o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
        totlen = sdslen(o->ptr);
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
    server.dirty++;
    addReplyLongLong(c,totlen);
}

以上就是 Redis 的字符串的方式,會根據不一樣字符串的長度來選擇不一樣的編碼方式以達到節約內存的效果。

相關文章
相關標籤/搜索