Redis Source Code Analysis: Master-Slave Replication (4)

The previous article covered the slave's state-machine transitions during the replication flow. This article analyzes the corresponding logic on the master side.

Connection Setup and Handshake Phase

While the slave establishes the TCP connection to the master and goes through the replication handshake, the master treats the slave as an ordinary client. That is, it keeps no replication state for the slave; it simply processes the commands the slave sends and replies to them.
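For reference, the handshake the master sees is just a sequence of ordinary commands, roughly like the following (the port, runid, and offset are placeholders):

PING
AUTH <password>                  (only when auth is configured)
REPLCONF listening-port 6380
REPLCONF capa eof
PSYNC <master_runid> <offset>    (old implementations send SYNC instead)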

Handling the PING Command

During the handshake, the slave first sends a PING command, which the master handles with pingCommand. Depending on the situation, the reply is either the string +PONG or a permission error.

Handling the AUTH Command

There may be an authentication step. When the master receives an AUTH command from the slave, it handles it with authCommand, which looks roughly as follows:

void authCommand(client *c) {
    if (!server.requirepass) { // no auth password is configured
        addReplyError(c,"Client sent AUTH, but no password is set");
    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
        c->authenticated = 1;
        addReply(c,shared.ok);
    } else {
        c->authenticated = 0;
        addReplyError(c,"invalid password");
    }
}
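The comparison goes through time_independent_strcmp rather than strcmp so that the response time does not leak how many leading characters of the guessed password matched. A minimal sketch of the idea (not the exact Redis implementation, which additionally copies both strings into fixed-size buffers first):

/* Compare two buffers of equal length in constant time: always scan the
 * full length and OR together the XOR of every byte pair, so the running
 * time does not depend on where the first mismatch occurs. */
static int const_time_cmp(const char *a, const char *b, size_t len) {
    unsigned char diff = 0;
    size_t j;

    for (j = 0; j < len; j++)
        diff |= (unsigned char)(a[j] ^ b[j]);
    return diff; /* 0 when the buffers are equal */
}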

The client's authenticated field indicates whether this client has passed authentication; while requirepass is set, commands from an unauthenticated client are rejected.

Handling the REPLCONF Command

Next comes the REPLCONF command, handled by replconfCommand, which stores the port, address, capabilities, and so on that the slave reports. The function's logic is roughly as follows.

First, the necessary argument validation. The command format is REPLCONF <option> <value> <option> <value> ..., so option values come in pairs; counting REPLCONF itself, the argument count must be odd, and an even count is definitely malformed.

if ((c->argc % 2) == 0) {
    /* Number of arguments must be odd to make sure that every
     * option has a corresponding value. */
    addReply(c,shared.syntaxerr);
    return;
}
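For example, REPLCONF listening-port 6380 arrives with argc == 3 (odd) and passes this check, while a truncated REPLCONF listening-port would have argc == 2 and get the syntax-error reply.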

Then each option is matched and handled. The currently supported options are listening-port, ip-address, capa, ack, and getack; an unsupported option causes an error reply and an early return. The handling looks like this:

for (j = 1; j < c->argc; j+=2) {
    if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
        long port;

        if ((getLongFromObjectOrReply(c,c->argv[j+1],
                &port,NULL) != C_OK))
            return;
        c->slave_listening_port = port;
    } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
        sds ip = c->argv[j+1]->ptr;
        if (sdslen(ip) < sizeof(c->slave_ip)) {
            memcpy(c->slave_ip,ip,sdslen(ip)+1);
        } else {
            addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                "slave instance is too long: %zd bytes", sdslen(ip));
            return;
        }
    } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
        /* Ignore capabilities not understood by this master. */
        if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
            c->slave_capa |= SLAVE_CAPA_EOF;
    } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
        ..... /* ack and getack handling omitted; ack is shown later */
    } else {
        addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
            (char*)c->argv[j]->ptr);
        return;
    }
}

The Replication Phase

Next, the slave sends a SYNC/PSYNC command to the master, requesting either a full resynchronization or a partial resynchronization. The state the master keeps for the slave is recorded in the client's replstate field.

From the master's point of view, the slave goes through the following states:
SLAVE_STATE_WAIT_BGSAVE_START → SLAVE_STATE_WAIT_BGSAVE_END → SLAVE_STATE_SEND_BULK → SLAVE_STATE_ONLINE
The state transition diagram was drawn at the beginning of the previous article and can be used as a reference.

Handling the SYNC/PSYNC Commands

Both SYNC and PSYNC are handled by syncCommand.

Preliminary checks

First, some necessary checks.

/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;

// This node is itself a slave of another node and has not finished syncing yet,
// so it cannot serve a sync to its own slaves (its data set is incomplete).
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
    addReplyError(c,"Can't SYNC while not connected with my master");
    return;
}

/* The master is about to start a background RDB dump for this slave, while
 * buffering every write command received from other clients into this slave
 * client's output buffer. It therefore needs a completely empty output buffer
 * to hold the command stream accumulated from the moment BGSAVE starts.
 *
 * Before the master receives SYNC (or PSYNC) from the slave, the traffic
 * between the two is tiny, so on a healthy network the slave client's output
 * buffer should easily have been flushed to the slave and emptied by now.
 * If it is not empty, something is probably wrong. */
if (clientHasPendingReplies(c)) {
    addReplyError(c,"SYNC and PSYNC are invalid with pending output");
    return;
}

Full resync or partial resync

Now to the main point: SYNC and PSYNC are treated differently.

// the slave sent PSYNC
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
    if (masterTryPartialResynchronization(c) == C_OK) {
        server.stat_sync_partial_ok++;
        return; /* No full resync needed, return. */
    } else {
        char *master_runid = c->argv[1]->ptr;

        /* Increment stats for failed PSYNCs, but only if the
         * runid is not "?", as this is used by slaves to force a full
         * resync on purpose when they are not able to partially
         * resync. */
        if (master_runid[0] != '?') server.stat_sync_partial_err++;
    }
// the slave sent SYNC
} else {
    /* If a slave uses SYNC, we are dealing with an old implementation
     * of the replication protocol (like redis-cli --slave). Flag the client
     * so that we don't expect to receive REPLCONF ACK feedbacks. */
    c->flags |= CLIENT_PRE_PSYNC; // old-version instance
}

As the code above shows, when a partial resynchronization is possible the function returns immediately; otherwise it proceeds to handle the full resynchronization case, in which the master has to produce an RDB dump.

PSYNC is handled by masterTryPartialResynchronization, whose return value distinguishes the two cases: C_OK means partial resynchronization, C_ERR means full resynchronization. The details follow.

First, the master matches its own runid against the master_runid sent by the slave. If they do not match, this is a new slave and a full resynchronization is required:

char *master_runid = c->argv[1]->ptr;
... ...

if (strcasecmp(master_runid, server.runid)) {

    // A slave deliberately triggers a full resynchronization by sending a runid of `?`.
    if (master_runid[0] != '?') {
        serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
            "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
            master_runid, server.runid);
    } else {
        serverLog(LL_NOTICE,"Full resync requested by slave %s",
            replicationGetSlaveName(c));
    }
    goto need_full_resync;
}

Next, the slave's replication offset psync_offset is extracted; the master uses it to decide whether a partial resynchronization can be served. Replication offsets were discussed in an earlier article.

if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
    C_OK) goto need_full_resync;
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    serverLog(LL_NOTICE,
        "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
    if (psync_offset > server.master_repl_offset) {
        serverLog(LL_WARNING,
            "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
    }
    goto need_full_resync;
}
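To make the window concrete (made-up numbers): with repl_backlog_off = 1000 and repl_backlog_histlen = 500, the backlog covers offsets 1000 through 1500. A slave asking for psync_offset 1200 can be served partially, while 800 (already overwritten) or 1600 (past what the master has produced) falls through to need_full_resync.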

Both of the above cases that require a full resynchronization jump to the need_full_resync label and end up returning C_ERR:

need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    return C_ERR;

Otherwise a partial resynchronization will be performed: the relevant fields are initialized and C_OK is returned.

c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;

listAddNodeTail(server.slaves,c);

// The client output buffer cannot be used here, since it is reserved for
// accumulating the command stream. The master has sent very little to the
// slave so far, so the kernel socket buffer should have room and this
// direct write will normally not fail.

// reply +CONTINUE to the slave
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
    freeClientAsync(c);
    return C_OK;
}
// copy the backlog data after psync_offset into the client's output buffer
psync_len = addReplyReplicationBacklog(c,psync_offset);

/* Note that we don't need to set the selected DB at server.slaveseldb
 * to -1 to force the master to emit SELECT, since the slave already
 * has this state from the previous connection with the master. */

// update the count of currently healthy slaves
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */

The logic of addReplyReplicationBacklog was also covered earlier.

The Full Resynchronization Process

First, some fields are updated, setting replstate to the SLAVE_STATE_WAIT_BGSAVE_START state.

server.stat_sync_full++;

/* Setup the slave as one waiting for BGSAVE to start. The following code
    * paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

For a full resynchronization, the master needs to produce an RDB dump. Generating the RDB in the background requires a fork, which costs performance, so the master first checks whether already-running RDB work can be reused. There are the following three cases.

[1] A background RDB job is in progress with a disk target (the RDB data is written to a local temporary file and then sent to the slave).

/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
    server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
    /* Ok a background save is in progress. Let's check if it is a good
     * one for replication, i.e. if there is another slave that is
     * registering differences since the server forked to save. */
    client *slave;
    listNode *ln;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
    }

    /* To attach this slave, we check that it has at least all the
     * capabilities of the slave that triggered the current BGSAVE. */
    if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {

        /* Perfect, the server is already registering differences for
         * another slave. Set the right state, and copy the buffer. */
        copyClientOutputBuffer(c,slave);
        replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
        serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {

        /* No way, we need to wait for the next BGSAVE in order to
         * register differences. */
        serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
    }
}

In this code, the master searches all of its slaves for one, call it slaveX, in the SLAVE_STATE_WAIT_BGSAVE_END state.
It copies slaveX's output buffer to the current client and then calls replicationSetupSlaveForFullResync, which sets the client's state to SLAVE_STATE_WAIT_BGSAVE_END and sends the +FULLRESYNC reply. The code:

int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    char buf[128];
    int buflen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

    /* We are going to accumulate the incremental changes for this
     * slave as well. Set slaveseldb to -1 in order to force to re-emit
     * a SELECT statement in the replication stream. */
    server.slaveseldb = -1;

    /* Don't send this reply to slaves that approached us with
     * the old SYNC command. */
    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                          server.runid,offset);
        if (write(slave->fd,buf,buflen) != buflen) {
            return C_ERR;
        }
    }
    return C_OK;
}

This function mainly does the following four things:

  • Sets the slave's psync_initial_offset, so that slaves arriving later can reuse this BGSAVE as much as possible.
  • Sets the slave's state to WAIT_BGSAVE_END, meaning that from this point on it accumulates the incoming command stream while waiting for the RDB dump to finish.
  • Sets server.slaveseldb to -1, which forces a SELECT command to be emitted into the client output buffer when command-stream accumulation starts, so the first command does not run against the wrong database.
  • Sends the slave a +FULLRESYNC reply.

This function is called immediately at the following two moments:

  • right after a BGSAVE is successfully started for replication;
  • right after a reusable slave is found.

If no reusable slave is found, the master has to run another BGSAVE once the current one finishes.

[2] A background RDB job is in progress with a diskless (socket) target.

In this case the current slave cannot reuse the RDB data, and the master must run another BGSAVE after the current one finishes. The code:

/* CASE 2: BGSAVE is in progress, with socket target. */
else if (server.rdb_child_pid != -1 &&
            server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
    /* There is an RDB child process but it is writing directly to
     * children sockets. We need to wait for the next BGSAVE
     * in order to synchronize. */
    serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
}

[3] No background RDB job is in progress.

If the current slave uses diskless replication, the BGSAVE is not started right away but deferred to the replicationCron function. This is done to wait for more slaves and reduce the number of BGSAVE runs: with diskless replication, later slaves cannot attach to an existing transfer and would each force a fresh BGSAVE.

If the current slave uses disk-backed replication, startBgsaveForReplication is called to start a new BGSAVE immediately; note that this must steer clear of a background AOF rewrite. The code:

/* CASE 3: There is no BGSAVE is progress. */
else {
    if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {

        /* Diskless replication RDB child is created inside
         * replicationCron() since we want to delay its start a
         * few seconds to wait for more slaves to arrive. */
        if (server.repl_diskless_sync_delay)
            serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
    } else {

        /* Target is disk (or the slave is not capable of supporting
         * diskless replication) and we don't have a BGSAVE in progress,
         * let's start one. */
        if (server.aof_child_pid == -1) {
            startBgsaveForReplication(c->slave_capa); // 直接進行 bgsave
        } else {
            serverLog(LL_NOTICE,
                "No BGSAVE in progress, but an AOF rewrite is active. "
                "BGSAVE for replication delayed");
        }
    }
}
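The behavior in this case is controlled by two configuration options; for example, in redis.conf (illustrative values):

# stream the RDB to slaves directly over the socket
repl-diskless-sync yes
# wait a few seconds for more slaves before starting the transfer
repl-diskless-sync-delay 5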

Finally, the backlog is created if necessary.

if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
    createReplicationBacklog();

void createReplicationBacklog(void) {
    serverAssert(server.repl_backlog == NULL);
    server.repl_backlog = zmalloc(server.repl_backlog_size);
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;

    // Avoid incorrect PSYNC attempts from slaves that used the backlog before.
    server.master_repl_offset++;

    // There is no data yet, but logically the first byte of the backlog
    // is the one right after the current master_repl_offset.
    server.repl_backlog_off = server.master_repl_offset+1;
}
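To make the off-by-one bookkeeping concrete (made-up numbers): suppose master_repl_offset is 5000 when the first slave connects. After createReplicationBacklog, master_repl_offset is 5001 and repl_backlog_off is 5002, i.e. the next byte fed will logically be offset 5002. After 10 bytes of command stream are fed, master_repl_offset is 5011, repl_backlog_histlen is 10, and offsets from 5002 up to the current end of the stream can still be served partially.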

Performing the bgsave

Continuing from the previous section: the BGSAVE is driven by startBgsaveForReplication.
Based on its argument, it dispatches to a different function for diskless versus disk-backed replication:

int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
    socket_target ? "slaves sockets" : "disk");

if (socket_target)
    retval = rdbSaveToSlavesSockets();
else
    retval = rdbSaveBackground(server.rdb_filename);

The mincapa parameter describes the slave's capabilities, i.e. whether it can accept a diskless RDB transfer.
If server.repl_diskless_sync is enabled and mincapa includes SLAVE_CAPA_EOF, the RDB data can be sent disklessly: rdbSaveToSlavesSockets streams the RDB in the background over the sockets of every slave in the SLAVE_STATE_WAIT_BGSAVE_START state.
Otherwise, rdbSaveBackground is called to dump the RDB to a local file in the background.

If the chosen RDB function fails, every slave in the SLAVE_STATE_WAIT_BGSAVE_START state is removed from the slave list and flagged with CLIENT_CLOSE_AFTER_REPLY, so its connection is closed after the error reply is sent. The logic:

if (retval == C_ERR) {
    serverLog(LL_WARNING,"BGSAVE for replication failed");
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            slave->flags &= ~CLIENT_SLAVE;
            listDelNode(server.slaves,ln);
            addReplyError(slave,
                "BGSAVE failed, replication can't continue");
            slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
    return retval;
}

With a disk target, the master then finds the slaves in the SLAVE_STATE_WAIT_BGSAVE_START state and calls replicationSetupSlaveForFullResync on each, setting the state to SLAVE_STATE_WAIT_BGSAVE_END and replying +FULLRESYNC, as described earlier. The code:

/* If the target is socket, rdbSaveToSlavesSockets() already setup
 * the slaves for a full resync. Otherwise for disk target do it now.*/
if (!socket_target) {
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                replicationSetupSlaveForFullResync(slave,
                        getPsyncInitialOffset());
        }
    }
}

Finally, replicationScriptCacheFlush is called to flush the Lua script cache.

Accumulating the Command Stream

When the master receives a command from a client, it invokes the call function to run the command's handler. The PROPAGATE_REPL flag marks a command that must be propagated to the slaves. The relevant logic:

void call(client *c, int flags) {
    ......
   /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        ......
        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        ......

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;

        ......

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. */
        if (propagate_flags != PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}


void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

Now for the key function, replicationFeedSlaves, analyzed below.

First, a necessary check.

// If there is no backlog and this node has no slaves, there is nothing to do.
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

If necessary, a SELECT command is appended both to the backlog and to the output buffer of every slave whose state is not SLAVE_STATE_WAIT_BGSAVE_START; every other command is handled the same way. The code, roughly:

/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
    char aux[LONG_STR_SIZE+3];

    /* Add the multi bulk reply length. */
    // *..CRLF
    aux[0] = '*';
    len = ll2string(aux+1,sizeof(aux)-1,argc);
    aux[len+1] = '\r';
    aux[len+2] = '\n';
    feedReplicationBacklog(aux,len+3); // length of argc as a string, plus 3 for '*' and CRLF

    for (j = 0; j < argc; j++) {
        long objlen = stringObjectLen(argv[j]);

        /* We need to feed the buffer with the object as a bulk reply
         * not just as a plain string, so create the $..CRLF payload len
         * and add the final CRLF */
        aux[0] = '$';
        len = ll2string(aux+1,sizeof(aux)-1,objlen);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        feedReplicationBacklogWithObject(argv[j]);
        feedReplicationBacklog(aux+len+1,2); // CRLF
    }
}

/* Write the command to every slave. */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
    client *slave = ln->value;

    /* Don't feed slaves that are still waiting for BGSAVE to start */
    if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

    /* Feed slaves that are waiting for the initial SYNC (so these commands
        * are queued in the output buffer until the initial SYNC completes),
        * or are already in sync with the master. */

    /* Add the multi bulk length. */
    addReplyMultiBulkLen(slave,argc);

    /* Finally any additional argument that was not stored inside the
        * static buffer if any (from j to argc). */
    for (j = 0; j < argc; j++)
        addReplyBulk(slave,argv[j]);
}
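As an illustration of what actually gets fed (hypothetical command): a client running SET key val against database 0 ends up as the following byte stream in the backlog and in every feedable slave's output buffer (the SELECT line is only emitted when server.slaveseldb differs from the command's database):

*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\nval\r\n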

Appending the command stream to a slave's output buffer goes through the addReply family of functions.
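The code above calls feedReplicationBacklog several times without showing it. For reference, its ring-buffer write looks roughly like this in the 3.x source (slightly abridged):

void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* The backlog is a circular buffer: write as much as possible on each
     * pass and wrap repl_backlog_idx around when it reaches the end. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Set the offset of the first byte still held in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}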

Wrapping Up the bgsave

When the BGSAVE completes, disk-backed or diskless alike, updateSlavesWaitingBgsave is called for the final processing, mainly to take care of the deferred BGSAVE mentioned earlier.

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {.....}

It walks the slave list. If a slave is in the SLAVE_STATE_WAIT_BGSAVE_START state, startBgsaveForReplication is called to begin a new BGSAVE:

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
    startbgsave = 1;
    mincapa = (mincapa == -1) ? slave->slave_capa :
                                (mincapa & slave->slave_capa);
}

If a slave is in the SLAVE_STATE_WAIT_BGSAVE_END state, it is waiting for the RDB data to be ready; here the handling differs between diskless and disk-backed replication.

Diskless replication

if (type == RDB_CHILD_TYPE_SOCKET) {
    serverLog(LL_NOTICE,
        "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
            replicationGetSlaveName(slave));

    /* Note: we wait for a REPLCONF ACK message from slave in
     * order to really put it online (install the write handler
     * so that the accumulated data can be transfered). However
     * we change the replication state ASAP, since our slave
     * is technically online now. */
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 1;
    slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
}

The slave's replication state is set to SLAVE_STATE_ONLINE and its repl_put_online_on_ack field is set to 1.
Note that only after receiving the slave's first replconf ack <offset> does the master call putSlaveOnline to really bring the slave online (the state is already SLAVE_STATE_ONLINE, but only now is the write handler installed) and begin sending the buffered command stream.

void replconfCommand(client *c) {
    .....
    else if (!strcasecmp(c->argv[j]->ptr,"ack")) {

        /* REPLCONF ACK is used by slave to inform the master the amount
         * of replication stream that it processed so far. It is an
         * internal only command that normal clients should never use. */
        long long offset;

        if (!(c->flags & CLIENT_SLAVE)) return;
        if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
            return;
        if (offset > c->repl_ack_off)
            c->repl_ack_off = offset;
        c->repl_ack_time = server.unixtime;

        /* If this was a diskless replication, we need to really put
         * the slave online when the first ACK is received (which
         * confirms slave is online and ready to get more data). */
        if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
            putSlaveOnline(c);

        /* Note: this command does not reply anything! */
        return;
    }
    ......
}

The reason for this design lies in the difference between the two transfer methods.

With disk-backed replication, the master first sends the RDB length to the slave as $<len>\r\n; having parsed len, the slave reads exactly that many bytes of RDB data from the socket.
With diskless replication, the master cannot know the RDB length in advance, so how does the slave tell when the RDB data ends? Before sending the data, the master sends a 40-byte magic string in the form $EOF:<40 bytes delimiter>; once the RDB data has been sent, it sends the same magic string again, letting the slave detect the end of the transfer.
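Side by side, the two transfer formats on the wire look like this (the length and the 40-byte delimiter below are made-up examples; the real delimiter is random, generated per transfer):

disk-backed:  $178293\r\n<178293 bytes of RDB payload>
diskless:     $EOF:dd828a45b1f7c06e2168c35dca5e7b48cf06af43\r\n
              <RDB payload of unknown length>
              dd828a45b1f7c06e2168c35dca5e7b48cf06af43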

If the master were to set the slave to SLAVE_STATE_ONLINE right after sending the RDB data and immediately follow with the buffered command stream, then under diskless replication the slave's final read could well contain command-stream bytes mixed in with the trailing delimiter. That is why the master waits for the slave's first replconf ack <offset> before moving it to SLAVE_STATE_ONLINE.

Disk-backed replication

if (bgsaveerr != C_OK) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
    continue;
}

if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
    redis_fstat(slave->repldbfd,&buf) == -1) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
    continue;
}

slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
    (unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
    freeClient(slave);
    continue;
}

If the earlier BGSAVE failed, the client is freed here.
Otherwise the generated RDB file is opened and its fd saved in the repldbfd field; the state is set to SLAVE_STATE_SEND_BULK, meaning the RDB data is about to be sent to the slave, and the RDB size is written into the replpreamble field.
The write event on the slave's fd is then re-registered with sendBulkToSlave as the callback, analyzed below:

/* Before sending the RDB file, we send the preamble as configured by the
 * replication process. Currently the preamble is just the bulk count of
 * the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
    nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
    if (nwritten == -1) {
        serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    server.stat_net_output_bytes += nwritten;
    sdsrange(slave->replpreamble,nwritten,-1);
    if (sdslen(slave->replpreamble) == 0) {
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
        /* fall through sending data. */
    } else {
        return;
    }
}

If replpreamble is non-empty, this is the first time the callback has fired, so the RDB length header is sent to the slave first.
Otherwise we are in the phase of sending the actual RDB data: data is read from the RDB file and written to the slave, with repldboff tracking how much has been sent so far.
Each invocation sends at most PROTO_IOBUF_LEN bytes, i.e. 16 KB.

/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN); // read up to 16K
if (buflen <= 0) {
    serverLog(LL_WARNING,"Read error sending DB to slave: %s",
        (buflen == 0) ? "premature EOF" : strerror(errno));
    freeClient(slave);
    return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
    if (errno != EAGAIN) {
        serverLog(LL_WARNING,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
    }
    return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;

Once the RDB data has been fully sent, the RDB file fd is closed, the write event is removed, and repldbfd is reset.

if (slave->repldboff == slave->repldbsize) { // RDB fully sent; remove the write event
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    putSlaveOnline(slave);
}

Finally, putSlaveOnline sets the slave's replication state to SLAVE_STATE_ONLINE and re-registers the write event on its fd with sendReplyToClient as the callback, which sends the accumulated command stream to the slave.

void putSlaveOnline(client *slave) {
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 0;
    slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave) == AE_ERR) {
        serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
        freeClient(slave);
        return;
    }
    refreshGoodSlavesCount();
    serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
        replicationGetSlaveName(slave));
}

Setting the slave's repl_put_online_on_ack to 0 indicates that it has finished the initial synchronization and now enters the command propagation phase.
Finally, refreshGoodSlavesCount is called to update the count of currently healthy slaves.
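This "good slaves" count feeds the min-slaves safety feature: when fewer than min-slaves-to-write slaves have an ACK lag within min-slaves-max-lag seconds, the master refuses writes. For example, in redis.conf (illustrative values):

min-slaves-to-write 1
min-slaves-max-lag 10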


With that, the master-side logic of the replication process has been covered.
