在分佈式系統中,爲了解決單點問題,一般會把數據集複製多個副本部署到其餘機器,以知足故障恢復和負載均衡等需求。redis 爲咱們提供的複製功能,實現了相同數據的多個副本,這也是其實現 HA 的基礎。 redis
參與 redis 複製功能的節點被分紅兩個角色,主節點(master)和從節點(slave),複製的數據流是單向的,即 master → slave。
默認狀況下,每一個 redis 實例都是 master,mater 與 slave 的關係爲 1:n(也能夠沒有 slave),但一個 slave 只能有一個 master。數據庫
redis 的複製功能涉及同步(sync)和命令傳播(command propagate)兩個階段。
同步階段用於將 slave 的數據庫狀態更新至 master 當前所處的數據庫狀態,即追數據階段;
命令傳播階段則用於當 master 數據庫狀態改變,致使主從節點數據庫狀態不一致時,使之從新回到一致狀態。服務器
在 2.8 版本之前,slave 對 master 的同步,是經過 slave 向 master 發送 SYNC 命令完成的。
1)slave 向 master 發送 SYNC;
2)master 收到 SYNC 後,執行 BGSAVE 命令,生成包含當前數據庫狀態的 RDB 文件,同時自身使用一個 buffer 記錄從如今開始執行的全部改變其數據庫狀態的命令,RDB 生成完畢後將其發送給 slave;
3)slave 收到 RDB 文件後,載入數據,將本身的數據庫狀態更新至 master 執行 BGSAYE 時的狀態;
4)master 將 buffer 累積的命令發給 slave;
5)slave 解析 master 發來的命令並執行,將數據追至與 master 當前所處的狀態一致。網絡
若是以上任一一步由於網絡或者其餘緣由而中斷,當 slave 再次連上 master 時,master 仍然須要從新作一個 BGSAVE,而這個命令是經過 fork
子進程來作的,頻繁執行會影響性能,且複製效率低下。負載均衡
爲解決以上問題,redis 從 2.8 版本開始,引入新的同步命令 PSYNC 以支持斷點續傳。
要支持斷點續傳,就須要記錄上次同步的位置,藉助瞭如下三個變量:分佈式
1)master/slave 的複製偏移量(replication offset);
2)master 的複製積壓緩衝區(replication backlog);
3)服務器的運行 ID(run ID)。函數
具體細節能夠參考《redis 設計與實現》這本書的第 15 章。性能
在 redisServer
結構體中有一個 dirty
變量記錄了自上一次成功執行 save 或者 bgsave 以後,數據庫狀態改變的次數。經過比較執行命令先後 的 dirty
值,就能夠知道當前命令執行後數據庫狀態是否發生了改變,只有改變了才須要作 command propagate。設計
void call(client *c, int flags) { ... dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; ... if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); ... if (propagate_flags != PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); ... } void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); }
對於主從複製的命令傳播,在 replicationFeedSlaves
函數中實現。code
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; // 若是 backlog buffer 爲空,且沒有 slave,直接返回 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); // 若是 dictid 與上一次 repl 選擇的不一致,須要插入一條 select 命令 if (server.slaveseldb != dictid) { robj *selectcmd; ...... } server.slaveseldb = dictid; // 將命令以 redis 協議的格式寫入 replication backlog if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); for (j = 0; j < argc; j++) { // $..CRLF long objlen = stringObjectLen(argv[j]); aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); // CRLF } } /* 將命令發送給全部的 slave. */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; // 以 redis 協議的格式發送給 slave addReplyMultiBulkLen(slave,argc); for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } }
以上即是主從同步的兩個階段,更多相關代碼詳解請看後面的博客分析。