How to Fix Redis Master-Replica Data Inconsistency

The production problem

We recently carried out a large-scale Redis migration and upgrade, transferring and synchronizing data by emulating the replication protocol.

Along the way we ran into two problems:

  1. Key expiration times differ before and after migration.
  2. The number of keys differs before and after migration.

Expiration times differ before and after migration

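For the first problem, testing and reading the Redis source led us to the following conclusion:
Community Redis shows the same expiration mismatch under ordinary master-replica replication. The main cause is that if the master receives an EXPIRE command while a full sync is in progress, the replica executes that command only after the sync has finished. A full sync takes time, and the larger the dataset, the larger the gap between the two expiration times.
The core of the EXPIRE implementation is as follows: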

expireGenericCommand(c,mstime(),UNIT_SECONDS);

void expireGenericCommand(redisClient *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
        return;
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;

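EXPIRE key 600 is stored as an absolute timestamp: the executing node's current time in milliseconds plus 600 * 1000, in other words (current Unix timestamp in seconds + 600) * 1000. Redis keeps this computed value. So in the case described above, by the time the command reaches the replica, the current timestamp is no longer the one the master used. This is most visible for EXPIRE commands issued while a full sync is running: the delay is roughly the time the full sync takes, and the expiration times end up inconsistent.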
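To make the drift concrete, here is a minimal sketch that repeats the arithmetic from expireGenericCommand outside of Redis. The function names and the numeric values of the UNIT_* macros are assumptions made for illustration; only the "multiply by 1000, then add the local clock" step comes from the source above.

```c
#include <assert.h>

/* Assumed values for this sketch only. */
#define UNIT_SECONDS 0
#define UNIT_MILLISECONDS 1

/* Same arithmetic as expireGenericCommand(): a relative TTL becomes an
 * absolute deadline based on the clock of whichever node runs the command. */
long long absolute_deadline_ms(long long basetime_ms, long long ttl, int unit) {
    assert(unit == UNIT_SECONDS || unit == UNIT_MILLISECONDS);
    long long when = ttl;
    if (unit == UNIT_SECONDS) when *= 1000;
    return when + basetime_ms;
}

/* Hypothetical helper: difference between the replica's deadline and the
 * master's when the same relative command runs replica_delay_ms later. */
long long expire_drift_ms(long long master_now_ms, long long replica_delay_ms,
                          long long ttl, int unit) {
    long long on_master = absolute_deadline_ms(master_now_ms, ttl, unit);
    long long on_replica =
        absolute_deadline_ms(master_now_ms + replica_delay_ms, ttl, unit);
    return on_replica - on_master;
}
```

Whatever the TTL is, the drift equals the replay delay, which is why a full sync lasting 30 seconds shifts every EXPIRE issued during it by about 30 seconds on the replica.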

This is in fact a known issue upstream, and there are two ways to address it:

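1. Have the application use EXPIREAT with an absolute timestamp, so the command is unaffected by when it reaches the replica.
2. Convert EXPIRE into EXPIREAT inside the Redis code.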
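The first option can be sketched on the client side as follows. This is only an illustration: format_expireat is a hypothetical helper, not part of any Redis client library, and it assumes a key without spaces so the command can be written in inline form.

```c
#include <assert.h>
#include <stdio.h>

/* Builds "EXPIREAT <key> <unix-seconds>" from a relative TTL. The caller
 * passes now_s (for example from time(NULL)), so the deadline is fixed once
 * on the client and is identical on every node that replays the command.
 * Returns the number of characters written, or -1 if buf is too small. */
int format_expireat(char *buf, size_t buflen, const char *key,
                    long long now_s, long long ttl_s) {
    assert(buf != NULL && key != NULL);
    int n = snprintf(buf, buflen, "EXPIREAT %s %lld", key, now_s + ttl_s);
    if (n < 0 || (size_t)n >= buflen) return -1;
    return n;
}
```

Because the deadline is computed before the command is sent, a replication delay no longer changes the value the replica stores.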

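Upstream did not take the second route; instead it offers EXPIREAT and leaves the choice to users. Viewed another way, a replica whose expiration time is later than the master's does little harm: the master actively expires keys, and once it deletes a key it also sends a delete command to the replica. However, if a key has reached its expiration time on the master but Redis has not yet removed it, reading that key on the replica will not trigger expiration, so applications with very strict expiration requirements can still be affected.
In addition, during our large-scale migration, expiration-time checks showed mismatches on a large number of keys, which made verification harder.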

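So for the first problem, we convert EXPIRE/PEXPIRE/SETEX/PSETEX into absolute-timestamp form when they are propagated to replicas: EXPIRE and PEXPIRE become PEXPIREAT, and SETEX and PSETEX become SET followed by PEXPIREAT. The implementation is as follows: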

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL) {
        if (!strcasecmp(argv[0]->ptr,"expire") ||
            !strcasecmp(argv[0]->ptr,"setex") ||
            !strcasecmp(argv[0]->ptr,"pexpire") ||
            !strcasecmp(argv[0]->ptr,"psetex") ) {
            long long when;
            robj *tmpargv[3];
            robj *tmpexpire[3];
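            /* Decode into a local instead of overwriting argv[2], which
             * would leak the reference taken by getDecodedObject(). */
            robj *ttl = getDecodedObject(argv[2]);
            when = strtoll(ttl->ptr,NULL,10);
            decrRefCount(ttl);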
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"setex")) {
                    when *= 1000;
            }    
            when += mstime();
            /* Translate EXPIRE/PEXPIRE into PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"pexpire")) {
                tmpargv[0] = createStringObject("PEXPIREAT",9);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
            }    
            /* Translate SETEX/PSETEX to SET and PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"setex") ||
                !strcasecmp(argv[0]->ptr,"psetex")) {
                argc = 3;
                tmpargv[0] = createStringObject("SET",3);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = getDecodedObject(argv[3]);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                tmpexpire[0] = createStringObject("PEXPIREAT",9);
                tmpexpire[1] = getDecodedObject(argv[1]);
                tmpexpire[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpexpire,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
                decrRefCount(tmpexpire[0]);
                decrRefCount(tmpexpire[1]);
                decrRefCount(tmpexpire[2]);
            }
        } else {
            replicationFeedSlaves(server.slaves,dbid,argv,argc);
        }
    }
}

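This change has been deployed in our production migration environment. Since then the expiration-time mismatch has been resolved, and expiration times before and after migration are strictly identical.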

Key counts differ before and after migration

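As for the second problem, key counts also frequently differ under ordinary master-replica replication in community Redis. A key factor is that, during replication, Redis takes an RDB snapshot of the existing data (as BGSAVE does) and sends it to the replica, which parses the RDB file and loads it into memory. In both of these steps Redis skips expired keys: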

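1. When the master writes the RDB snapshot and finds that a key has already expired, it does not write that key to the RDB file.
2. When the replica loads the RDB file into memory and finds that a key has already expired, it does not load that key. (In the 3.0.7 rdbLoad shown later, this check is guarded by server.masterhost == NULL, so it applies when the loading instance is not configured as a replica.)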
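The effect of such a filter on key counts can be sketched with a toy model. The key_entry type and count_surviving function are hypothetical and stand in for a single pass over the keyspace (a snapshot save or a load); they are not Redis code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified key record: only the expiry matters here. */
typedef struct {
    long long expire_ms; /* absolute deadline in ms, or -1 for no expiry */
} key_entry;

/* Counts the keys that survive one pass (snapshot save or load) at time
 * now_ms. With skip_expired set, keys whose deadline has passed are
 * dropped; with it cleared, every key is kept. */
size_t count_surviving(const key_entry *keys, size_t n,
                       long long now_ms, int skip_expired) {
    assert(keys != NULL || n == 0);
    size_t kept = 0;
    for (size_t i = 0; i < n; i++) {
        int expired = keys[i].expire_ms != -1 && keys[i].expire_ms < now_ms;
        if (skip_expired && expired) continue;
        kept++;
    }
    return kept;
}
```

With four keys of which one has already expired, a filtering pass keeps three while the source still reports four until its own expiry cycle reaches that key. That gap is exactly what a key-count check then flags.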

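Together, these two behaviors make the master and replica key counts diverge. This does not affect application logic, but it complicates our data verification, because the counts never match.
To address this, we changed both steps to keep expired keys. Expired keys are deleted only by the master, which then propagates the delete command to the replica, so the key counts match exactly.
With both patches applied, migration tests confirmed that key expiration times and key counts are fully consistent.
Finally, here is the modified code (against community Redis 3.0.7):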

Each change is located directly below the comment I added.

1. Do not skip expired keys during BGSAVE

rdb.c

/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actually saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        /* Comment out the line below */
        /* if (expiretime < now) return 0; */
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }   

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}

2. Do not skip expired keys during BGREWRITEAOF

aof.c

int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }   

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }   

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */  
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
 
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
 
            expiretime = getExpire(db,&key);
 
            /* If this key is already expired skip it */
            /* Comment out the line below */
            /* if (expiretime != -1 && expiretime < now) continue; */

3. Do not skip expired keys when loading the RDB file

rdb.c

int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;

    rioInitWithFile(&rdb,fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    startLoading(fp);
    while(1) {
        robj *key, *val;
        expiretime = -1;

        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
 
        if (type == REDIS_RDB_OPCODE_EOF)
            break;
 
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        /* Comment out the 5 lines below */
        /* if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        } */
        /* Add the new object in the hash table */
        dbAdd(db,key,val);
 
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);
 
        decrRefCount(key);
    }

Summary

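Note that these changes always hold under the noeviction memory policy. Under any other policy they hold only while Redis memory usage stays below maxmemory, because a replica that exceeds maxmemory also evicts keys on its own, and at that point full data consistency can no longer be guaranteed.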
