baiyannode
命令含義:將 key 原子性地從當前實例傳送到目標實例的指定數據庫上,一旦傳送成功, key 保證會出如今目標實例上,而當前實例上的 key 會被刪除
命令格式:redis
MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]
命令實戰:將鍵key一、key二、key3批量遷移到本機6380端口的redis實例上,並存儲到目標實例的第0號數據庫,超時時間爲1000毫秒。可選項COPY若是表示不移除源實例上的 key ,REPLACE選項表示替換目標實例上已存在的 key 。KEYS選項表示能夠同時批量傳送多個keys(但前面的key參數的位置必須設置爲空)數據庫
127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3 OK
返回值:遷移成功時返回 OK ,不然返回錯誤數組
migrate命令的執行過程可分爲參數校驗、鏈接創建、組裝數據、發送數據、處理返回五個階段。一樣的,migrate命令的處理函數爲migrateCommand():緩存
void migrateCommand(client *c) { migrateCachedSocket *cs; // 鏈接另外一個實例的socket int copy = 0, replace = 0, j; // 是否開啓copy及replace選項標記 char *password = NULL; // 密碼 long timeout; // 超時時間 long dbid; // 數據庫id robj **ov = NULL; /* 要遷移的對象 */ robj **kv = NULL; /* 鍵名 */ robj **newargv = NULL; rio cmd, payload; // 重要,存儲目標實例執行的命令及DUMP的payload int may_retry = 1; int write_error = 0; int argv_rewritten = 0; /* 支持同時傳輸多個key. */ int first_key = 3; /* 第一個鍵參數的位置. */ int num_keys = 1; /* 默認只傳送一個key. */ /* 校驗其餘選項,從COPY選項開始校驗 */ for (j = 6; j < c->argc; j++) { int moreargs = j < c->argc-1; if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 若是命令參數等於copy,開啓copy選項 copy = 1; } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 若是命令參數等於replace,開啓replace選項 replace = 1; } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 若是命令參數等於auth,開啓auth選項 if (!moreargs) { // 參數數量超出規定數量,報錯 addReply(c,shared.syntaxerr); return; } j++; password = c->argv[j]->ptr; } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 若是設置了keys參數,代表要同時傳輸多個keys值過去 if (sdslen(c->argv[3]->ptr) != 0) { // 若是開啓了keys選項,前面key參數的位置必須設置爲空 addReplyError(c, "When using MIGRATE KEYS option, the key argument" " must be set to the empty string"); return; } first_key = j+1; num_keys = c->argc - j - 1; break; /*如今first_key值指向keys的第一個值.,並將num_keys設置爲keys的數量 */ } else { addReply(c,shared.syntaxerr); return; } } /* 選擇的db和超時時間數據校驗,看是不是合法的數字格式 */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK || getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) { return; } if (timeout <= 0) timeout = 1000; /* 接下來會檢查是否有能夠遷移的鍵 */ ov = zrealloc(ov,sizeof(robj*)*num_keys); kv = zrealloc(kv,sizeof(robj*)*num_keys); int oi = 0; /* 檢查全部的鍵,判斷輸入的鍵中,是否存在合法的鍵來進行遷移 */ for (j = 0; j < num_keys; j++) { if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去鍵空間字典中查找該鍵,若是該鍵沒有超時 kv[oi] = c->argv[first_key+j]; // 將未超時的鍵存到kv數組中,說明當前key是能夠migrate的;不然若是超時就沒法進行migrate oi++; } } num_keys = oi; // 更新當前可migrate的key總量 if (num_keys == 0) { // 若是沒有能夠遷移的key,那麼給客戶端返回「NOKEY"字符串 zfree(ov); zfree(kv); addReplySds(c,sdsnew("+NOKEY\r\n")); return; }
剛開始執行migrate命令的時候,因爲migrate參數不少,須要對其逐個作校驗。尤爲是在啓用keys參數同時遷移多個keys的時候,須要進行參數的動態判斷。同時須要判斷是否有合法的鍵來進行遷移。只有沒有過時的鍵纔可以遷移,不然不進行遷移,最大化節省系統資源。dom
假如咱們要從當前6379端口上的redis實例遷移到6380端口上的redis實例,咱們必然要創建一個socket鏈接:socket
try_again: write_error = 0; /* 鏈接創建 */ cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); if (cs == NULL) { zfree(ov); zfree(kv); return; }
咱們看到,在主流程中調用了migrateGetSocket()函數建立了一個socket,這裏是一個帶緩存的socket。咱們暫時不跟進這個函數,後面我會以擴展的形式來跟進。函數
基於這個socket,咱們能夠將數據以TCP協議中規定的字節流形式傳輸到目標實例上。這就須要一個序列化的過程了。6379實例須要將keys序列化,6380須要將數據反序列化。這就須要藉助咱們以前講過的DUMP命令和RESTORE命令,分別來進行序列化和反序列化了。
redis並無當即進行DUMP將key序列化,而是首先組裝要在目標redis實例上所要執行的命令,好比AUTH/SELECT/RESTORE等命令。要想在目標實例上執行命令,那麼必須一樣基於以前創建的socket鏈接,以當前的redis實例做爲客戶端,往與目標redis實例創建的TCP鏈接中,寫入按照redis協議封裝的命令集合(如*2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis使用了本身封裝的I/O抽象層rio,它實現了一個I/O緩衝區。經過讀取其緩衝區中的數據,就能夠往咱們在創建socket的時候生成的fd中寫入數據啦。首先redis會創建一個rio緩衝區,並按照redis數據傳輸協議所要求的格式,組裝要在目標實例上執行的redis命令:源碼分析
// 初始化一個rio緩衝區 rioInitWithBuffer(&cmd,sdsempty()); /* 組裝AUTH命令 */ if (password) { serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照redis協議寫入一條命令開始的標識\*2。表示命令一共有2個參數 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 寫入$4\r\n AUTH \r\n serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照協議格式寫入密碼 } /* 在目標實例上選擇數據庫 */ int select = cs->last_dbid != dbid; /* 判斷是否已經選擇過數據庫,若是選擇過就不用再次執行SELECT命令 */ if (select) { // 若是沒有選擇過,須要執行SELECT命令選擇數據庫 serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,寫入開始表示\*2 serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,寫入$6\r\n SELECT \r\n serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 寫入$1\r\n 1 \r\n }
那麼接下來須要進行DUMP的序列化操做了。因爲序列化操做耗時較久,因此可能出現這種狀況:在以前第一次檢測是否超時的時候沒有超時,可是因爲此次序列化操做時間較久,執行期間,這個鍵超時了,那麼redis簡單粗暴地丟棄該超時鍵,直接放棄遷移這個鍵:.net
int non_expired = 0; // 暫存新的未過時的鍵的數量 /* 若是在DUMP的過程當中過時了,直接continue. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; long long expireat = getExpire(c->db,kv[j]); if (expireat != -1) { ttl = expireat-mstime(); if (ttl < 0) { continue; } if (ttl < 1) ttl = 1; } /* 通過上面的篩選以後,都是最新的、沒有過時的鍵,這些鍵能夠最終被遷移了. */ kv[non_expired++] = kv[j];
而後,在目標實例上最終咱們須要執行RESTORE命令,將以前通過DUMP序列化的字節流反序列化,過程和上面同理:
serverAssertWithInfo(c,NULL, rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,寫入開始表示\*5或4 if (server.cluster_enabled) // 若是集羣模式開啓 serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); else serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,寫入$7 RESTORE \r\n serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, sdslen(kv[j]->ptr))); // 將全部須要反序列化的key寫入 serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 寫入過時時間
接下來,咱們就須要最終執行DUMP命令,將咱們須要傳輸的全部鍵等數據序列化了,這裏redis調用了createDumpPayload()來建立一個DUMP載荷,這就是最終序列化好的數據:
createDumpPayload(&payload,ov[j],kv[j]); // 序列化數據 serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); // 將序列化數據存到rio cmd中等待發送 sdsfree(payload.io.buffer.ptr); if (replace) serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace選項開啓
目前,咱們須要發送的、按照redis協議組裝好的全部序列化好的命令及數據都存放在了cmd這個rio結構體變量緩存中。咱們當前的6379redis實例彷彿就是一個客戶端,而要傳輸的目標實例6380就是一個服務端。接下來就須要讀取緩存而且往直前創建好的socket中寫入數據,將數據最終傳輸至目標實例:
errno = 0; { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; int nwritten = 0; while ((towrite = sdslen(buf)-pos) > 0) { towrite = (towrite > (64*1024) ? (64*1024) : towrite); //按照64K的塊大小來發送 nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往socket fd中寫入數據(數據來源於rio的緩存) if (nwritten != (signed)towrite) { write_error = 1; goto socket_err; } pos += nwritten; } }
在目標redis上分別執行AUTH、SELECT、RESTORE命令,RESTORE命令會反序列化並將key寫入目標實例。那麼這幾個命令執行完畢以後,咱們如何知道它們是否執行成功呢?一樣的,目標redis 6380實例在執行完命令以後,也會有相應的返回值,咱們須要根據返回值來判斷命令是否執行成功、是否將key成功遷移完成:
char buf0[1024]; /* 存儲AUTH命令返回值. */ char buf1[1024]; /* 存儲SELECT命令返回值 */ char buf2[1024]; /* 存儲RESTORE命令返回值. */ /* 從socket fd中讀取AUTH命令返回值. */ if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0) goto socket_err; /* 從socket fd中讀取SELECT命令返回值. */ if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0) goto socket_err; int error_from_target = 0; int socket_error = 0; int del_idx = 1; /* 遷移完成以後須要將原有實例上的key刪除 */ if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); for (j = 0; j < num_keys; j++) { /* 從socket fd中讀取RESTORE命令返回值 */ if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) { socket_error = 1; break; } if ((password && buf0[0] == '-') || (select && buf1[0] == '-') || buf2[0] == '-') { if (!error_from_target) { ... } else { if (!copy) { // 沒有開啓copy選項,須要刪除原有實例的鍵 ... /* 刪除原有實例上的鍵 */ dbDelete(c->db,kv[j]); ... } } } ... /* 若是發生socket錯誤,關閉鏈接 */ if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); ... sdsfree(cmd.io.buffer.ptr); // 釋放cmd的rio緩衝區 zfree(ov); zfree(kv); zfree(newargv); // 釋放存儲key的robj結構體 return;
綜上,migrate命令就執行完成了。咱們總結一下它的執行過程:
- 命令參數校驗
- 按照redis協議組裝目標實例上須要執行的命令
- 將要傳輸的key序列化
- 建立socket鏈接
- 經過socket鏈接將命令及數據傳輸至目標實例
- 目標實例執行命令並存儲相應的key
- 處理目標實例的返回值
- 若是失敗執行重試邏輯,若是成功則執行完畢
在migrate命令執行過程當中,調用了migrateGetSocket()建立socket。redis藉助字典結構,實現了緩存socket,避免了屢次建立socket所帶來的開銷:
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) { int fd; sds name = sdsempty(); migrateCachedSocket *cs; /* 查找字典中是否有相應 ip:port 的緩存socket. */ name = sdscatlen(name,host->ptr,sdslen(host->ptr)); name = sdscatlen(name,":",1); name = sdscatlen(name,port->ptr,sdslen(port->ptr)); // 查找字典 cs = dictFetchValue(server.migrate_cached_sockets,name); if (cs) { // 若是找到了,說明以前建立過ip:port的socket sdsfree(name); cs->last_use_time = server.unixtime; return cs; // 直接返回緩存socket } /* 若是在字典中沒有找到,說明沒有緩存,須要從新建立. */ /* 判斷是否緩存的socket過多,最大爲64個 */ if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { /* 若是字典中緩存的socket過多,須要隨機刪除一些 */ dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); cs = dictGetVal(de); close(cs->fd); zfree(cs); dictDelete(server.migrate_cached_sockets,dictGetKey(de)); } /* 建立socket */ fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, atoi(c->argv[2]->ptr)); if (fd == -1) { sdsfree(name); addReplyErrorFormat(c,"Can't connect to target node: %s", server.neterr); return NULL; } anetEnableTcpNoDelay(server.neterr,fd); /* 檢查是否在超時時間內建立完成 */ if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { sdsfree(name); addReplySds(c, sdsnew("-IOERR error or timeout connecting to the client\r\n")); close(fd); return NULL; } /* 將新建立的socket加入緩存並返回給調用者 */ cs = zmalloc(sizeof(*cs)); cs->fd = fd; cs->last_dbid = -1; cs->last_use_time = server.unixtime; // 將新建立的socket加入字典,緩存起來等待下次使用 dictAdd(server.migrate_cached_sockets,name,cs); return cs; }