REdis主從複製之repl_backlog

目錄

目錄 1redis

1. 前言 1less

2. 配置項 1socket

3. redisServer 2函數

4. feedReplicationBacklog-repl_backlog 3this

5. addReplyReplicationBacklog-repl_backlog 4spa

 

1. 前言

注意,repl_backlog只針對部分複製(Partial Replication),而非全量複製。日誌

本文內容基於redis-5.0.5(截至2019/6/6的最新版本),本文深刻介紹REdis主從複製的部分複製核心要素repl_backlog,與其相關的配置直接影響主從間的穩定性,對提高集羣的穩定性十分重要。server

注意REdis的主節點把全部從節點也看成一個Client看待,正常的數據同步並不涉及repl_backlog。當從節點斷開重連,這個時候repl_backlog的做用就體現出來了。截至到5.0.5版本,從節點重啓用不上repl_backlog,緣由是從節點沒有保存repl_backlog的信息,沒法實現部分同步,但可少許改動REdis源代碼,實現從節點重啓後的部分複製。隊列

正常狀況下,主節點會往從節點鏈接緩衝區寫一份數據,同時往repl_backlog也寫一份數據,全部從節點共享同一份repl_backlog,所以能夠考慮repl_backlog配置大一點,以容忍從節點更長時間失聯。ip

從節點向主節點發送命令PSYNC,觸發部分複製。有關REdis主從複製的細節,請參見《REdis複製研究》。

2. 配置項

REdis的複製分全量複製和部分複製,全量複製是個很重的過程,而部分複製則是輕量的,部分複製實際是一個增量複製。

REdis的主節點建立和維護一個環形緩衝複製隊列(即repl_backlog),從節點部分複製(增量複製)的數據均來自於repl_backlog

主節點只有一個repl_backlog,全部從節點共享,直接相關的配置項有兩個:

配置項名

配置項說明

repl-backlog-size

環形緩衝複製隊列大小,可不帶單位,但同時支持單位:bkkbmmbggb,單位不區分大小寫,其中kmg間的計算倍數是1000,而kbmbgb的計算倍數是1024

repl-backlog-ttl

環形緩衝複製隊列存活時長(全部slaves不可用時,保留repl_backlog多長時間,單位:秒)

3. redisServer

結構體redisServerREdis的第一核心結構,repl_backlog是它的組成成員。

struct redisServer {

  /* My current replication offset */

  long long master_repl_offset;

  /* Accept offsets up to this for replid2. */

  long long second_replid_offset;

  /* Replication backlog for partial syncs */

  char *repl_backlog; // 環形緩衝複製隊列

  /* Backlog circular buffer size */

  long long repl_backlog_size; // 環形緩衝複製隊列容量

  /* Backlog actual data length */

  long long repl_backlog_histlen; // 環形緩衝複製隊列已用大小(影響是否能部分複製)

  /* Backlog circular buffer current offset,

     that is the next byte will'll write to.*/

  // 實際上談不上空閒,由於老是環繞覆蓋寫,

  // 理解爲最新數據的截止位置更爲合適,更新的數據老是從這裏開始寫入到repl_backlog中。

  long long repl_backlog_idx; // 環形緩衝複製隊列空閒起始位置(寫從這裏開始)

  /* Replication "master offset" of first

     byte in the replication backlog buffer.*/

  long long repl_backlog_off; // 數據在環形緩衝複製隊列的起始位置(讀從這裏開始)

  /* Time without slaves after the backlog

     gets released. */

  time_t repl_backlog_time_limit; // 環形緩衝複製隊列生存時長

  /* We have no slaves since that time.

     Only valid if server.slaves len is 0. */

  time_t repl_no_slaves_since; // 無可用從節點的發生時間

  /* Min number of slaves to write. */

  int repl_min_slaves_to_write; // 最小需寫的從節點數

  /* Max lag of <count> slaves to write. */

  int repl_min_slaves_max_lag;

  /* Number of slaves with lag <= max_lag. */

  int repl_good_slaves_count;

  /* Send RDB to slaves sockets directly. */

  int repl_diskless_sync; // 不落磁盤(無盤)往從節點發送RDB(全量複製)

  /* Delay to start a diskless repl BGSAVE. */

  // 無盤複製時,延遲指定的時長,以等待更多的從節點

  int repl_diskless_sync_delay;

};

4. feedReplicationBacklog-repl_backlog

/* Add data to the replication backlog.

 * This function also increments the global replication offset stored at

 * server.master_repl_offset, because there is no case where we want to feed

 * the backlog without incrementing the offset. */

// 主要被replicationFeedSlaves調用

// 寫len長的數據ptr到repl_backlog中

// repl_backlog是一個環形buffer,不存在溢出的問題,策略是最新數據覆蓋最老數據。

// 若是參數len大於repl_backlog_size,

// 則repl_backlog沒有實際意義,

// 由於沒法存儲一份完整數據。

void feedReplicationBacklog(void *ptr, size_t len) {

  unsigned char *p = ptr;

  server.master_repl_offset += len;

 

  /* This is a circular buffer, so write as much data we can at every

   * iteration and rewind the "idx" index if we reach the limit. */

  // 由於repl_backlog是環形buffer,

  // 剩餘的空間可能容納不了len長的數據,

  // 當不夠時,就須要環繞從頭開始寫,

  // 所以這裏需while循環。

  while(len) {

    // repl_backlog_size爲repl_backlog的容量大小,

    // 由配置項決定repl_backlog_size值決定,

    // repl_backlog_idx是repl_backlog空閒區域的起始位置,

    // 這兩個值相減獲得repl_backlog可用大小。

    size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;

    // 若是thislen大於len,則表示足夠容納

    if (thislen > len) thislen = len;

    memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);

    // 空閒位置日後挪動

    server.repl_backlog_idx += thislen;

    // 若是空閒位置達到容量大小,則環繞回去從0開始

    if (server.repl_backlog_idx == server.repl_backlog_size)

      server.repl_backlog_idx = 0;

    len -= thislen;

    p += thislen;

    // repl_backlog_histlen記錄了repl_backlog中的數據大小    

    server.repl_backlog_histlen += thislen;

  }

  // 修正存儲在repl_backlog中的數據大小,

  // 它不可能超過repl_backlog_size值。

  if (server.repl_backlog_histlen > server.repl_backlog_size)

    server.repl_backlog_histlen = server.repl_backlog_size;

  /* Set the offset of the first byte we have in the backlog. */  

  server.repl_backlog_off = server.master_repl_offset -

                            server.repl_backlog_histlen + 1;

}

5. addReplyReplicationBacklog-repl_backlog

當主節點判斷可部分複製時,會記錄以下日誌:

Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.

 

給從節點的響應頭爲「+CONTINUE replid\r\n」或「+CONTINUE\r\n」。函數addReplyReplicationBacklog的實現:

/* Feed the slave 'c' with the replication backlog starting from the

 * specified 'offset' up to the end of the backlog. */

// 被masterTryPartialResynchronization調用

// 而masterTryPartialResynchronization被syncCommand調用(對應命令PSYNC)。

// 從repl_backlog取數據發給slave,

// 數據的開始位置由offset指定。

long long addReplyReplicationBacklog(client *c, long long offset) {

  long long j, skip, len;

 

  serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

  // repl_backlog_histlen爲0,

  // 表示repl_backlog中無數據。

  if (server.repl_backlog_histlen == 0) {

    serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");

    return 0;

  }

 

  serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",

           server.repl_backlog_size);

  serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",

           server.repl_backlog_off);

  serverLog(LL_DEBUG, "[PSYNC] History len: %lld",

           server.repl_backlog_histlen);

  serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",

           server.repl_backlog_idx);

 

  /* Compute the amount of bytes we need to discard. */

  skip = offset - server.repl_backlog_off;

  serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);

 

  /* Point j to the oldest byte, that is actually our

   * server.repl_backlog_off byte. */

  j = (server.repl_backlog_idx +

      (server.repl_backlog_size-server.repl_backlog_histlen)) %

    server.repl_backlog_size;

  serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);

 

  /* Discard the amount of data to seek to the specified 'offset'. */

  j = (j + skip) % server.repl_backlog_size;

 

  /* Feed slave with data. Since it is a circular buffer we have to

   * split the reply in two parts if we are cross-boundary. */

  len = server.repl_backlog_histlen - skip;

  serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);

  while(len) {

    long long thislen =

        ((server.repl_backlog_size - j) < len) ?

        (server.repl_backlog_size - j) : len;

 

    serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);

    addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));

    len -= thislen;

    j = 0;

  }

  return server.repl_backlog_histlen - skip;

}

相關文章
相關標籤/搜索