Hitch (順風車) Operations R&D Team: Tan Miao (譚淼)
1. Redis replication
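Master-slave synchronization in Redis works in one of two ways: full resynchronization or partial resynchronization.
Full resynchronization: the master generates an RDB snapshot and transfers it to the slave, which loads it to get in sync.
Partial resynchronization: the master takes the data the slave is missing from the replication backlog buffer and sends it to the slave.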
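To make the backlog concrete, here is a minimal C sketch of a circular replication backlog under a simplified layout. The type backlog_t, its fields and the functions backlog_feed() and backlog_first_offset() are hypothetical names, loosely modeled on Redis's repl_backlog, repl_backlog_idx, repl_backlog_histlen and master_repl_offset. Once the buffer is full, new bytes overwrite the oldest ones. That is why a partial resynchronization only works while the requested offset is still inside the buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified replication backlog: a fixed-size ring buffer
 * plus the offsets that describe which part of the stream it still holds. */
#define BACKLOG_SIZE 8 /* tiny on purpose; real backlogs are megabytes */

typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;            /* next write position inside buf */
    size_t histlen;        /* valid bytes stored, never more than BACKLOG_SIZE */
    long long master_off;  /* total bytes ever fed (the master's replication offset) */
} backlog_t;

/* Append len bytes, overwriting the oldest data once the buffer is full. */
static void backlog_feed(backlog_t *b, const char *p, size_t len) {
    b->master_off += (long long)len;
    while (len > 0) {
        size_t room = BACKLOG_SIZE - b->idx;  /* space left before wrapping to 0 */
        size_t n = len < room ? len : room;
        memcpy(b->buf + b->idx, p, n);
        b->idx = (b->idx + n) % BACKLOG_SIZE;
        b->histlen += n;
        if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
        p += n;
        len -= n;
    }
}

/* Offset of the oldest byte still held (offsets start at 1, as in Redis). */
static long long backlog_first_offset(const backlog_t *b) {
    return b->master_off - (long long)b->histlen + 1;
}
```

For example, feeding 10 bytes into this 8-byte backlog leaves only offsets 3 to 10 available. A slave that still needs offset 1 or 2 would have to fall back to a full resynchronization.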
Suppose two Redis servers, A and B, have been started, and the SLAVEOF command is run on B to make it a slave of A. The overall flow is as follows:
(1) When B receives the SLAVEOF command, it runs slaveofCommand(), which calls replicationSetMaster() to set the replication state to REPL_STATE_CONNECT.
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    ......
    server.repl_state = REPL_STATE_CONNECT;
    server.repl_down_since = 0;
}
(2) The periodically scheduled time event, serverCron(), calls replicationCron() once per second through run_with_period(1000).
/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ......
    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();
    ......
}
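The once-per-second rate comes from run_with_period(). The C sketch below models that throttling as a plain function. It assumes the macro's logic is roughly "run on every call when the period is no longer than one cron tick, otherwise on every (period / tick)-th cron loop", which is our reading of the macro in server.h. should_run() and runs_in() are hypothetical helpers, not Redis functions.

```c
#include <assert.h>

/* Simplified model of the run_with_period(ms) check: serverCron() runs
 * hz times per second and counts its invocations in cronloops. */
static int should_run(int period_ms, int hz, long long cronloops) {
    int tick_ms = 1000 / hz;             /* time between two serverCron() calls */
    if (period_ms <= tick_ms) return 1;  /* period shorter than a tick: run every time */
    return cronloops % (period_ms / tick_ms) == 0;
}

/* Count how often a task would run during the first `loops` cron iterations. */
static int runs_in(int period_ms, int hz, long long loops) {
    int runs = 0;
    for (long long i = 0; i < loops; i++)
        if (should_run(period_ms, hz, i)) runs++;
    return runs;
}
```

With the default hz of 10, serverCron() runs every 100 ms, so a 1000 ms period fires on every 10th call. Over 100 loops (about 10 seconds), replicationCron() would therefore run 10 times.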
(3) In replicationCron(), if the replication state is REPL_STATE_CONNECT, connectWithMaster() is called.
/* --------------------------- REPLICATION CRON ---------------------------- */

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    ......
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}
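(4) connectWithMaster() opens a non-blocking TCP connection to the master and registers a file event on that socket with syncWithMaster() as its handler. It then sets the replication state to REPL_STATE_CONNECTING.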
int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
    if (fd == -1) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return C_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        serverLog(LL_WARNING,"Can't create readable event for SYNC");
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
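Steps (1) to (4) form a small state machine on the slave. The C sketch below models only those transitions. It uses a stripped-down replica_t and an injected connect callback so that it runs without a network. Every name in it (replica_t, set_master(), connect_master(), cron_tick() and the REPL_* values) is hypothetical.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical subset of the slave's replication states. */
typedef enum {
    REPL_NONE,       /* not a slave */
    REPL_CONNECT,    /* must (re)connect to the master */
    REPL_CONNECTING  /* non-blocking connect in progress */
} repl_state_t;

typedef struct {
    repl_state_t state;
    char masterhost[64];
    int masterport;
    int fd;  /* socket to the master, -1 when not connected */
} replica_t;

/* Step (1): SLAVEOF host port only records the target and flags the state. */
static void set_master(replica_t *r, const char *host, int port) {
    strncpy(r->masterhost, host, sizeof(r->masterhost) - 1);
    r->masterhost[sizeof(r->masterhost) - 1] = '\0';
    r->masterport = port;
    r->state = REPL_CONNECT;
}

/* Step (4): stands in for connectWithMaster(); connect_fn returns an fd or -1. */
static int connect_master(replica_t *r, int (*connect_fn)(const char *, int)) {
    int fd = connect_fn(r->masterhost, r->masterport);
    if (fd == -1) return -1;     /* state stays CONNECT, so the next tick retries */
    r->fd = fd;
    r->state = REPL_CONNECTING;  /* the real code also registers syncWithMaster() */
    return 0;
}

/* Steps (2) and (3): the once-per-second cron acts only in the CONNECT state. */
static void cron_tick(replica_t *r, int (*connect_fn)(const char *, int)) {
    if (r->state == REPL_CONNECT) connect_master(r, connect_fn);
}

/* Fake connectors used to exercise both paths without a network. */
static int connect_ok(const char *host, int port) { (void)host; (void)port; return 7; }
static int connect_fail(const char *host, int port) { (void)host; (void)port; return -1; }
```

A failed connect leaves the state at CONNECT, so the next cron tick simply retries. This matches connectWithMaster() returning C_ERR before it touches server.repl_state.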
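(5) In syncWithMaster(), the slave first communicates with the master to exchange key information (the replication handshake). It then sends a PSYNC command carrying the replication ID and the offset it wants to resume from.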
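(6) When the master receives PSYNC, it checks the replication ID. If the ID matches its own and the requested offset is still present in the replication backlog (server.repl_backlog), the master performs a partial resynchronization: it replies +CONTINUE and sends the relevant data from the backlog to the slave. Otherwise it performs a full resynchronization: it replies +FULLRESYNC, generates an RDB file, and transfers it to the slave.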
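The master's decision in step (6) comes down to two checks. Here is a minimal C sketch under simplifying assumptions. master_t, psync_result_t and master_try_psync() are hypothetical. Only the current replication ID is compared, whereas real Redis can also accept the previous ID (replid2) up to second_replid_offset.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical master-side state needed to answer a PSYNC request. */
typedef struct {
    char replid[41];        /* 40-character replication ID plus NUL */
    long long backlog_off;  /* offset of the first byte held in the backlog */
    long long histlen;      /* number of bytes currently held in the backlog */
} master_t;

typedef enum { PSYNC_CONTINUE, PSYNC_FULLRESYNC } psync_result_t;

/* psync_offset is the first offset the slave still needs. */
static psync_result_t master_try_psync(const master_t *m,
                                       const char *replid, long long psync_offset) {
    if (strcmp(replid, m->replid) != 0)
        return PSYNC_FULLRESYNC;  /* different replication history */
    if (m->histlen == 0 ||
        psync_offset < m->backlog_off ||
        psync_offset > m->backlog_off + m->histlen)
        return PSYNC_FULLRESYNC;  /* requested data is not in the backlog */
    return PSYNC_CONTINUE;        /* send the backlog from psync_offset onward */
}
```

With backlog_off = 101 and histlen = 100, offsets 101 through 201 can be served. Offset 201 means the slave is already caught up and receives nothing beyond the +CONTINUE reply. Offsets 100 and 202 both force a full resynchronization.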
2. Command propagation
When a write command is executed on the master, call() in server.c is invoked, and call() in turn calls propagate() to send the update to the slaves.
/* Call() is the core of Redis execution of a command.
 *
 * The following flags can be passed:
 * CMD_CALL_NONE        No flags.
 * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 * CMD_CALL_STATS       Populate command stats.
 * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 */
void call(client *c, int flags) {
    ......
    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        ......
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
    ......
}
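The rules in the comment above can be condensed into a small bit-mask function. This is a sketch, not Redis code. The constant names and values are hypothetical stand-ins for CMD_CALL_PROPAGATE_*, CLIENT_FORCE_*, CLIENT_PREVENT_*_PROP and PROPAGATE_*. The parameter dirty stands for the number of keys the command changed.

```c
#include <assert.h>

/* Hypothetical flag values; only the bit logic follows the rules above. */
#define CALL_PROPAGATE_AOF  (1 << 0)  /* call() flag: AOF propagation allowed */
#define CALL_PROPAGATE_REPL (1 << 1)  /* call() flag: slave propagation allowed */

#define CLI_FORCE_AOF    (1 << 0)  /* client flag: write to AOF even if nothing changed */
#define CLI_FORCE_REPL   (1 << 1)  /* client flag: send to slaves even if nothing changed */
#define CLI_PREVENT_AOF  (1 << 2)  /* client flag: never write to AOF */
#define CLI_PREVENT_REPL (1 << 3)  /* client flag: never send to slaves */

#define PROP_NONE 0
#define PROP_AOF  1
#define PROP_REPL 2

static int compute_propagate_flags(int call_flags, int client_flags, long long dirty) {
    int pf = PROP_NONE;
    if (dirty) pf |= PROP_AOF | PROP_REPL;              /* dataset was modified */
    if (client_flags & CLI_FORCE_REPL) pf |= PROP_REPL;  /* rule 1: forced propagation */
    if (client_flags & CLI_FORCE_AOF) pf |= PROP_AOF;
    /* Rule 2 and the call-flag veto are applied last, so they always win. */
    if ((client_flags & CLI_PREVENT_REPL) || !(call_flags & CALL_PROPAGATE_REPL))
        pf &= ~PROP_REPL;
    if ((client_flags & CLI_PREVENT_AOF) || !(call_flags & CALL_PROPAGATE_AOF))
        pf &= ~PROP_AOF;
    return pf;
}
```

In this sketch a read-only command (dirty == 0) propagates nothing unless a FORCE flag is set. A missing CALL_PROPAGATE_REPL in the call flags blocks slave propagation even when CLI_FORCE_REPL is present.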
When the PROPAGATE_REPL flag is set, propagate() calls replicationFeedSlaves(), which writes the command into the replication backlog.
/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * flags are an xor between:
 * + PROPAGATE_NONE (no propagation of command at all)
 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + PROPAGATE_REPL (propagate into the replication link)
 *
 * This should not be used inside commands implementation. Use instead
 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
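replicationFeedSlaves() encodes the command in the Redis protocol and writes it into the replication backlog. It also appends the command to the output buffer of every slave, skipping slaves that are still waiting for BGSAVE to start.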
/* Propagate write commands to slaves, and populate the replication backlog
 * as well. This function is used if the instance is a master: we use
 * the commands received by our clients in order to create the replication
 * stream. Instead if the instance is a slave and has sub-slaves attached,
 * we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    ......
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
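The byte stream that replicationFeedSlaves() writes into the backlog is the ordinary RESP multi-bulk encoding of the command: a "*argc" header, then a "$len" header and the raw bytes for each argument, each line ending in CRLF. As an illustration, the hypothetical helper encode_multibulk() below builds the same layout from C strings. Unlike Redis, it assumes NUL-free arguments and returns a heap string rather than feeding a buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Encode argv as a RESP multi-bulk array:
 *   *<argc>\r\n   then, for each argument,   $<len>\r\n<bytes>\r\n
 * Returns a malloc'ed NUL-terminated string (caller frees), or NULL. */
static char *encode_multibulk(int argc, const char **argv) {
    size_t cap = 32;  /* room for the "*<argc>\r\n" header and the final NUL */
    for (int j = 0; j < argc; j++)
        cap += strlen(argv[j]) + 32;  /* payload plus "$<len>\r\n" and "\r\n" */
    char *out = malloc(cap);
    if (out == NULL) return NULL;

    size_t pos = (size_t)snprintf(out, cap, "*%d\r\n", argc);
    for (int j = 0; j < argc; j++) {
        size_t len = strlen(argv[j]);
        pos += (size_t)snprintf(out + pos, cap - pos, "$%zu\r\n", len);
        memcpy(out + pos, argv[j], len);  /* argument bytes */
        pos += len;
        memcpy(out + pos, "\r\n", 2);     /* trailing CRLF of the bulk string */
        pos += 2;
    }
    out[pos] = '\0';
    return out;
}
```

For SET k v this produces *3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n. Those are the same bytes that end up in the backlog and, through addReplyMultiBulkLen() and addReplyBulk(), in each slave's output buffer.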