redis 代碼中主從複製流程的核心部分在於狀態機的流轉。node
單機模式下以 SLAVEOF 命令觸發;
cluster 模式下以 REPLICATE 命令觸發,且 cluster 模式下不支持 SLAVEOF 命令。redis
在該過程當中,master 與 slave 各有不一樣的流轉邏輯,交互頻繁,本文如下內容試圖介紹 slave 的處理邏輯,如下流程圖能夠輔助理解。
數據庫
代碼中在 redisServer
結構體裏定義的不少 repl 前綴的變量都用於此過程,如repl_transfer_fd
。
各變量的做用在源碼註釋裏已經寫得很是詳細了,不作贅述。緩存
redis 實例以單機模式啓動,即在 <span id="inline-blue"> redis.conf </span> 中配置 cluster-enabled no。安全
有如下三種方式可觸發主從複製流程。
① redis.conf 中配置 slaveof <masterip> <masterport>
;
② redis-server 命令啓動服務時指定參數 --slaveof [masterip] [masterport]
;
③ 對一個實例執行 slaveof [masterip] [masterport]
命令。網絡
①② 邏輯類似,直接標記 server.repl_state
爲 REPL_STATE_CONNECT 狀態,以 ① 爲例簡要說明,加載配置文件時有以下邏輯,less
void loadServerConfigFromString(char *config) { .... else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { slaveof_linenum = linenum; server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); server.repl_state = REPL_STATE_CONNECT; } .... }
而 ③ 在標記 REPL_STATE_CONNECT
狀態前須要作一些檢查。
首先,檢查實例是否開啓了 cluster 模式,若是開啓了,那麼直接返回,不支持這個命令。
接着,經過檢查 slaveof
命令後面的參數來判斷使用的是哪一個命令,代碼以下,socket
void slaveofCommand(client *c) { /* cluster 模式開啓後,禁用 slaveof 命令 */ if (server.cluster_enabled) { addReplyError(c,"SLAVEOF not allowed in cluster mode."); return; } // SLAVEOF NO ONE if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { // 若是以前有 master replicationUnsetMaster(); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); } } else { long port; // 從參數中得到 port if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; // 若是如今的 master 已是要設置的,那麼就沒必要再作操做了,直接返回吧 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); return; } replicationSetMaster(c->argv[1]->ptr, port); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')", server.masterhost, server.masterport, client); sdsfree(client); } addReply(c,shared.ok); }
該命令會取消現有的主從關係,使 slave 變爲 master,主要函數以下,函數
void replicationUnsetMaster(void) { // 已是 master 了,無需繼續操做 if (server.masterhost == NULL) return; sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) { if (listLength(server.slaves) == 0) { // 繼承 master 的 repl offset server.master_repl_offset = server.master->reploff; freeReplicationBacklog(); } freeClient(server.master); } replicationDiscardCachedMaster(); cancelReplicationHandshake(); server.repl_state = REPL_STATE_NONE; }
這裏主要涉及到一些與 master 相關的變量的內存釋放。
若是該實例有 master,且不是其餘實例的 master,即 listLength(server.slaves) == 0
,也就是說未造成鏈式結構,那麼記錄下原 master 的 replication offset。在某些特定條件下,副本的數據新鮮度能夠經過 replication offset 來比較,有時因爲網絡等緣由暫時斷開了,隔了一段時間又從新連上原 master,有了這個偏移量能夠減小作徹底重同步的可能性(我是這麼理解的)。
freeClient 函數會釋放掉原來的 master,作一些內存釋放,一些標誌位重置等。oop
接下來的 replicationDiscardCachedMaster
函數中會釋放掉 server.cached_master
,由於這裏緩存之前的 mater 已經沒用了,不知道下次要連的是哪一個 master,或者本身之後成爲一個 master,避免沒必要要的內存浪費。
cancelReplicationHandshake
函數則會取消一個正在進行嘗試 handshake 的主從複製過程。
最後重置狀態機爲 REPL_STATE_NONE。
經過執行該命令,能夠將當前實例變成某個實例的 slave。
若是指定的主從關係已經存在,那本次命令不必繼續執行了,直接返回;不然,經過 replicationSetMaster
函數設置新的主從關係,代碼以下,
void replicationSetMaster(char *ip, int port) { sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; // 若是原來有 master了,須要釋放掉 if (server.master) freeClient(server.master); disconnectAllBlockedClients(); // 釋放掉全部的 slave,讓它們從新連 disconnectSlaves(); replicationDiscardCachedMaster(); freeReplicationBacklog(); cancelReplicationHandshake(); server.repl_state = REPL_STATE_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; }
在以上函數中,
先保存下要鏈接的 ip 和 port,方便後面進行創建網絡鏈接。
若是,該節點以前有 master 了,那麼須要釋放掉原來的 master,跟上面一節的邏輯相似,前面詳細說過了。
disconnectAllBlockedClients 函數會 unlock 已經 lock 在這個實例上的 client,並返回 -UNBLOCKED 開頭的錯誤。這是由於該實例已經改變了角色,block 已經沒什麼意義。好比當一個實例從 master 變爲 slave,那麼因爲 list 選項而阻塞在該實例上的 client 就不安全了,由於數據隨着重新的 slave 同步數據,該實例的數據集可能會發生變化。
disconnectSlaves 函數釋放掉全部的 slave,從新同步新的數據。
釋放掉 server.cached_master
,一樣由於數據集變化了,cache 的數據並不能用了。
釋放掉 server.repl_backlog
,理由同上。cancelReplicationHandshake
函數在上面講過了。
將 server.repl_state
置爲 REPL_STATE_CONNECT 狀態,複製偏離量歸零等。
最後返回 OK,也就是這個命令的返回值+OK\r\n
。
redis 中有不少 cron 任務,其中就有一個負責 replication 的,即每秒執行一次的 replicationCron 函數。
run_with_period(1000) replicationCron();
在上一步中,狀態機已經流轉到 REPL_STATE_CONNECT 狀態,這裏直接就進入到主從建連的邏輯。
void replicationCron(void) { ...... // 開始一段新的主從關係 if (server.repl_state == REPL_STATE_CONNECT) { serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); if (connectWithMaster() == C_OK) { serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started"); } } ...... }
使用server.masterhost
和 server.masterport
向 master 發起 connect 請求, fd 設置爲非阻塞。成功後,爲 fd 的讀寫事件註冊 syncWithMaster
回調函數,用於處理 master 與 slave 之間的 handshake 過程。這部分邏輯在 connectWithMaster 函數中實現,代碼以下,
int connectWithMaster(void) { int fd; // 鏈接 master,得到 fd fd = anetTcpNonBlockBestEffortBindConnect(NULL, server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } // 爲 fd 設置讀寫事件回調 syncWithMaster if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); serverLog(LL_WARNING,"Can't create readable event for SYNC"); return C_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; // 狀態機更新 server.repl_state = REPL_STATE_CONNECTING; return C_OK; }
server.repl_transfer_lastio
用於記錄上一次 fd 讀事件的時刻,server.repl_transfer_s
記錄主從複製使用到的 socket fd。
更新狀態機爲 REPL_STATE_CONNECTING。
主從建連成功後,經過 fd 的讀寫事件觸發 syncWithMaster
回調函數。
若是該事件在用戶把本實例用 SLAVEOF NO ONE 變成 master 後出觸發,那麼沒有執行下去的必要,判斷邏輯以下,
if (server.repl_state == REPL_STATE_NONE) { close(fd); return; }
下面是同步發送 PING 的代碼邏輯,更新狀態機爲 REPL_STATE_RECEIVE_PONG。
if (server.repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* 爲了等待 pong 的返回,刪除 fd 上的可寫事件,但保留可讀事件 */ aeDeleteFileEvent(server.el,fd,AE_WRITABLE); server.repl_state = REPL_STATE_RECEIVE_PONG; /* 同步發送 ping,這裏不檢查是否 err,由於已經有超時限制作保證 */ err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); if (err) goto write_error; return; }
sendSynchronousCommand
函數經過 flag 標識讀寫命令,此處寫命令標識爲 SYNC_CMD_WRITE。
使用 sendSynchronousCommand 函數同步讀取 master 對 PING 的回覆。
if (server.repl_state == REPL_STATE_RECEIVE_PONG) { /* 讀取上面發送的 ping 命令的 response */ err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); sdsfree(err); goto error; } else { serverLog(LL_NOTICE, "Master replied to PING, replication can continue..."); } sdsfree(err); server.repl_state = REPL_STATE_SEND_AUTH; }
回覆只可能有 3 種狀況:+PONG,-NOAUTH 和 -ERR operation not permitted(老版本的 redis 主節點)。若是不是,直接進入錯誤處理代碼流程。
注意:這裏的讀操做會更新變量 server.repl_transfer_lastio
。
調整狀態機爲 REDIS_REPL_SEND_AUTH。這裏沒有 return
,直接往下執行,進入鑑權的邏輯,
if (server.repl_state == REPL_STATE_SEND_AUTH) { if (server.masterauth) { err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); if (err) goto write_error; server.repl_state = REPL_STATE_RECEIVE_AUTH; return; } else { server.repl_state = REPL_STATE_SEND_PORT; } }
若是配置文件中沒有設置 masterauth 選項,那麼狀態機置爲 REPL_STATE_SEND_PORT。
不然,須要發送 AUTH
命令鑑權。狀態機置爲 REPL_STATE_RECEIVE_AUTH。
/* Receive AUTH reply. */ if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); server.repl_state = REPL_STATE_SEND_PORT; }
驗證 auth 經過後,狀態機置爲 REPL_STATE_SEND_PORT,不然,直接跳到的 err 處理流程。
slave 將發送一連串的 REPLCONF
命令,以告知 master 本身的一些信息。slave-announce-ip
和 slave-announce-port
主要是針對轉發或者 NAT 場景下,master 沒法經過 socket 鏈接得到對端信息時使用。
首先發送本身的 port 信息,REPLCONF listening-port <port>
,狀態機置爲 REPL_STATE_RECEIVE_PORT,返回,等下一次事件觸發。
接着,同步讀取 master 的回覆,即便返回錯誤也沒有關係,狀態機置爲 REPL_STATE_SEND_IP。
代碼以下,
/* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ if (server.repl_state == REPL_STATE_SEND_PORT) { sds port = sdsfromlonglong(server.slave_announce_port ? server.slave_announce_port : server.port); err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "listening-port",port, NULL); sdsfree(port); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_PORT; return; } if (server.repl_state == REPL_STATE_RECEIVE_PORT) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF listening-port: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_SEND_IP;
若是沒有配置 slave-announce-ip
時,直接將狀態機調跳轉到 REPL_STATE_SEND_CAPA,跳過發送 REPLCONF ip-address 的步驟。
if (server.repl_state == REPL_STATE_SEND_IP && server.slave_announce_ip == NULL) { server.repl_state = REPL_STATE_SEND_CAPA; }
發送 REPLCONF ip-address,接收回復,將狀態機置爲 REPL_STATE_SEND_CAPA。
/* REPLCONF ip-address <ip> */ if (server.repl_state == REPL_STATE_SEND_IP) { err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "ip-address",server.slave_announce_ip, NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_IP; return; } /* Receive REPLCONF ip-address reply. */ if (server.repl_state == REPL_STATE_RECEIVE_IP) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF ip-address: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_SEND_CAPA; }
狀態機置爲 REPL_STATE_SEND_CAPA,告知 master 本身的能力,如今只有 eof,表示支持無磁盤化主從複製,之後可能會有更多,格式爲 REPLCONF capa X capa Y capa Z ...
。
if (server.repl_state == REPL_STATE_SEND_CAPA) { err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", "capa","eof",NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA; return; } if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); } sdsfree(err); server.repl_state = REPL_STATE_SEND_PSYNC; }
狀態機置爲 REPL_STATE_SEND_PSYNC。
爲解決舊版本 redis 在處理斷線狀況下徹底複製的低效問題, 從 2.8 版本開始,使用 PSYNC 命令代替 SYNC 命令來執行復制時的同步操做,這個點在前面的博客講過了。
爲高效起見,首先嚐試作部分重同步,試探邏輯在函數 slaveTryPartialResynchronization 中。
if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC; return; }
slaveTryPartialResynchronization 裏包含了讀寫兩部分,其中寫的部分在上半部,當第二個參數爲 0 時,發送 PSYNC 命令,命令格式爲 PSYNC <runid> <offset>
。
發送 PSYNC 時,分兩種狀況,首次鏈接或非首次鏈接。首次鏈接時,runid 未知,用 ?
代替,offset 置爲初始值 -1。
代碼邏輯以下,
if (!read_reply) { server.repl_master_initial_offset = -1; if (server.cached_master) { // 重連 psync_runid = server.cached_master->replrunid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); } else { // 首次鏈接 master serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); psync_runid = "?"; memcpy(psync_offset,"-1",3); /* psync ? -1 */ } reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); /* 發送出錯了,須要刪掉 fd 上的可讀事件 */ aeDeleteFileEvent(server.el,fd,AE_READABLE); return PSYNC_WRITE_ERROR; } return PSYNC_WAIT_REPLY; }
發送出錯後,須要刪掉 fd 上的可讀事件。
回到 syncWithMaster 函數中,狀態機置爲 REPL_STATE_RECEIVE_PSYNC。
代碼邏輯走到這一步,若是狀態機的狀態不是 REPL_STATE_RECEIVE_PSYNC,必定是哪裏出錯了,進入錯誤處理流程,即,
/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); goto error; }
接着去讀取 master 的給的回覆信息,
psync_result = slaveTryPartialResynchronization(fd,1);
可能會讀到三種,+FULLRESYNC
、+CONTINUE
以及-ERR
。
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); if (sdslen(reply) == 0) { // 爲了保活,master 可能在收到 PSYNC 後且回覆前發送空行 sdsfree(reply); return PSYNC_WAIT_REPLY; } /* 刪除 fd 可讀事件,方便後面爲 f'd從新註冊新的回調 */ aeDeleteFileEvent(server.el,fd,AE_READABLE);
【1】+FULLRESYNC
回覆表示不能進行部分重同步,slave 告訴給 master 的 offset 不在 master 的複製積壓緩衝區範圍內,只能進行徹底重同步,返回給上層函數 PSYNC_FULLRESYNC,代碼以下,
/* 徹底重同步獲得 response 爲 +FULLRESYNC <master_runid> <offset> */ if (!strncmp(reply,"+FULLRESYNC",11)) { char *runid = NULL, *offset = NULL; runid = strchr(reply,' '); if (runid) { runid++; offset = strchr(runid,' '); if (offset) offset++; } /* 有多是 master 發送格式有問題,先把 repl_master_runid 置空 */ if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); } else { memcpy(server.repl_master_runid, runid, offset-runid-1); server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; server.repl_master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", server.repl_master_runid, server.repl_master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ replicationDiscardCachedMaster(); sdsfree(reply); return PSYNC_FULLRESYNC; }
以上代碼解析出 master 的 runid,以及 offset,分別賦值給 repl_master_runid
和 repl_master_initial_offset
。由於要進行全同步,cached_master
保存的信息就失效了,須要重置,即函數 replicationDiscardCachedMaster
的調用。
【2】+CONTINUE
表示能夠進行部分重同步,返回給上層函數 PSYNC_CONTINUE。
if (!strncmp(reply,"+CONTINUE",9)) { serverLog(LL_NOTICE, "Successful partial resynchronization with master."); sdsfree(reply); replicationResurrectCachedMaster(fd); return PSYNC_CONTINUE; }
void replicationResurrectCachedMaster(int newfd) { server.master = server.cached_master; server.cached_master = NULL; server.master->fd = newfd; server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); server.master->authenticated = 1; server.master->lastinteraction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; /* Re-add to the list of clients. */ listAddNodeTail(server.clients,server.master); if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } if (clientHasPendingReplies(server.master)) { if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } } }
能夠看出,從 cached_master
恢復 master,將狀態機置爲 REPL_STATE_CONNECTED。
爲 fd 的讀事件註冊新的回調函數 readQueryFromClient。
若是在 server.master
上仍然有 reply,或者是在寫 buffer 裏有數據,那麼須要爲寫事件註冊回調函數 sendReplyToClient
。
【3】-ERR
的狀況。須要清理現有的 cached_master
,返回給上層函數 PSYNC_NOT_SUPPORTED 。
回到 syncWithMaster 函數裏,處理 PSYNC 命令的返回值。
當返回的是 PSYNC_CONTINUE 時,表示進行的是部分重同步,該函數結束。
if (psync_result == PSYNC_CONTINUE) { /* 部分重同步 ,不會走到下面接收 rdb 的流程 */ serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization."); return; }
不然,有兩種可能,進行徹底重同步,或者 master 不支持 PSYNC 命令(老版本的 master),可是不管如何都須要斷開現有的全部 slave,由於新 master 可能會傳過來一份不一樣的數據。
同時清空複製積壓緩衝區,即 repl_backlog,不容許個人 slave 作 psync 了(畢竟數據不一樣了嘛)。
若是新 master 不支持 PSYNC 命令,那麼同步發送 SYNC 命令。
if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING,"I/O error writing to MASTER: %s", strerror(errno)); goto error; } }
若是沒有出錯,接下來準備徹底重同步階段 master 發過來的 rdb 數據。建立一個名字以 tmp 爲前綴的臨時 rdb 接收文件,打開,並記錄 fd,最多 5 次,要是還不能成功建立一個臨時文件,那麼就走錯誤處理的流程了。代碼以下,
while(maxtries--) { snprintf(tmpfile,256, "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); } if (dfd == -1) { serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno)); goto error; }
接着爲 fd 的讀事件註冊回調函數 readSyncBulkPayload
,用來處理從 master 讀到的數據 rdb 文件。
/* 爲 fd 的可讀事件註冊新的函數 readSyncBulkPayload */ if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (fd=%d)", strerror(errno),fd); goto error; }
最後是一些 server 變量的賦值。
server.repl_state = REPL_STATE_TRANSFER; /* 初始化 RDB 文件大小 */ server.repl_transfer_size = -1; /* 已讀數據大小 */ server.repl_transfer_read = 0; /* 最近一次執行的 fsync 偏移量 */ server.repl_transfer_last_fsync_off = 0; /* 本地臨時 rdb 文件的 fd */ server.repl_transfer_fd = dfd; /* 最近一次讀數據的時間 */ server.repl_transfer_lastio = server.unixtime; /* 本地臨時 rdb 文件的名字 */ server.repl_transfer_tmpfile = zstrdup(tmpfile);
狀態機置爲 REPL_STATE_TRANSFER,repl_transfer_fd
記錄 rdb 臨時文件的 fd。
在 replicationCron 函數中的開始部分,有一些超時保證。
與 master 創建鏈接後,一直沒能發 PING,說明鏈接可能有問題。
在鑑權和確認capa 的流程中,花了過多的時間。
if (server.masterhost && (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) // 默認 60s 超時 { serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); cancelReplicationHandshake(); } int slaveIsInHandshakeState(void) { return server.repl_state >= REPL_STATE_RECEIVE_PONG && server.repl_state <= REPL_STATE_RECEIVE_PSYNC; }
接收 rdb 文件的時長作限制。
/* Bulk transfer I/O timeout? */ if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) // 默認 60s 超時 { serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); cancelReplicationHandshake(); }
成爲 slave 之後,沒有數據發過來。
/* Timed out master when we are an already connected slave? */ if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && (time(NULL)-server.master->lastinteraction) > server.repl_timeout) { serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); freeClient(server.master); }
接收 rdb 數據有兩種方式,一種是磁盤化的,一種是無磁盤化的。
從 V2.8.18 開始,redis 引入了「無硬盤複製」選項,開啓該選項時,redis 在與 slave 進行復制初始化時將不會將快照內容存儲到硬盤上,而是直接經過網絡發送給slave,避免了硬盤的性能瓶頸,能夠在配置文件中使用 repl-diskless-sync 選項來配置開啓該功能。
兩種方式發送的數據格式是不同的。
磁盤化複製時,master 先生成 rdb 文件,而後將文件內容加上 $<len>/r/n
的頭部後,發送給 slave。
而無磁盤化複製時,master 直接把 rdb 數據發送給你 slave 時,以 $EOF:<XXX>\r\n
開頭,並以 <XXX>
結尾,開頭和結尾的 <XXX>
內容相同,都是 40 個字節,是由 0123456789abcdef 中的字符組成的隨機字符串,爲了校驗數據的是否發送完成。
該流程主要是回調函數 readSyncBulkPayload
中的邏輯。
首先讀取 master 傳過來的輔助信息。
if (server.repl_transfer_size == -1) { /* 第一行內容 $<len>/r/n */ if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", strerror(errno)); goto error; } if (buf[0] == '-') { serverLog(LL_WARNING, "MASTER aborted replication with an error: %s", buf+1); goto error; } else if (buf[0] == '\0') { server.repl_transfer_lastio = server.unixtime; return; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; } /* 有兩種可能的 bulk payload 格式,正常的是 $<count> * 還有一種可能就是無磁盤化主從同步時,由於這個時候不知道後面要傳輸數據的長度,所以會發送一個分隔符, * 格式爲 $EOF:<40 bytes delimiter> * 在發送完 rdb 數據後,分隔符會再次被髮送,以便讓接收端知道數據發送完成了。 * 分隔符足夠的長和隨機,所以真實文件內容碰撞的可能性能夠被忽略。*/ if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) { usemark = 1; memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE); memset(lastbytes,0,CONFIG_RUN_ID_SIZE); server.repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving streamed RDB from master"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: receiving %lld bytes from master", (long long) server.repl_transfer_size); } return; }
同步讀取第一行內容,當開啓了無磁盤化同步時,有一點須要注意,保存完 eofmark 後,要把 repl_transfer_size
變量置爲一個非 -1 的值,防止下次事件觸發後又進到這個邏輯裏來了。而正常同步時,能夠讀到 repl_transfer_size
的大小。
經過 usemark
來標記同步類型,值爲 1 表示無磁盤化的同步,值爲 0 表示磁盤化同步。
/* Read bulk data */ if (usemark) { readlen = sizeof(buf); } else { left = server.repl_transfer_size - server.repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); }
以上邏輯來調整每次從 socket 中讀取數據的長度,由於 usemark 時,不知道要讀取的數據總長度。
nread = read(fd,buf,readlen); if (nread <= 0) { serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(); return; }
計算出 readlen 後,讀取數據,若是讀錯了,要斷開鏈接,清理 fd ,重置同步狀態等,cancelReplicationHandshake
的邏輯在上面已經說過。
若是是 usemark,那麼須要校驗 eofmark,以便知道數據是否已經讀完。
int eof_reached = 0; if (usemark) { /* Update the last bytes array, and check if it matches our delimiter.*/ if (nread >= CONFIG_RUN_ID_SIZE) { memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); } else { int rem = CONFIG_RUN_ID_SIZE-nread; memmove(lastbytes,lastbytes+nread,rem); memcpy(lastbytes+rem,buf,nread); } /* 讀到 EOF 了 */ if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; }
若是讀到的數據長度 >= 40,那麼截取 buf 最後 40 個字符。不然使用 memmove
和 memcpy
將最後的 40 個字節填滿,這部分操做有點繞,畫了個圖幫助理解,
而後根據前面記錄 eofmark 去判斷是否是數據接收結束了,若是是,eof_reached
置爲 1。
讀完一次數據須要將其寫入本地的臨時 rdb 文件裏,
if (write(server.repl_transfer_fd,buf,nread) != nread) { serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno)); goto error; } server.repl_transfer_read += nread; // 更新讀了多少數據量
若是是已經讀到末尾了,那麼須要從文件中刪掉 eofmark,由於它不是 rdb 數據嘛,只是個輔助標識。
if (usemark && eof_reached) { if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); goto error; } }
光是 write
了還不夠,這只是寫到了系統的 cache,還須要作 fsync
將數據落盤。
if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { off_t sync_size = server.repl_transfer_read - server.repl_transfer_last_fsync_off; rdb_fsync_range(server.repl_transfer_fd, server.repl_transfer_last_fsync_off, sync_size); server.repl_transfer_last_fsync_off += sync_size; }
刷盤策略是每 8M 一次。
若是不是無磁盤化的主從同步,就要依賴於接收到的數據 size 與第一次傳過來的值做比較。
if (!usemark) { if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; }
若是徹底接收完數據了了,那麼須要作一些善後工做,以下代碼,
if (eof_reached) {....}
首先,把本地 rdb 文件的名字改爲配置文件裏配置的名字server.rdb_filename
。
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno)); cancelReplicationHandshake(); return; }
而後須要爲加載新的 rdb 文件作一些準備。
signalFlushedDb(-1); // 使得本實例的全部客戶端感知到接下來要清空數據庫 emptyDb(replicationEmptyDbCallback); // 清空全部數據,給 master 發一個 \n
long long emptyDb(void(callback)(void*)) { int j; long long removed = 0; for (j = 0; j < server.dbnum; j++) { removed += dictSize(server.db[j].dict); dictEmpty(server.db[j].dict,callback); dictEmpty(server.db[j].expires,callback); } if (server.cluster_enabled) slotToKeyFlush(); return removed; } /* Callback used by emptyDb() while flushing away old data to load * the new dataset received by the master. */ void replicationEmptyDbCallback(void *privdata) { UNUSED(privdata); replicationSendNewlineToMaster(); } /* 給 master 發 \n 代表本身還活着,在加載數據 */ void replicationSendNewlineToMaster(void) { static time_t newline_sent; if (time(NULL) != newline_sent) { newline_sent = time(NULL); if (write(server.repl_transfer_s,"\n",1) == -1) { /* Pinging back in this stage is best-effort. */ } } }
清空老數據完老數據,下面開始加載新數據。
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); if (rdbLoad(server.rdb_filename) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); return; }
在加載新數據以前,須要先刪除socket fd 的可讀事件,這是由於在調用 rdbLoad
加載 rdb 數據時,每次調用rioRead
都會由於要計算 checksum 而調用 processEventsWhileBlocked
處理當前已觸發的事件,若是不刪除該可讀事件的話,就會遞歸進入的本函數中(所以,slave 在加載 rdb 數據時,是不能處理主節點發來的其餘數據的)。
而後作一些清理工做。
zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd);
根據 socket fd 建立一個 master 的 client。
/* 建立 master 相關的變量 */ replicationCreateMasterClient(server.repl_transfer_s);
而後能夠看下這個 replicationCreateMasterClient
這個函數都幹了些什麼事情。
void replicationCreateMasterClient(int fd) { server.master = createClient(fd); server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; server.repl_state = REPL_STATE_CONNECTED; server.master->reploff = server.repl_master_initial_offset; memcpy(server.master->replrunid, server.repl_master_runid, sizeof(server.repl_master_runid)); if (server.master->reploff == -1) server.master->flags |= CLIENT_PRE_PSYNC; }
須要注意一點,若是 master 不支持 PSYNC 的話,那麼 salve 不會獲得 +FULLRESYNC
的回覆,也就不會更新 server.repl_master_initial_offset
變量,它就一直是 -1,在這裏建立 master client 時,會給它一個標記 CLIENT_PRE_PSYNC。
這裏會把狀態機更新爲 REPL_STATE_CONNECTED。
最後,若是 aof 功能沒有關閉的話,須要從新生成 aof 文件,由於數據已經改變了。
if (server.aof_state != AOF_OFF) { int retry = 10; stopAppendOnly(); while (retry-- && startAppendOnly() == C_ERR) { serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); sleep(1); } if (!retry) { serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); exit(1); } }
到這裏,readSyncBulkPayload
函數讀取並加載新 rdb 文件的流程就走完了。
當複製狀態變爲 REPL_STATE_CONNECTED 後,表示進入了命令傳播階段。後續 slave 將 master 當成一個客戶端,並接收其發來的命令請求,像處理普通客戶端同樣處理便可。命令傳播在前面的博客已經詳細講過。
在 master-slave 鏈接創建之後,他們就經過心跳進行相互探活,這些機制都在 replicationCron
函數裏。
master 會按期給它全部的 slave 發送 PING。
if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); }
給 slave 發送命令是經過 replicationFeedSlaves
函數實現的。
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {....}
下面看一下該函數的詳細實現。
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
若是 repl_backlog
爲空,或者是沒有 slave,那麼這個過程是沒必要要的,直接返回。必要的時候生成 SELECT 命令,告知 slave 切換數據庫。slaveseldb
中保存的是上一次 replication 輸出時選擇的數據庫。
if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } }
若是 repl_backlog
不爲空,那麼組裝 redis 協議的命令,這裏是 *1\r\n$4\r\nPING
,放到 repl_backlog
變量裏。
void replicationCron(void) { ... if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC)) { replicationSendAck(); } ... } void replicationSendAck(void) { client *c = server.master; if (c != NULL) { c->flags |= CLIENT_MASTER_FORCE_REPLY; addReplyMultiBulkLen(c,3); addReplyBulkCString(c,"REPLCONF"); addReplyBulkCString(c,"ACK"); addReplyBulkLongLong(c,c->reploff); c->flags &= ~CLIENT_MASTER_FORCE_REPLY; } }
對於非老版本的 master,slave 向它按期發送 REPLCONF ACK <offset>
命令,以便告訴它複製偏移量。
cluster 模式下,使用 CLUSTER REPLICATE <NODE ID>
命令來進行新的主從關係的構建。
void clusterCommand(client *c) { ... else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { /* CLUSTER REPLICATE <NODE ID> */ clusterNode *n = clusterLookupNode(c->argv[2]->ptr); /* Lookup the specified node in our table. */ if (!n) { addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); return; } /* I can't replicate myself. */ if (n == myself) { addReplyError(c,"Can't replicate myself"); return; } /* Can't replicate a slave. */ if (nodeIsSlave(n)) { addReplyError(c,"I can only replicate a master, not a slave."); return; } // 我要作別人的 slave, 那麼不是不可以有 slots 和數據庫數據的 if (nodeIsMaster(myself) && (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); return; } /* Set the master. */ clusterSetMaster(n); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } ..... }
很重要的一個檢查是,有 slot 或者有數據的 master 節點,不能作此操做,防止丟數據。
跳過一些合理性檢查,重點函數就是 clusterSetMaster
了,那麼它作了什麼呢?
void clusterSetMaster(clusterNode *n) { serverAssert(n != myself); serverAssert(myself->numslots == 0); if (nodeIsMaster(myself)) { myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO); myself->flags |= CLUSTER_NODE_SLAVE; clusterCloseAllSlots(); } else { if (myself->slaveof) clusterNodeRemoveSlave(myself->slaveof,myself); // 解除原有的主從關係 } myself->slaveof = n; clusterNodeAddSlave(n,myself); replicationSetMaster(n->ip, n->port); resetManualFailover(); }
首先,若是自己是個 master,那麼取消掉 master 和 migrating 的 flag,由於該 master 沒有數據,能夠大膽地取消遷移的叫標記,而後加上 slave 的標記 CLUSTER_NODE_SLAVE。
若是本來就是個 slave 節點,那麼調整本身的主從歸屬信息,置空以手動主從切換有關的變量值,關於 cluster mf 的邏輯之後會專門去討論。
而後就是前面說過的 replicationSetMaster
函數,觸發上也是在 cron 裏,就不囉嗦了。
以上,主從複製中,slave 的邏輯就介紹完了。