The previous article walked through the slave-side state machine of the master-slave replication flow. This article analyzes the corresponding logic on the master side.
While the slave establishes the TCP connection and performs the replication handshake, the master treats it as an ordinary client. That is, it keeps no replication state for the slave; it simply processes whatever commands the slave sends and replies.
During the handshake, the slave first sends a PING command, which the master handles with pingCommand, replying either +PONG or a permission error, depending on the situation.
There may then be an authentication step: the master receives an AUTH command from the slave and handles it with authCommand, roughly as follows,
void authCommand(client *c) {
    if (!server.requirepass) { /* no auth password configured */
        addReplyError(c,"Client sent AUTH, but no password is set");
    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
        c->authenticated = 1;
        addReply(c,shared.ok);
    } else {
        c->authenticated = 0;
        addReplyError(c,"invalid password");
    }
}
The client's authenticated field records whether this client has passed authentication.
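Note the use of time_independent_strcmp rather than a plain strcmp: the comparison must not return early at the first mismatching byte, otherwise an attacker could recover the password byte by byte from response timing. A minimal sketch of the idea (not the actual redis implementation, which compares fixed-size buffers to also hide length information):

#include <stddef.h>

/* Compare two strings in time that depends only on their lengths, never on
 * where the first mismatch occurs. Returns 0 when equal. */
static int timeIndependentCompare(const char *a, size_t alen,
                                  const char *b, size_t blen) {
    unsigned char diff = (alen != blen);
    for (size_t i = 0; i < alen && i < blen; i++)
        diff |= (unsigned char)(a[i] ^ b[i]); /* accumulate; no early exit */
    return diff;
}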
Next comes the REPLCONF command, handled by replconfCommand, which stores the listening port, address, capabilities, and other information reported by the slave. Its logic is roughly as follows.
First, some argument validation. The command format is REPLCONF <option> <value> <option> <value> ..., so option values come in pairs; counting REPLCONF itself, the total argument count must be odd, and an even count necessarily indicates a malformed command.
if ((c->argc % 2) == 0) {
    /* Number of arguments must be odd to make sure that every
     * option has a corresponding value. */
    addReply(c,shared.syntaxerr);
    return;
}
Then each option is matched and handled. The currently supported options are listening-port, ip-address, capa, ack, and getack; an unrecognized option causes an error reply and an early return, as follows,
for (j = 1; j < c->argc; j+=2) {
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        long port;

        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != C_OK))
            return;
        c->slave_listening_port = port;
    } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
        sds ip = c->argv[j+1]->ptr;
        if (sdslen(ip) < sizeof(c->slave_ip)) {
            memcpy(c->slave_ip,ip,sdslen(ip)+1);
        } else {
            addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                "slave instance is too long: %zd bytes", sdslen(ip));
            return;
        }
    } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
        /* Ignore capabilities not understood by this master. */
        if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
            c->slave_capa |= SLAVE_CAPA_EOF;
    } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        ..... /* covered later in this article */
    } else {
        addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
            (char*)c->argv[j]->ptr);
        return;
    }
}
Next, the slave sends a SYNC/PSYNC command to request a full or partial resynchronization. The state the master keeps for each slave is recorded in the replstate field of the corresponding client.
From the master's point of view, a slave goes through the following states:
SLAVE_STATE_WAIT_BGSAVE_START → SLAVE_STATE_WAIT_BGSAVE_END → SLAVE_STATE_SEND_BULK → SLAVE_STATE_ONLINE.
The state transition diagram was drawn at the beginning of the previous article; refer to it if needed.
SYNC/PSYNC is handled by syncCommand.
First, some necessary checks.
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;

/* This node is itself a slave of another node but has not finished syncing
 * its own data yet; it cannot serve a sync for its slaves with incomplete
 * data. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
    addReplyError(c,"Can't SYNC while not connected with my master");
    return;
}

/* The master is about to start a background RDB dump for this slave, and
 * from the moment BGSAVE starts, every command stream received from other
 * clients must be accumulated in this slave client's output buffer. That
 * requires a completely empty output buffer.
 *
 * Before the SYNC (PSYNC) arrives, the traffic between master and slave is
 * tiny, so on a healthy network the slave client's output buffer should
 * already have been flushed and emptied. If it is not empty, something is
 * probably wrong. */
if (clientHasPendingReplies(c)) {
    addReplyError(c,"SYNC and PSYNC are invalid with pending output");
    return;
}
Now the real work begins: SYNC and PSYNC are treated differently.
/* The slave sent PSYNC. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
    if (masterTryPartialResynchronization(c) == C_OK) {
        server.stat_sync_partial_ok++;
        return; /* No full resync needed, return. */
    } else {
        char *master_runid = c->argv[1]->ptr;

        /* Increment stats for failed PSYNCs, but only if the
         * runid is not "?", as this is used by slaves to force a full
         * resync on purpose when they are not able to partially
         * resync. */
        if (master_runid[0] != '?') server.stat_sync_partial_err++;
    }
/* The slave sent the old SYNC command. */
} else {
    /* If a slave uses SYNC, we are dealing with an old implementation
     * of the replication protocol (like redis-cli --slave). Flag the client
     * so that we don't expect to receive REPLCONF ACK feedbacks. */
    c->flags |= CLIENT_PRE_PSYNC; /* old-version instance */
}
As the code shows, when a partial resynchronization is possible the function returns directly; otherwise the master proceeds with a full resynchronization, which requires an RDB dump.
PSYNC is handled by masterTryPartialResynchronization, whose return value distinguishes the two cases: C_OK means partial resync, C_ERR means full resync. The details follow.
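Putting the two outcomes in protocol terms, a typical exchange looks roughly like this (runid and offsets are illustrative):

slave:  PSYNC ? -1                    first sync: force a full resync
master: +FULLRESYNC 3fd9ab... 10245   full resync: master runid + initial offset
        <RDB payload, then the accumulated command stream>

slave:  PSYNC 3fd9ab... 10391         after a reconnect: try a partial resync
master: +CONTINUE                     accepted: only the missing stream follows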
First, the master matches its own runid against the master_runid sent by the slave. A mismatch means this is a new slave, so a full resynchronization is required:
char *master_runid = c->argv[1]->ptr;
... ...
if (strcasecmp(master_runid, server.runid)) {
    /* A slave sends runid "?" to force a full resynchronization. */
    if (master_runid[0] != '?') {
        serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
            "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
            master_runid, server.runid);
    } else {
        serverLog(LL_NOTICE,"Full resync requested by slave %s",
            replicationGetSlaveName(c));
    }
    goto need_full_resync;
}
Then the slave's replication offset psync_offset is extracted, and the master uses it to decide whether a partial resynchronization is possible: the offset must fall inside the range currently held by the backlog. For example, with repl_backlog_off = 1000 and repl_backlog_histlen = 500, only offsets in [1000, 1500] can be served. Replication offsets were covered in an earlier article.
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK)
    goto need_full_resync;

if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    serverLog(LL_NOTICE,
        "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).",
        replicationGetSlaveName(c), psync_offset);
    if (psync_offset > server.master_repl_offset) {
        serverLog(LL_WARNING,
            "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.",
            replicationGetSlaveName(c));
    }
    goto need_full_resync;
}
Both of the full-resync cases above jump to the need_full_resync label and finally return C_ERR.
need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    return C_ERR;
Otherwise a partial resynchronization is possible; the relevant fields are initialized and C_OK is returned.
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);

/* We can't use the output buffer here, since it is reserved for
 * accumulating the command stream. The traffic sent to the slave so far
 * has been tiny, so the kernel socket buffer should have room and this
 * direct write will normally not fail. */
/* Reply +CONTINUE to the slave. */
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
    freeClientAsync(c);
    return C_OK;
}
/* Copy the backlog data after psync_offset into the client's output
 * buffer. */
psync_len = addReplyReplicationBacklog(c,psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
 * to -1 to force the master to emit SELECT, since the slave already
 * has this state from the previous connection with the master. */

/* Refresh the count of slaves in a healthy state. */
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */
The logic of addReplyReplicationBacklog has also been covered in an earlier article.
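For reference, its core is the circular-buffer arithmetic below (paraphrased from redis 3.2): locate the oldest byte in the ring, skip forward to psync_offset, then append the remainder to the client's output buffer, splitting the copy when it wraps around the end of the ring.

long long addReplyReplicationBacklog(client *c, long long offset) {
    long long j, skip, len;

    if (server.repl_backlog_histlen == 0) return 0;

    /* Bytes to discard before reaching 'offset'. */
    skip = offset - server.repl_backlog_off;

    /* Index of the oldest byte, i.e. logical offset repl_backlog_off. */
    j = (server.repl_backlog_idx +
        (server.repl_backlog_size - server.repl_backlog_histlen)) %
        server.repl_backlog_size;

    /* Seek to 'offset' inside the ring. */
    j = (j + skip) % server.repl_backlog_size;

    /* Feed the slave; split into two parts if the data wraps around. */
    len = server.repl_backlog_histlen - skip;
    while (len) {
        long long thislen =
            ((server.repl_backlog_size - j) < len) ?
            (server.repl_backlog_size - j) : len;

        addReplySds(c, sdsnewlen(server.repl_backlog + j, thislen));
        len -= thislen;
        j = 0;
    }
    return server.repl_backlog_histlen - skip;
}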
Now for the full resynchronization path, back in syncCommand. First, some bookkeeping: replstate is set to SLAVE_STATE_WAIT_BGSAVE_START.
server.stat_sync_full++;

/* Setup the slave as one waiting for BGSAVE to start. The following code
 * paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
For a full resynchronization the master must produce an RDB dump. Generating RDB data in the background requires a fork, which costs performance, so the master first checks whether existing RDB data can be reused. There are three cases.
【1】A BGSAVE is already running with a disk target: the RDB data is being written to a local temporary file, which will then be sent to the slave.
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
    server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
    /* Ok a background save is in progress. Let's check if it is a good
     * one for replication, i.e. if there is another slave that is
     * registering differences since the server forked to save. */
    client *slave;
    listNode *ln;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
    }
    /* To attach this slave, we check that it has at least all the
     * capabilities of the slave that triggered the current BGSAVE. */
    if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
        /* Perfect, the server is already registering differences for
         * another slave. Set the right state, and copy the buffer. */
        copyClientOutputBuffer(c,slave);
        replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
        serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {
        /* No way, we need to wait for the next BGSAVE in order to
         * register differences. */
        serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
    }
}
The code scans all of the master's slaves for one in the SLAVE_STATE_WAIT_BGSAVE_END state (call it slaveX).
It copies slaveX's output buffer to the current client, then calls replicationSetupSlaveForFullResync, which sets the client's state to SLAVE_STATE_WAIT_BGSAVE_END and sends a +FULLRESYNC reply, as follows,
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char buf[128];
    int buflen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
    /* We are going to accumulate the incremental changes for this
     * slave as well. Set slaveseldb to -1 in order to force to re-emit
     * a SELECT statement in the replication stream. */
    server.slaveseldb = -1;

    /* Don't send this reply to slaves that approached us with
     * the old SYNC command. */
    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                          server.runid,offset);
        if (write(slave->fd,buf,buflen) != buflen) {
            return C_ERR;
        }
    }
    return C_OK;
}
This function does four main things:
1. records the offset in the slave's psync_initial_offset, so that later slaves attaching to the same BGSAVE can copy this client's output buffer together with the correct offset;
2. sets the slave's replstate to SLAVE_STATE_WAIT_BGSAVE_END, so the command stream starts accumulating for it from this point;
3. sets server.slaveseldb to -1, forcing a SELECT to be re-emitted into the replication stream;
4. sends the +FULLRESYNC <runid> <offset> reply (skipped for old-style SYNC clients flagged CLIENT_PRE_PSYNC).
It should be called immediately at either of these two moments:
1. right after a BGSAVE for replication has been successfully started, or
2. when an already-running BGSAVE is found that the slave can be attached to.
If no reusable slave is found, the master has to run another BGSAVE after the current one completes.
【2】A BGSAVE is already running with a socket target (diskless replication).
In this case the current slave cannot reuse the RDB data; a new BGSAVE must be run after the current one completes. The code is as follows,
/* CASE 2: BGSAVE is in progress, with socket target. */
else if (server.rdb_child_pid != -1 &&
         server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
    /* There is an RDB child process but it is writing directly to
     * children sockets. We need to wait for the next BGSAVE
     * in order to synchronize. */
    serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
}
【3】No BGSAVE is in progress.
If the current slave supports diskless replication, the BGSAVE is not started right away but deferred to replicationCron, in order to wait for more slaves and reduce the number of BGSAVE runs: with diskless replication, a late-arriving slave cannot attach to an existing transfer and would force a fresh BGSAVE. (The cron-side deferral logic is sketched after the CASE 3 code below.)
If the current slave requires disk-based replication, startBgsaveForReplication is called to start a new BGSAVE immediately; note that this must not run while a background AOF rewrite is in progress. The code is as follows,
/* CASE 3: There is no BGSAVE in progress. */
else {
    if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
        /* Diskless replication RDB child is created inside
         * replicationCron() since we want to delay its start a
         * few seconds to wait for more slaves to arrive. */
        if (server.repl_diskless_sync_delay)
            serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
    } else {
        /* Target is disk (or the slave is not capable of supporting
         * diskless replication) and we don't have a BGSAVE in progress,
         * let's start one. */
        if (server.aof_child_pid == -1) {
            startBgsaveForReplication(c->slave_capa); /* start BGSAVE right away */
        } else {
            serverLog(LL_NOTICE,
                "No BGSAVE in progress, but an AOF rewrite is active. "
                "BGSAVE for replication delayed");
        }
    }
}
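The deferred diskless BGSAVE mentioned above is started from replicationCron. A simplified sketch of that part (condensed from redis 3.2) shows the idea: once the longest-waiting slave has waited more than repl_diskless_sync_delay seconds, start a single BGSAVE serving every slave currently waiting.

/* Inside replicationCron(), which runs once per second. */
if (server.repl_diskless_sync && server.rdb_child_pid == -1 &&
    server.aof_child_pid == -1)
{
    time_t idle, max_idle = 0;
    int slaves_waiting = 0;
    int mincapa = -1;
    listNode *ln;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            idle = server.unixtime - slave->lastinteraction;
            if (idle > max_idle) max_idle = idle;
            slaves_waiting++;
            mincapa = (mincapa == -1) ? slave->slave_capa :
                                        (mincapa & slave->slave_capa);
        }
    }

    /* Waited long enough: one BGSAVE serves all waiting slaves. */
    if (slaves_waiting && max_idle > server.repl_diskless_sync_delay)
        startBgsaveForReplication(mincapa);
}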
Finally, the replication backlog is created if necessary, that is, for the first slave, when no backlog exists yet.
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
    createReplicationBacklog();
void createReplicationBacklog(void) {
    serverAssert(server.repl_backlog == NULL);
    server.repl_backlog = zmalloc(server.repl_backlog_size);
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    /* Avoid a wrong PSYNC from a slave that used the backlog before. */
    server.master_repl_offset++;
    /* There is no data yet, but logically the first byte of the backlog
     * is the byte right after master_repl_offset. */
    server.repl_backlog_off = server.master_repl_offset+1;
}
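The writer side is feedReplicationBacklog (redis 3.2), which shows how master_repl_offset, repl_backlog_idx, repl_backlog_histlen, and repl_backlog_off stay consistent as command-stream bytes are appended to the ring:

void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* This is a circular buffer: write as much as possible per iteration
     * and rewind the index when the end of the ring is reached. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Logical offset of the first (oldest) byte still in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}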
Continuing from the previous subsection: the BGSAVE itself is handled by startBgsaveForReplication.
Based on its argument, it dispatches to a different function for diskless versus disk-based replication:
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
    socket_target ? "slaves sockets" : "disk");

if (socket_target)
    retval = rdbSaveToSlavesSockets();
else
    retval = rdbSaveBackground(server.rdb_filename);
The mincapa parameter describes the slaves' "capabilities", i.e. whether they can accept a diskless RDB transfer.
If server.repl_diskless_sync is enabled and mincapa includes SLAVE_CAPA_EOF, the RDB data can be streamed directly: rdbSaveToSlavesSockets is called to send the RDB data, in a background child, over the sockets of every slave in the SLAVE_STATE_WAIT_BGSAVE_START state. Otherwise, rdbSaveBackground is called to dump the RDB data to a local file in the background.
If the chosen RDB function fails, every slave in the SLAVE_STATE_WAIT_BGSAVE_START state is removed from the slave list, given an error reply, and flagged CLIENT_CLOSE_AFTER_REPLY so the connection is closed once the reply is sent. The logic is as follows,
if (retval == C_ERR) {
    serverLog(LL_WARNING,"BGSAVE for replication failed");
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            slave->flags &= ~CLIENT_SLAVE;
            listDelNode(server.slaves,ln);
            addReplyError(slave,
                "BGSAVE failed, replication can't continue");
            slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
    return retval;
}
With disk-based replication, every slave in SLAVE_STATE_WAIT_BGSAVE_START is then passed to replicationSetupSlaveForFullResync, which moves it to SLAVE_STATE_WAIT_BGSAVE_END and replies +FULLRESYNC, as described earlier. The code is as follows,
/* If the target is socket, rdbSaveToSlavesSockets() already setup
 * the slaves for a full resync. Otherwise for disk target do it now. */
if (!socket_target) {
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            replicationSetupSlaveForFullResync(slave,
                getPsyncInitialOffset());
        }
    }
}
Finally, replicationScriptCacheFlush is called to flush the Lua script cache.
When the master receives a command from a client, it executes the command's handler via call. The PROPAGATE_REPL flag marks commands that must be propagated to the slaves; the relevant logic is as follows,
void call(client *c, int flags) {
    ......
    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        ......
        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
        ......
        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        ......
        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
Now for the key function, replicationFeedSlaves, analyzed below.
First, the necessary checks.
/* If the backlog is empty and this node has no slaves, there is nothing
 * to do below. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
If necessary, a SELECT command is appended both to the backlog and to the output buffer of every slave whose state is not SLAVE_STATE_WAIT_BGSAVE_START; every other command is handled the same way, roughly as follows,
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
    char aux[LONG_STR_SIZE+3];

    /* Add the multi bulk reply length: *<argc>CRLF */
    aux[0] = '*';
    len = ll2string(aux+1,sizeof(aux)-1,argc);
    aux[len+1] = '\r';
    aux[len+2] = '\n';
    feedReplicationBacklog(aux,len+3); /* length of argc as a string + 3,
                                          i.e. '*' plus CRLF */

    for (j = 0; j < argc; j++) {
        long objlen = stringObjectLen(argv[j]);

        /* We need to feed the buffer with the object as a bulk reply
         * not just as a plain string, so create the $..CRLF payload len
         * and add the final CRLF */
        aux[0] = '$';
        len = ll2string(aux+1,sizeof(aux)-1,objlen);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        feedReplicationBacklogWithObject(argv[j]);
        feedReplicationBacklog(aux+len+1,2); /* trailing CRLF */
    }
}

/* Write the command to every slave. */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
    client *slave = ln->value;

    /* Don't feed slaves that are still waiting for BGSAVE to start */
    if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

    /* Feed slaves that are waiting for the initial SYNC (so these commands
     * are queued in the output buffer until the initial SYNC completes),
     * or are already in sync with the master. */

    /* Add the multi bulk length. */
    addReplyMultiBulkLen(slave,argc);

    /* Finally any additional argument that was not stored inside the
     * static buffer if any (from j to argc). */
    for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
}
Appending the command stream to a slave's output buffer goes through the addReply family of functions.
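To make the framing concrete, here is a standalone illustration (not redis code) of the RESP multi-bulk encoding that both the backlog and the slave output buffers receive: a command of argc arguments becomes *<argc>\r\n followed by $<len>\r\n<arg>\r\n for each argument.

#include <stdio.h>
#include <string.h>

/* Print the RESP frame for a command; SET mykey hello becomes:
 * *3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nhello\r\n */
static void emitCommand(int argc, const char **argv) {
    printf("*%d\r\n", argc);
    for (int i = 0; i < argc; i++)
        printf("$%zu\r\n%s\r\n", strlen(argv[i]), argv[i]);
}

int main(void) {
    const char *cmd[] = { "SET", "mykey", "hello" };
    emitCommand(3, cmd);
    return 0;
}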
When the BGSAVE finishes, whether disk-based or diskless, updateSlavesWaitingBgsave is called for the final processing, in particular to handle the deferred BGSAVEs mentioned earlier.
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {.....}
It iterates over the slave list. For any slave still in SLAVE_STATE_WAIT_BGSAVE_START, startBgsaveForReplication is called afterwards to start a new BGSAVE:
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
    startbgsave = 1;
    mincapa = (mincapa == -1) ? slave->slave_capa :
                                (mincapa & slave->slave_capa);
}
If a slave is in SLAVE_STATE_WAIT_BGSAVE_END, it has been waiting for the RDB data to become ready; the handling now differs between diskless and disk-based replication.
if (type == RDB_CHILD_TYPE_SOCKET) {
    serverLog(LL_NOTICE,
        "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
            replicationGetSlaveName(slave));

    /* Note: we wait for a REPLCONF ACK message from slave in
     * order to really put it online (install the write handler
     * so that the accumulated data can be transfered). However
     * we change the replication state ASAP, since our slave
     * is technically online now. */
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 1;
    slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
}
The slave's replication state is set to SLAVE_STATE_ONLINE and its repl_put_online_on_ack flag to 1.
⚠ Note: only after receiving the slave's first replconf ack <offset> command does the master actually call putSlaveOnline, which truly brings the slave online (installing the write handler) and starts sending the buffered command stream.
void replconfCommand(client *c) {
    .....
    else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;

        if (!(c->flags & CLIENT_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
            return;
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;
        /* If this was a diskless replication, we need to really put
         * the slave online when the first ACK is received (which
         * confirms slave is online and ready to get more data). */
        if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
            putSlaveOnline(c);
        /* Note: this command does not reply anything! */
        return;
    }
    ......
}
This design follows from the difference between the two transfer methods.
With disk-based replication, the master first sends the RDB length in the form $<len>\r\n; after parsing len, the slave reads exactly that many bytes of RDB data from the socket.
With diskless replication, the master cannot know the RDB length in advance, so how does the slave detect the end of the RDB data? Before sending it, the master sends a 40-byte magic string in the form $EOF:<40 bytes delimiter>; after the RDB data it sends the same magic string again, allowing the slave to detect the end of the transfer.
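On the slave side, the detection amounts to remembering the trailing bytes seen so far and comparing them against the delimiter from the $EOF:<40 bytes delimiter> header. A standalone sketch of the check (not the actual redis reader, which keeps a rolling lastbytes buffer across partial reads):

#include <string.h>

#define RDB_EOF_MARK_LEN 40

/* Returns 1 if the stream read so far ends with the announced delimiter,
 * i.e. the RDB transfer is complete. */
static int rdbStreamDone(const char *buf, size_t buflen,
                         const char delim[RDB_EOF_MARK_LEN]) {
    if (buflen < RDB_EOF_MARK_LEN) return 0;
    return memcmp(buf + buflen - RDB_EOF_MARK_LEN,
                  delim, RDB_EOF_MARK_LEN) == 0;
}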
Suppose the master marked the slave SLAVE_STATE_ONLINE right after sending the RDB data and immediately started sending the buffered command stream. With diskless replication, the last chunk the slave reads could then very well contain command-stream bytes mixed in with the RDB data. That is why the master waits for the slave's first replconf ack <offset> command before really treating it as SLAVE_STATE_ONLINE. For the disk-based target, updateSlavesWaitingBgsave instead proceeds as follows:
if (bgsaveerr != C_OK) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
    continue;
}
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
    redis_fstat(slave->repldbfd,&buf) == -1) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s",
        strerror(errno));
    continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
    (unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
    sendBulkToSlave, slave) == AE_ERR) {
    freeClient(slave);
    continue;
}
If the earlier BGSAVE failed, the client is freed here.
Otherwise, the generated RDB file is opened and its fd stored in repldbfd; the state becomes SLAVE_STATE_SEND_BULK, meaning the RDB data is about to be sent to the slave, and the RDB size, formatted as the preamble, is stored in replpreamble.
The write event on the slave's fd is then re-registered with sendBulkToSlave as its callback, analyzed below,
/* Before sending the RDB file, we send the preamble as configured by the
 * replication process. Currently the preamble is just the bulk count of
 * the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
    nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
    if (nwritten == -1) {
        serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    server.stat_net_output_bytes += nwritten;
    sdsrange(slave->replpreamble,nwritten,-1);
    if (sdslen(slave->replpreamble) == 0) {
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
        /* fall through sending data. */
    } else {
        return;
    }
}
If replpreamble is non-empty, this is the first time the callback fires, so the RDB length information is sent to the slave first.
Otherwise, we are in the phase of sending actual RDB data: data is read from the RDB file and written to the slave, with repldboff recording how much has been sent so far.
Each invocation sends at most PROTO_IOBUF_LEN (16KB) of data.
/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); /* read up to 16KB */
if (buflen <= 0) {
    serverLog(LL_WARNING,"Read error sending DB to slave: %s",
        (buflen == 0) ? "premature EOF" : strerror(errno));
    freeClient(slave);
    return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
    if (errno != EAGAIN) {
        serverLog(LL_WARNING,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
    }
    return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
When the RDB data has been fully sent, the RDB file fd is closed, the write event is removed, and repldbfd is reset.
if (slave->repldboff == slave->repldbsize) {
    /* The whole RDB file has been sent; remove the writable event. */
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    putSlaveOnline(slave);
}
Finally, putSlaveOnline sets the slave's replication state to SLAVE_STATE_ONLINE and re-registers the write event with sendReplyToClient as its callback, sending the accumulated command stream to the slave.
void putSlaveOnline(client *slave) {
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 0;
    slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
        freeClient(slave);
        return;
    }
    refreshGoodSlavesCount();
    serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
        replicationGetSlaveName(slave));
}
The slave's repl_put_online_on_ack flag is set to 0, indicating it has finished the initial synchronization and entered the command propagation phase.
Finally, refreshGoodSlavesCount updates the count of currently healthy slaves.
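For completeness, refreshGoodSlavesCount (redis 3.2) counts a slave as "good" when it is ONLINE and its last REPLCONF ACK arrived within min-slaves-max-lag seconds; the resulting count is what gates writes when min-slaves-to-write is configured:

void refreshGoodSlavesCount(void) {
    listIter li;
    listNode *ln;
    int good = 0;

    if (!server.repl_min_slaves_to_write ||
        !server.repl_min_slaves_max_lag) return;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        time_t lag = server.unixtime - slave->repl_ack_time;

        if (slave->replstate == SLAVE_STATE_ONLINE &&
            lag <= server.repl_min_slaves_max_lag) good++;
    }
    server.repl_good_slaves_count = good;
}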
This concludes the master-side logic of the master-slave replication process.