【Redis5源碼學習】淺析redis命令之migrate篇

baiyannode

命令使用

命令含義:將 key 原子性地從當前實例傳送到目標實例的指定數據庫上,一旦傳送成功, key 保證會出如今目標實例上,而當前實例上的 key 會被刪除
命令格式:redis

MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]

命令實戰:將鍵key一、key二、key3批量遷移到本機6380端口的redis實例上,並存儲到目標實例的第0號數據庫,超時時間爲1000毫秒。可選項COPY若是表示不移除源實例上的 key ,REPLACE選項表示替換目標實例上已存在的 key 。KEYS選項表示能夠同時批量傳送多個keys(但前面的key參數的位置必須設置爲空)數據庫

127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3
OK

返回值:遷移成功時返回 OK ,不然返回錯誤數組

源碼分析

migrate命令的執行過程可分爲參數校驗、鏈接創建、組裝數據、發送數據、處理返回五個階段。一樣的,migrate命令的處理函數爲migrateCommand():緩存

參數校驗

void migrateCommand(client *c) {
    migrateCachedSocket *cs; // 鏈接另外一個實例的socket
    int copy = 0, replace = 0, j; // 是否開啓copy及replace選項標記
    char *password = NULL; // 密碼
    long timeout; // 超時時間
    long dbid; // 數據庫id
    robj **ov = NULL; /* 要遷移的對象 */
    robj **kv = NULL; /* 鍵名 */
    robj **newargv = NULL; 
    rio cmd, payload; // 重要,存儲目標實例執行的命令及DUMP的payload
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* 支持同時傳輸多個key. */
    int first_key = 3; /* 第一個鍵參數的位置. */
    int num_keys = 1;  /* 默認只傳送一個key. */

    /* 校驗其餘選項,從COPY選項開始校驗 */
    for (j = 6; j < c->argc; j++) {
        int moreargs = j < c->argc-1;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 若是命令參數等於copy,開啓copy選項
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 若是命令參數等於replace,開啓replace選項
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 若是命令參數等於auth,開啓auth選項
            if (!moreargs) { // 參數數量超出規定數量,報錯
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 若是設置了keys參數,代表要同時傳輸多個keys值過去
            if (sdslen(c->argv[3]->ptr) != 0) { // 若是開啓了keys選項,前面key參數的位置必須設置爲空
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /*如今first_key值指向keys的第一個值.,並將num_keys設置爲keys的數量 */
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* 選擇的db和超時時間數據校驗,看是不是合法的數字格式 */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    /* 接下來會檢查是否有能夠遷移的鍵 */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

   /* 檢查全部的鍵,判斷輸入的鍵中,是否存在合法的鍵來進行遷移 */
    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去鍵空間字典中查找該鍵,若是該鍵沒有超時
            kv[oi] = c->argv[first_key+j]; // 將未超時的鍵存到kv數組中,說明當前key是能夠migrate的;不然若是超時就沒法進行migrate
            oi++;
        }
    }
    num_keys = oi; // 更新當前可migrate的key總量
    if (num_keys == 0) { // 若是沒有能夠遷移的key,那麼給客戶端返回「NOKEY"字符串
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

剛開始執行migrate命令的時候,因爲migrate參數不少,須要對其逐個作校驗。尤爲是在啓用keys參數同時遷移多個keys的時候,須要進行參數的動態判斷。同時須要判斷是否有合法的鍵來進行遷移。只有沒有過時的鍵纔可以遷移,不然不進行遷移,最大化節省系統資源。dom

鏈接創建

假如咱們要從當前6379端口上的redis實例遷移到6380端口上的redis實例,咱們必然要創建一個socket鏈接:socket

try_again:
    write_error = 0;

    /* 鏈接創建 */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        zfree(ov); zfree(kv);
        return; 
    }

咱們看到,在主流程中調用了migrateGetSocket()函數建立了一個socket,這裏是一個帶緩存的socket。咱們暫時不跟進這個函數,後面我會以擴展的形式來跟進。函數

組裝數據

基於這個socket,咱們能夠將數據以TCP協議中規定的字節流形式傳輸到目標實例上。這就須要一個序列化的過程了。6379實例須要將keys序列化,6380須要將數據反序列化。這就須要藉助咱們以前講過的DUMP命令和RESTORE命令,分別來進行序列化和反序列化了。
redis並無當即進行DUMP將key序列化,而是首先組裝要在目標redis實例上所要執行的命令,好比AUTH/SELECT/RESTORE等命令。要想在目標實例上執行命令,那麼必須一樣基於以前創建的socket鏈接,以當前的redis實例做爲客戶端,往與目標redis實例創建的TCP鏈接中,寫入按照redis協議封裝的命令集合(如*2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis使用了本身封裝的I/O抽象層rio,它實現了一個I/O緩衝區。經過讀取其緩衝區中的數據,就能夠往咱們在創建socket的時候生成的fd中寫入數據啦。首先redis會創建一個rio緩衝區,並按照redis數據傳輸協議所要求的格式,組裝要在目標實例上執行的redis命令:源碼分析

// 初始化一個rio緩衝區
    rioInitWithBuffer(&cmd,sdsempty());

    /* 組裝AUTH命令 */
    if (password) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照redis協議寫入一條命令開始的標識\*2。表示命令一共有2個參數
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 寫入$4\r\n  AUTH \r\n
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照協議格式寫入密碼
    }

    /* 在目標實例上選擇數據庫 */
    int select = cs->last_dbid != dbid; /* 判斷是否已經選擇過數據庫,若是選擇過就不用再次執行SELECT命令 */
    if (select) { // 若是沒有選擇過,須要執行SELECT命令選擇數據庫
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,寫入開始表示\*2
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,寫入$6\r\n SELECT \r\n
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 寫入$1\r\n 1 \r\n
    }

那麼接下來須要進行DUMP的序列化操做了。因爲序列化操做耗時較久,因此可能出現這種狀況:在以前第一次檢測是否超時的時候沒有超時,可是因爲此次序列化操做時間較久,執行期間,這個鍵超時了,那麼redis簡單粗暴地丟棄該超時鍵,直接放棄遷移這個鍵:.net

int non_expired = 0; // 暫存新的未過時的鍵的數量

    /* 若是在DUMP的過程當中過時了,直接continue. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);

        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 0) {
                continue;
            }
            if (ttl < 1) ttl = 1;
        }

        /* 通過上面的篩選以後,都是最新的、沒有過時的鍵,這些鍵能夠最終被遷移了. */
        kv[non_expired++] = kv[j];

而後,在目標實例上最終咱們須要執行RESTORE命令,將以前通過DUMP序列化的字節流反序列化,過程和上面同理:

serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,寫入開始表示\*5或4

        if (server.cluster_enabled) // 若是集羣模式開啓
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,寫入$7 RESTORE \r\n
        serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr))); // 將全部須要反序列化的key寫入
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 寫入過時時間

接下來,咱們就須要最終執行DUMP命令,將咱們須要傳輸的全部鍵等數據序列化了,這裏redis調用了createDumpPayload()來建立一個DUMP載荷,這就是最終序列化好的數據:

createDumpPayload(&payload,ov[j],kv[j]); // 序列化數據
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr))); // 將序列化數據存到rio cmd中等待發送
        sdsfree(payload.io.buffer.ptr);

        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace選項開啓

發送數據

目前,咱們須要發送的、按照redis協議組裝好的全部序列化好的命令及數據都存放在了cmd這個rio結構體變量緩存中。咱們當前的6379redis實例彷彿就是一個客戶端,而要傳輸的目標實例6380就是一個服務端。接下來就須要讀取緩存而且往直前創建好的socket中寫入數據,將數據最終傳輸至目標實例:

errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite); //按照64K的塊大小來發送
            nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往socket fd中寫入數據(數據來源於rio的緩存)
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

處理返回

在目標redis上分別執行AUTH、SELECT、RESTORE命令,RESTORE命令會反序列化並將key寫入目標實例。那麼這幾個命令執行完畢以後,咱們如何知道它們是否執行成功呢?一樣的,目標redis 6380實例在執行完命令以後,也會有相應的返回值,咱們須要根據返回值來判斷命令是否執行成功、是否將key成功遷移完成:

char buf0[1024]; /* 存儲AUTH命令返回值. */
    char buf1[1024]; /* 存儲SELECT命令返回值 */
    char buf2[1024]; /* 存儲RESTORE命令返回值. */

    /* 從socket fd中讀取AUTH命令返回值. */
    if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    /*  從socket fd中讀取SELECT命令返回值. */
    if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; 

    /* 遷移完成以後須要將原有實例上的key刪除 */
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    for (j = 0; j < num_keys; j++) {
        /*  從socket fd中讀取RESTORE命令返回值 */
        if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {
            if (!error_from_target) {
               ...
        } else {
            if (!copy) { // 沒有開啓copy選項,須要刪除原有實例的鍵
               ...
               /* 刪除原有實例上的鍵 */
                dbDelete(c->db,kv[j]);
                ...
            }
        }
    }
    ...
    /* 若是發生socket錯誤,關閉鏈接 */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
    ...
    sdsfree(cmd.io.buffer.ptr); // 釋放cmd的rio緩衝區
    zfree(ov); zfree(kv); zfree(newargv); // 釋放存儲key的robj結構體
    return;

綜上,migrate命令就執行完成了。咱們總結一下它的執行過程:

  • 命令參數校驗
  • 按照redis協議組裝目標實例上須要執行的命令
  • 將要傳輸的key序列化
  • 建立socket鏈接
  • 經過socket鏈接將命令及數據傳輸至目標實例
  • 目標實例執行命令並存儲相應的key
  • 處理目標實例的返回值
  • 若是失敗執行重試邏輯,若是成功則執行完畢

擴展

緩存socket的實現

在migrate命令執行過程當中,調用了migrateGetSocket()建立socket。redis藉助字典結構,實現了緩存socket,避免了屢次建立socket所帶來的開銷:

migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
    int fd;
    sds name = sdsempty();
    migrateCachedSocket *cs;

    /* 查找字典中是否有相應 ip:port 的緩存socket. */
    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
    name = sdscatlen(name,":",1);
    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
    // 查找字典
    cs = dictFetchValue(server.migrate_cached_sockets,name);
    if (cs) { // 若是找到了,說明以前建立過ip:port的socket
        sdsfree(name);
        cs->last_use_time = server.unixtime;
        return cs; // 直接返回緩存socket
    }
 
    /* 若是在字典中沒有找到,說明沒有緩存,須要從新建立. */
    /* 判斷是否緩存的socket過多,最大爲64個 */
    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
        /* 若是字典中緩存的socket過多,須要隨機刪除一些 */
        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
        cs = dictGetVal(de);
        close(cs->fd);
        zfree(cs);
        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
    }

    /* 建立socket */
    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
                                atoi(c->argv[2]->ptr));
    if (fd == -1) {
        sdsfree(name);
        addReplyErrorFormat(c,"Can't connect to target node: %s",
            server.neterr);
        return NULL;
    }
    anetEnableTcpNoDelay(server.neterr,fd);

    /* 檢查是否在超時時間內建立完成 */
    if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
        sdsfree(name);
        addReplySds(c,
            sdsnew("-IOERR error or timeout connecting to the client\r\n"));
        close(fd);
        return NULL;
    }

    /* 將新建立的socket加入緩存並返回給調用者 */
    cs = zmalloc(sizeof(*cs));
    cs->fd = fd;
    cs->last_dbid = -1;
    cs->last_use_time = server.unixtime;
    // 將新建立的socket加入字典,緩存起來等待下次使用
    dictAdd(server.migrate_cached_sockets,name,cs);
    return cs;
}
相關文章
相關標籤/搜索