redis 支持 master-slave(主從)模式:一個 redis server 可以被設置爲另一個 redis server 的主機(從機),從機定期從主機拿數據。特別地,一個從機同樣可以被設置爲其他 redis server 的主機,這樣一來,master-slave 的分佈看起來就是一個有向無環圖(DAG),如此形成 redis server 集群。無論是主機還是從機,都是 redis server,都可以提供服務。
配置後,主機可負責讀寫服務,從機只負責讀。redis 提供這種配置方式,是爲了讓其支持數據的弱一致性,即最終一致性。在業務中,選擇強一致性還是弱一致性,應該取決於具體的業務需求:像微博,完全可以使用弱一致性模型;像淘寶,可以選用強一致性模型。
redis 主從複製的實現主要在 replication.c 中。
這篇文章涉及較多的代碼,但我已經盡可能刪繁就簡,以能說明問題本質爲準。爲了保留代碼的原貌並讓讀者能夠閱讀原始代碼的註釋,剖析 redis 的幾篇文章都沒有刪除代碼中的英文註釋,並另外添加了註釋。
在《深入剖析 redis AOF 持久化策略》中,介紹了更新緩存的概念。舉一個例子:客戶端發來命令 set name Jhon,這一數據更新被記錄爲 *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n(每個參數前的 $n 是該參數的字節數,"Jhon" 爲 4 個字節),並存儲在更新緩存中。
同樣,在主從鏈接中,也有更新緩存的概念。只是二者的用途不同:前者被寫入本地(AOF 文件),後者保存在主機內存中,用於向從機(包括斷線重連的從機)補發數據,這裏我們把後者稱爲積壓空間(backlog)。
更新緩存存儲在 server.repl_backlog 中,redis 將其作爲一個環形緩衝區來處理,這樣做節省了空間,也避免了內存重新分配的情況。
/* Excerpt of struct redisServer: only the master-side replication fields
 * are shown here; the rest of the struct is omitted in this article. */
struct redisServer {
    /* Replication (master) */
    // DB id of the last SELECT written to the replication stream (backlog
    // and slaves). Setting it to -1 forces the next propagated command to be
    // preceded by a fresh SELECT.
    int slaveseldb; /* Last SELECTed DB in replication output */
    // Global replication offset of this master's replication stream; a
    // slave's PSYNC offset is checked against it.
    long long master_repl_offset; /* Global replication offset */
    // Heartbeat period: the master PINGs its slaves every N seconds.
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // Backlog buffer used to serve partial resyncs; NULL when no backlog
    // has been created.
    char *repl_backlog; /* Replication backlog for partial syncs */
    // Capacity of the circular backlog buffer, in bytes.
    long long repl_backlog_size; /* Backlog circular buffer size */
    // Number of valid bytes currently held in the backlog.
    long long repl_backlog_histlen; /* Backlog actual data length */
    // Index inside the circular buffer where the next byte will be written.
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // Global replication offset of the oldest byte still held in the
    // backlog (an offset in the replication stream, not a buffer index).
    long long repl_backlog_off; /* Replication offset of first byte in the backlog buffer. */
    // How long the backlog is kept while no slaves are attached before it is
    // released.
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */
}
積壓空間中的數據變更記錄是什麼時候被寫入的?在執行一個 redis 命令的時候,如果存在數據的修改(寫),那麼就會把變更記錄傳播出去。redis 源碼中的調用鏈是:call() -> propagate() -> replicationFeedSlaves()。
註:命令真正執行的地方在 call() 中;call() 如果發現數據被修改(dirty),則調用 propagate() 進行傳播;replicationFeedSlaves() 將修改記錄寫入積壓空間,並發送給所有已鏈接的從機。
這裏可能會有疑問:爲什麼既把數據添加到積壓空間,又把數據分發給所有的從機?爲什麼不僅僅將數據分發給所有從機呢?
因爲有一些從機會因特殊情況(譬如網絡延遲導致 PING 超時、ACK 超時等)與主機斷開鏈接——注意從機在斷開前會暫存主機的狀態信息——這些斷開的從機就沒有及時收到更新的數據。redis 爲了讓斷開的從機在下次鏈接後能夠獲取這些更新數據,將更新數據加入了積壓空間。從 replicationFeedSlaves() 的實現來看,在線的 slave 能立刻收到數據更新記錄;因某些原因暫時斷開鏈接的 slave,需要從積壓空間中找回斷開期間的數據更新記錄。如果斷開的時間足夠長,所需的數據已經不在環形的積壓空間中(被新數據覆蓋),master 會拒絕 slave 的部分同步請求,從而 slave 只能進行全同步。
下面是源碼註釋:
// call() is the core function that actually executes a command.
// (Excerpt: "......" marks code omitted in this article.)
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // Snapshot the global dirty counter so we can tell afterwards whether
    // this command modified the dataset.
    dirty = server.dirty;
    // Run the command's implementation.
    c->cmd->proc(c);
    // Number of changes made by this command; 0 means nothing was written.
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // Propagate the client's modification to the AOF and to the slaves.
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        // Note: this local 'flags' shadows the function parameter 'flags'.
        int flags = REDIS_PROPAGATE_NONE;

        // The command explicitly asked to be replicated.
        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // The command explicitly asked to be written to the AOF.
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // The dataset was modified: propagate to both targets.
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // Propagate only if at least one target was selected.
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

// Publish a data update to the AOF file and/or to the slaves.
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
// Note: despite the "xor" wording above, callers such as call() combine the
// flags with bitwise OR, so AOF and REPL can both be set at once.
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)
{
    // Write to the AOF only when AOF is enabled and the AOF flag is set.
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    // Replication flag set: feed the backlog and the slaves.
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

// Append a command to the replication backlog and send it to every slave.
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    // Nothing to do when there is neither a backlog nor any slave.
    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    // A SELECT is emitted only when the target DB differs from the last one
    // written to the replication stream.
    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        // DB ids in [0, REDIS_SHARED_SELECT_CMDS) use a pre-built shared
        // SELECT object.
        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            // Other DB ids: build the SELECT command in protocol format.
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        // Why feed the backlog as well as the slaves? A slave can be
        // disconnected temporarily. It caches the master state before the
        // disconnection, and after reconnecting it can recover the updates
        // it missed from the backlog (partial resync) instead of doing a
        // full resync.

        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        // Release the object only if it was created above; shared objects
        // are not released here.
        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    // Remember which DB is now selected in the replication stream.
    server.slaveseldb = dictid;

    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        // Scratch buffer for the "*<n>\r\n" / "$<len>\r\n" headers.
        char aux[REDIS_LONGSTR_SIZE+3];

        // Header with the number of arguments: "*<argc>\r\n".
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        // Each argument is written as "$<len>\r\n<bytes>\r\n". For example,
        // SET name Jhon is encoded as
        // *3\r\n$3\r\nSET\r\n$4\r\nname\r\n$4\r\nJhon\r\n
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * ad add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            // "$<len>\r\n" header.
            feedReplicationBacklog(aux,len+3);
            // Argument payload.
            feedReplicationBacklogWithObject(argv[j]);
            // Trailing CRLF, reusing the two bytes at the end of aux.
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    // Send the command to every slave right away.
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        // Slaves still waiting for a BGSAVE to start receive nothing yet.
        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        // "*<argc>" multi-bulk header.
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        // Then every argument as a bulk string.
        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
    }
}
redis 主從同步有兩種方式(或者說兩個階段):全同步和部分同步。
主從剛剛鏈接的時候,進行全同步;全同步結束後,進行部分同步。當然,如果有需要,slave 在任何時候都可以發起全同步。redis 的策略是,無論如何,首先嘗試進行部分同步:如果不成功,要求從機進行全同步,並啓動 BGSAVE,BGSAVE 結束後傳輸 RDB 文件;如果成功,允許從機進行部分同步,並傳輸積壓空間中的數據。
下面這幅圖,總結了主從同步的機制:
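如需設置 slave,需要向將成爲 slave 的 redis server 發送 SLAVEOF hostname port 命令(hostname 和 port 是 master 的地址)。從機接收到後會自動鏈接主機,並註冊相應的讀寫事件(syncWithMaster())。注意,SLAVEOF 本身只記錄主機地址,真正的鏈接是在定時函數 replicationCron() 中建立的。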
// SLAVEOF command: change (or drop) the master this server replicates from.
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // SLAVEOF NO ONE: stop replicating and become a master.
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        // Nothing to do if we already replicate from this host:port
        // (the upstream comment below says "slave" but means the master).
        /* Check if we are already attached to the specified slave */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        // Switch to the new master. replicationSetMaster() does not connect
        // by itself: it only updates the master settings in the server
        // struct. The actual connection is made later by replicationCron().
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}

// Point this server at a new master (ip:port). The connection itself is
// established later by replicationCron().
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // Drop the connection with the previous master, if any.
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // Forget any cached master so that no PSYNC is attempted.
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // Release our backlog so that chained slaves cannot partially resync.
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // Cancel any in-progress connection/transfer with a master (as the name
    // suggests; its body is not shown here).
    cancelReplicationHandshake();
    // REDIS_REPL_CONNECT makes replicationCron() connect to the new master.
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}

/* --------------------------- REPLICATION CRON ----------------------------- */

// Periodic replication housekeeping, run once per second (called from
// serverCron()).
/* Replication cron funciton, called 1 time per second. */
void replicationCron(void) {
    ......
    // When a master has been configured (REDIS_REPL_CONNECT), this is where
    // the connection to it is actually opened.
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}
接着從機自動發起 PSYNC 請求。無論如何,redis 首先會嘗試部分同步,如果失敗才進行全同步;而剛剛建立鏈接的 master-slave 需要全同步。
從機鏈接主機後,會主動發起 PSYNC 命令,提供 master_runid 和 offset,主機驗證 master_runid 和 offset 是否有效。master_runid 相當於主機的身份標識,用來驗證從機上一次鏈接的是不是本主機;offset 是全局積壓空間數據的偏移量。
驗證未通過,則進行全同步:主機返回 +FULLRESYNC master_runid offset(從機接收並記錄 master_runid 和 offset,並準備接收 RDB 文件),接着主機啓動 BGSAVE 生成 RDB 文件,BGSAVE 結束後向從機傳輸,從而完成全同步。
// Registered as an event callback when connectWithMaster() connects to the
// master. This excerpt shows the full-resync path; "......" marks omitted
// code.
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // First try a partial resync. The master either accepts it (+CONTINUE)
    // or refuses it, replying +FULLRESYNC <master_runid> <offset>; in that
    // case the slave prepares for a full resync.
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    // From here on: full resync.
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    // PSYNC_NOT_SUPPORTED: the master did not accept PSYNC (error or
    // unexpected reply), so fall back to the legacy SYNC command.
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // Create the temp file that will receive the RDB payload. The name is
    // built from server.unixtime and the pid and is opened with O_EXCL, so
    // open() fails if a file with that name already exists. We retry up to
    // maxtries times, sleeping 1 second in between.
    // NOTE(review): server.unixtime looks like a cached clock; confirm that
    // it advances across sleep(1), otherwise every retry reuses the same
    // file name.
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    // Register readSyncBulkPayload() as the read handler that downloads
    // the RDB file.
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        // NOTE(review): dfd is not closed (and tmpfile is not removed) on
        // this path; the error label below only closes the socket.
        goto error;
    }

    // Initialise the RDB transfer state.
    server.repl_state = REDIS_REPL_TRANSFER;
    // Total RDB size: -1 until it is known (presumably announced by the
    // master and read in readSyncBulkPayload()).
    server.repl_transfer_size = -1;
    // Bytes received so far.
    server.repl_transfer_read = 0;
    // Offset up to which the temp file was last fsync'ed (presumably the
    // file is flushed to disk periodically during the download).
    server.repl_transfer_last_fsync_off = 0;
    // Descriptor of the local temp RDB file (a file, not a socket).
    server.repl_transfer_fd = dfd;
    // Time of the last I/O on this transfer.
    server.repl_transfer_lastio = server.unixtime;
    // Temp file name.
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    // Drop the connection and go back to REDIS_REPL_CONNECT so that
    // replicationCron() will try to connect again.
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
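全同步傳輸的數據是 RDB 數據文件,以及 BGSAVE 期間累積在該從機輸出緩衝區中的更新命令。關於 RDB 數據文件,請參看《深入剖析 redis RDB 持久化策略》。如果沒有後臺持久化 BGSAVE 進程,那麼 BGSAVE 會被觸發,請求全同步的 slave 被標記爲等待 BGSAVE 結束;如果已有 BGSAVE 進程在運行,且已有其他從機在等待它結束,那麼該 slave 也被標記爲等待 BGSAVE 結束(並複製那個從機的輸出緩衝區),否則被標記爲等待下一次 BGSAVE 開始。BGSAVE 結束後,master 會立刻向所有等待它結束的從機發送 RDB 文件。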
// Master-side handler for SYNC and PSYNC: tries a partial resync first and
// falls back to a full resync. This excerpt shows the full-resync path;
// "......" marks omitted code.
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // (Omitted above: for PSYNC the master first tries a partial resync; if
    // that fails it has already replied +FULLRESYNC <master_runid> <offset>.)

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        // A BGSAVE child is already running:
        // 1. if some slave is already in REDIS_REPL_WAIT_BGSAVE_END, this
        //    BGSAVE is usable for c too, and c takes the same state;
        // 2. otherwise c is set to REDIS_REPL_WAIT_BGSAVE_START and waits for
        //    the next BGSAVE.
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        // Look for a slave already waiting for the end of this BGSAVE.
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            // Found one. Copying its pending output gives c the same updates
            // accumulated since the fork, so c can use the same RDB file once
            // the BGSAVE finishes.
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            // None found: nobody recorded the updates made since this BGSAVE
            // forked, so c must wait for the next BGSAVE to start.
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        // No BGSAVE running: start one now.
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        // c waits for the end of the BGSAVE just started; the RDB file is
        // sent to it when that BGSAVE finishes.
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        // A new slave does not know previously loaded scripts, so the
        // replication script cache is flushed; each EVALSHA is then
        // propagated as EVAL the first time it is seen.
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    // Mark the client as a slave and register it.
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    // First slave attached and no backlog yet: create the backlog.
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}

// Called when a BGSAVE child terminates.
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // Other handling omitted.
    ......
    // Slaves may be waiting for this BGSAVE; exit code 0 counts as success.
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}

// Called at the end of every BGSAVE (from backgroundSaveDoneHandler()).
// It sends the fresh RDB file to the slaves waiting for it, and starts a new
// BGSAVE if some slaves were waiting for one to start.
/* This function is called at the end of every background saving.
 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
 * otherwise REDIS_ERR is passed to the function.
 *
 * The goal of this function is to handle slaves waiting for a successful
 * background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    // Set when at least one slave was waiting for a BGSAVE to start.
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            // Was waiting for a BGSAVE to start: one is started below, so the
            // slave now waits for that one to end.
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // Was waiting for this BGSAVE to end: send it the RDB file.
            struct redis_stat buf;

            // The BGSAVE failed (bgsaveerr == REDIS_ERR): drop the slave.
            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // Open the RDB file and get its size.
            // NOTE(review): errno is logged after freeClient(), which may
            // overwrite it.
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            // Remove any previously registered write handler, then register
            // sendBulkToSlave() to stream the RDB file to the slave.
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    // startbgsave is a flag, not an error code: some slave was waiting for a
    // BGSAVE to start, so start a new one now.
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            // The BGSAVE could not be started (e.g. fork() failed, possibly
            // under memory pressure): drop the slaves waiting for it.
            // NOTE(review): the loop above already moved every
            // WAIT_BGSAVE_START slave to WAIT_BGSAVE_END, so this check
            // appears to match no slave; confirm whether it should test
            // REDIS_REPL_WAIT_BGSAVE_END instead.
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
如上所述,無論如何,redis 首先會嘗試部分同步。部分同步即把積壓空間中緩存的數據(即更新記錄)發送給從機。
從機鏈接主機後,會主動發起 PSYNC 命令,提供 master_runid 和 offset,主機驗證 master_runid 和 offset 是否有效。
驗證通過,則進行部分同步:主機返回 +CONTINUE(從機接收後會註冊積壓數據接收事件),接着發送積壓空間數據。
// Registered as an event callback when connectWithMaster() connects to the
// master. This excerpt shows the partial-resync path; "......" marks omitted
// code.
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // Try a partial resync; if the master accepts it, it replies +CONTINUE.
    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    // slaveTryPartialResynchronization() returns one of:
    //   PSYNC_CONTINUE      - partial resync accepted; the cached master has
    //                         been resurrected on this socket.
    //   PSYNC_FULLRESYNC    - full resync: the RDB file will be downloaded.
    //   PSYNC_NOT_SUPPORTED - the master did not accept PSYNC (error or
    //                         unexpected reply); fall back to SYNC.
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    // Full resync path (omitted here).
    ......
}

// Return values of slaveTryPartialResynchronization():
//   PSYNC_CONTINUE      - partial resync accepted, replication resumes.
//   PSYNC_FULLRESYNC    - the master requires a full resync (RDB download);
//                         its run id and initial offset are recorded when
//                         the +FULLRESYNC reply parses correctly.
//   PSYNC_NOT_SUPPORTED - PSYNC answered with an error or an unexpected
//                         reply; the caller falls back to SYNC.
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2

// Slave side: send PSYNC to the master on socket fd and classify the reply.
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        // The state of a previous master connection was cached: ask to
        // continue from the byte after the cached offset, which may avoid a
        // full transfer.
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        // No cached master: send "PSYNC ? -1" to force a full resync; the
        // +FULLRESYNC reply tells us the master's run id and offset.
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        // Copy "-1" including its terminating NUL (3 bytes).
        memcpy(psync_offset,"-1",3);
    }

    // Send PSYNC and wait synchronously for the reply.
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    // Full resync required: "+FULLRESYNC <runid> <offset>".
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        // The run id must be exactly REDIS_RUN_ID_SIZE characters long.
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // Record the master's run id (NUL-terminated) and initial offset.
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    // Partial resync accepted: "+CONTINUE".
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        // Turn the cached master back into the active master, using the new
        // socket fd, so that replication continues where it stopped
        // (presumably this also re-registers its read handler).
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we receied either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Reply with PSYNC_NOT_SUPPORTED in both cases. */

    // Log differently depending on whether the reply is an -ERR error.
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}

// Master-side handler for SYNC and PSYNC. This excerpt shows the PSYNC
// (partial resync) path; "......" marks omitted code.
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // For PSYNC, first try a partial resync. If it is accepted, the master
    // has replied +CONTINUE and sent the backlog data, and we are done.
    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            // Partial resync refused: continue with a full resync.
            // argv[1] is the master run id sent by the slave.
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not albe to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }

    // Full resync (omitted here).
    ......
}

// Master side: decide whether a PSYNC request can be served from the
// backlog.
/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
 * with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        // master_runid is the run id of the master the slave cached when it
        // lost its connection; server.runid is our own run id. If they
        // differ, we are not the master the slave was replicating from, so
        // only a full resync is possible.
        // A slave sends "?" when it has no cached master, i.e. when it
        // explicitly wants a full resync.
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    // Parse the replication offset requested by the slave.
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    // A partial resync is impossible when:
    //  - there is no backlog at all;
    //  - psync_offset < repl_backlog_off: the bytes the slave needs are no
    //    longer in the backlog (it missed too many updates);
    //  - psync_offset lies beyond the end of the data held in the backlog.
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    // The slave is online immediately: no RDB transfer, only incremental
    // updates from now on.
    c->replstate = REDIS_REPL_ONLINE;
    // Refresh the time of the last ACK from this slave.
    c->repl_ack_time = server.unixtime;
    // Register the client in the slaves list.
    listAddNodeTail(server.slaves,c);
    // Tell the slave it can continue; on +CONTINUE the slave sets up its
    // side of the connection.
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * emtpy so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        // Write failed: free the client asynchronously. Returning REDIS_OK
        // keeps syncCommand() from starting a full resync for it.
        freeClientAsync(c);
        return REDIS_OK;
    }
    // Send the backlog content from psync_offset onwards.
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    ......
    // Reply "+FULLRESYNC <runid> <repl_offset>" to the slave (omitted here).
}
從機由於某些原因,譬如網絡延遲(PING 超時、ACK 超時等),可能會斷開與主機的鏈接。這時候,從機會嘗試保存與主機鏈接的信息(譬如全局積壓空間數據偏移量等),以便下一次進行部分同步,並且從機會再一次嘗試鏈接主機。注意,如果斷開的時間足夠長,部分同步肯定會失敗。
// Excerpt of freeClient(): only the replication-related part is shown;
// "......" marks omitted code.
void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    // If c is this slave's connection to its master, cache the master state
    // instead of freeing it, so that a PSYNC can be attempted after
    // reconnecting.
    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    ......
}

// To allow a later partial resync, the slave keeps the master's state in
// server.cached_master before dropping the connection. Called from
// freeClient() instead of actually freeing the master client.
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    // Remove the master client from the clients list.
    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    // Keep the master client structure, including its replication state.
    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    // Unregister the event handlers and close the socket.
    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    // Update the replication state; this also sets server.master to NULL.
    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
簡單來說,主從同步就是 RDB 文件的上傳下載;之後主機每有一小部分數據修改,就把修改記錄傳播給每一個從機。這篇文章詳述了 redis 主從複製的內部協議和機制。接下來的幾篇關於 redis 的文章,主要介紹其內部數據結構。
搗亂 2014-4-22