Redis 源碼分析之主從複製(1)

在分佈式系統中,爲了解決單點問題,一般會把數據集複製多個副本部署到其餘機器,以知足故障恢復和負載均衡等需求。redis 爲咱們提供的複製功能,實現了相同數據的多個副本,這也是其實現 HA 的基礎。 redis

參與 redis 複製功能的節點被分紅兩個角色,主節點(master)和從節點(slave),複製的數據流是單向的,即 master → slave
默認狀況下,每一個 redis 實例都是 master,mater 與 slave 的關係爲 1:n(也能夠沒有 slave),但一個 slave 只能有一個 master。數據庫

redis 的複製功能涉及同步(sync)和命令傳播(command propagate)兩個階段。
同步階段用於將 slave 的數據庫狀態更新至 master 當前所處的數據庫狀態,即追數據階段;
命令傳播階段則用於當 master 數據庫狀態改變,致使主從節點數據庫狀態不一致時,使之從新回到一致狀態。服務器

同步階段

2.8 版本之前,slave 對 master 的同步,是經過 slave 向 master 發送 SYNC 命令完成的。
1)slave 向 master 發送 SYNC
2)master 收到 SYNC 後,執行 BGSAVE 命令,生成包含當前數據庫狀態的 RDB 文件,同時自身使用一個 buffer 記錄從如今開始執行的全部改變其數據庫狀態的命令,RDB 生成完畢後將其發送給 slave;
3)slave 收到 RDB 文件後,載入數據,將本身的數據庫狀態更新至 master 執行 BGSAYE 時的狀態;
4)master 將 buffer 累積的命令發給 slave;
5)slave 解析 master 發來的命令並執行,將數據追至與 master 當前所處的狀態一致。網絡

若是以上任一一步由於網絡或者其餘緣由而中斷,當 slave 再次連上 master 時,master 仍然須要從新作一個 BGSAVE,而這個命令是經過 fork 子進程來作的,頻繁執行會影響性能,且複製效率低下。負載均衡

爲解決以上問題,redis 從 2.8 版本開始,引入新的同步命令 PSYNC 以支持斷點續傳。
要支持斷點續傳,就須要記錄上次同步的位置,藉助瞭如下三個變量:分佈式

1)master/slave 的複製偏移量(replication offset);
2)master 的複製積壓緩衝區(replication backlog);
3)服務器的運行 ID(run ID)。函數

具體細節能夠參考《redis 設計與實現》這本書的第 15 章。性能

命令傳播階段

redisServer 結構體中有一個 dirty 變量記錄了自上一次成功執行 save 或者 bgsave 以後,數據庫狀態改變的次數。經過比較執行命令先後 的 dirty 值,就能夠知道當前命令執行後數據庫狀態是否發生了改變,只有改變了才須要作 command propagate設計

void call(client *c, int flags) {
        ...
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;
    ...
    if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
    ...
    if (propagate_flags != PROPAGATE_NONE)
          propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    ...
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

對於主從複製的命令傳播,在 replicationFeedSlaves 函數中實現。code

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];

    // 若是 backlog buffer 爲空,且沒有 slave,直接返回
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    // 若是 dictid 與上一次 repl 選擇的不一致,須要插入一條 select 命令
    if (server.slaveseldb != dictid) {
        robj *selectcmd;
        ......
    }
    server.slaveseldb = dictid;

    // 將命令以 redis 協議的格式寫入 replication backlog
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
   
        for (j = 0; j < argc; j++) {
            // $..CRLF
            long objlen = stringObjectLen(argv[j]);
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2); // CRLF
        }
    }

    /* 將命令發送給全部的 slave. */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
      
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
        // 以 redis 協議的格式發送給 slave
        addReplyMultiBulkLen(slave,argc);
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}

以上即是主從同步的兩個階段,更多相關代碼詳解請看後面的博客分析。

相關文章
相關標籤/搜索