目錄 1redis
1. 前言 1less
2. 配置項 1socket
3. redisServer 2函數
4. feedReplicationBacklog-寫repl_backlog 3this
5. addReplyReplicationBacklog-讀repl_backlog 4spa
注意,repl_backlog只針對部分複製(Partial Replication),而非全量複製。日誌
本文內容基於redis-5.0.5(截至2019/6/6的最新版本),本文深刻介紹REdis主從複製的部分複製核心要素repl_backlog,與其相關的配置直接影響主從間的穩定性,對提高集羣的穩定性十分重要。server
注意REdis的主節點把全部從節點也看成一個Client看待,正常的數據同步並不涉及repl_backlog。當從節點斷開重連,這個時候repl_backlog的做用就體現出來了。截至到5.0.5版本,從節點重啓用不上repl_backlog,緣由是從節點沒有保存repl_backlog的信息,沒法實現部分同步,但可少許改動REdis源代碼,實現從節點重啓後的部分複製。隊列
正常狀況下,主節點會往從節點鏈接緩衝區寫一份數據,同時往repl_backlog也寫一份數據,全部從節點共享同一份repl_backlog,所以能夠考慮repl_backlog配置大一點,以容忍從節點更長時間失聯。ip
從節點向主節點發送命令PSYNC,觸發部分複製。有關REdis主從複製的細節,請參見《REdis複製研究》。
REdis的複製分全量複製和部分複製,全量複製是個很重的過程,而部分複製則是輕量的,部分複製實際是一個增量複製。
REdis的主節點建立和維護一個環形緩衝複製隊列(即repl_backlog),從節點部分複製(增量複製)的數據均來自於repl_backlog。
主節點只有一個repl_backlog,全部從節點共享,直接相關的配置項有兩個:
配置項名 |
配置項說明 |
repl-backlog-size |
環形緩衝複製隊列大小,可不帶單位,但同時支持單位:b、k、kb、m、mb、g、gb,單位不區分大小寫,其中k、m、g間的計算倍數是1000,而kb、mb和gb的計算倍數是1024。 |
repl-backlog-ttl |
環形緩衝複製隊列存活時長(全部slaves不可用時,保留repl_backlog多長時間,單位:秒) |
結構體redisServer是REdis的第一核心結構,repl_backlog是它的組成成員。
struct redisServer { /* My current replication offset */ long long master_repl_offset; /* Accept offsets up to this for replid2. */ long long second_replid_offset; /* Replication backlog for partial syncs */ char *repl_backlog; // 環形緩衝複製隊列 /* Backlog circular buffer size */ long long repl_backlog_size; // 環形緩衝複製隊列容量 /* Backlog actual data length */ long long repl_backlog_histlen; // 環形緩衝複製隊列已用大小(影響是否能部分複製) /* Backlog circular buffer current offset, that is the next byte will'll write to.*/ // 實際上談不上空閒,由於老是環繞覆蓋寫, // 理解爲最新數據的截止位置更爲合適,更新的數據老是從這裏開始寫入到repl_backlog中。 long long repl_backlog_idx; // 環形緩衝複製隊列空閒起始位置(寫從這裏開始) /* Replication "master offset" of first byte in the replication backlog buffer.*/ long long repl_backlog_off; // 數據在環形緩衝複製隊列的起始位置(讀從這裏開始) /* Time without slaves after the backlog gets released. */ time_t repl_backlog_time_limit; // 環形緩衝複製隊列生存時長 /* We have no slaves since that time. Only valid if server.slaves len is 0. */ time_t repl_no_slaves_since; // 無可用從節點的發生時間 /* Min number of slaves to write. */ int repl_min_slaves_to_write; // 最小需寫的從節點數 /* Max lag of <count> slaves to write. */ int repl_min_slaves_max_lag; /* Number of slaves with lag <= max_lag. */ int repl_good_slaves_count; /* Send RDB to slaves sockets directly. */ int repl_diskless_sync; // 不落磁盤(無盤)往從節點發送RDB(全量複製) /* Delay to start a diskless repl BGSAVE. */ // 無盤複製時,延遲指定的時長,以等待更多的從節點 int repl_diskless_sync_delay; }; |
/* Add data to the replication backlog. * This function also increments the global replication offset stored at * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ // 主要被replicationFeedSlaves調用 // 寫len長的數據ptr到repl_backlog中 // repl_backlog是一個環形buffer,不存在溢出的問題,策略是最新數據覆蓋最老數據。 // 若是參數len大於repl_backlog_size, // 則repl_backlog沒有實際意義, // 由於沒法存儲一份完整數據。 void feedReplicationBacklog(void *ptr, size_t len) { unsigned char *p = ptr; server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ // 由於repl_backlog是環形buffer, // 剩餘的空間可能容納不了len長的數據, // 當不夠時,就須要環繞從頭開始寫, // 所以這裏需while循環。 while(len) { // repl_backlog_size爲repl_backlog的容量大小, // 由配置項決定repl_backlog_size值決定, // repl_backlog_idx是repl_backlog空閒區域的起始位置, // 這兩個值相減獲得repl_backlog可用大小。 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; // 若是thislen大於len,則表示足夠容納 if (thislen > len) thislen = len; memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); // 空閒位置日後挪動 server.repl_backlog_idx += thislen; // 若是空閒位置達到容量大小,則環繞回去從0開始 if (server.repl_backlog_idx == server.repl_backlog_size) server.repl_backlog_idx = 0; len -= thislen; p += thislen; // repl_backlog_histlen記錄了repl_backlog中的數據大小 server.repl_backlog_histlen += thislen; } // 修正存儲在repl_backlog中的數據大小, // 它不可能超過repl_backlog_size值。 if (server.repl_backlog_histlen > server.repl_backlog_size) server.repl_backlog_histlen = server.repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1; } |
當主節點判斷可部分複製時,會記錄以下日誌:
Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld. |
給從節點的響應頭爲「+CONTINUE replid\r\n」或「+CONTINUE\r\n」。函數addReplyReplicationBacklog的實現:
/* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ // 被masterTryPartialResynchronization調用 // 而masterTryPartialResynchronization被syncCommand調用(對應命令PSYNC)。 // 從repl_backlog取數據發給slave, // 數據的開始位置由offset指定。 long long addReplyReplicationBacklog(client *c, long long offset) { long long j, skip, len;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); // repl_backlog_histlen爲0, // 表示repl_backlog中無數據。 if (server.repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; }
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog_off); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog_histlen); serverLog(LL_DEBUG, "[PSYNC] Current index: %lld", server.repl_backlog_idx);
/* Compute the amount of bytes we need to discard. */ skip = offset - server.repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our * server.repl_backlog_off byte. */ j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % server.repl_backlog_size; serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */ j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to * split the reply in two parts if we are cross-boundary. */ len = server.repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); while(len) { long long thislen = ((server.repl_backlog_size - j) < len) ? (server.repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); len -= thislen; j = 0; } return server.repl_backlog_histlen - skip; } |