一萬字詳解 Redis Cluster Gossip 協議


你們好,我是歷小冰,今天來說一下 Reids Cluster 的 Gossip 協議和集羣操做,文章的思惟導圖以下所示。
node

集羣模式和 Gossip 簡介

對於數據存儲領域,當數據量或者請求流量大到必定程度後,就必然會引入分佈式。好比 Redis,雖然其單機性能十分優秀,可是由於下列緣由時,也不得不引入集羣。web

  • 單機沒法保證高可用,須要引入多實例來提供高可用性
  • 單機可以提供高達 8W 左右的QPS,再高的QPS則須要引入多實例
  • 單機可以支持的數據量有限,處理更多的數據須要引入多實例;
  • 單機所處理的網絡流量已經超過服務器的網卡的上限值,須要引入多實例來分流。

有集羣,集羣每每須要維護必定的元數據,好比實例的ip地址,緩存分片的 slots 信息等,因此須要一套分佈式機制來維護元數據的一致性。這類機制通常有兩個模式:分散式和集中式redis

分散式機制將元數據存儲在部分或者全部節點上,不一樣節點之間進行不斷的通訊來維護元數據的變動和一致性。Redis Cluster,Consul 等都是該模式。算法

而集中式是將集羣元數據集中存儲在外部節點或者中間件上,好比 zookeeper。舊版本的 kafka 和 storm 等都是使用該模式。
兩種模式各有優劣,具體以下表所示:
模式 優勢 缺點
集中式 數據更新及時,時效好,元數據的更新和讀取,時效性很是好,一旦元數據出現了變動,當即就更新到集中式的外部節點中,其餘節點讀取的時候當即就能夠感知到; 較大數據更新壓力,更新壓力所有集中在外部節點,做爲單點影響整個系統
分散式 數據更新壓力分散,元數據的更新比較分散,不是集中某一個節點,更新請求比較分散,並且有不一樣節點處理,有必定的延時,下降了併發壓力 數據更新延遲,可能致使集羣的感知有必定的滯後

分散式的元數據模式有多種可選的算法進行元數據的同步,好比說 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都須要所有節點或者大多數節點(超過一半)正常運行,整個集羣才能穩定運行,而 Gossip 則不須要半數以上的節點運行。數組

Gossip 協議,顧名思義,就像流言蜚語同樣,利用一種隨機、帶有傳染性的方式,將信息傳播到整個網絡中,並在必定時間內,使得系統內的全部節點數據一致。對你來講,掌握這個協議不只能很好地理解這種最經常使用的,實現最終一致性的算法,也能在後續工做中駕輕就熟地實現數據的最終一致性。緩存

Gossip 協議又稱 epidemic 協議(epidemic protocol),是基於流行病傳播方式的節點或者進程之間信息交換的協議,在P2P網絡和分佈式系統中應用普遍,它的方法論也特別簡單:服務器

在一個處於有界網絡的集羣裏,若是每一個節點都隨機與其餘節點交換特定信息,通過足夠長的時間後,集羣各個節點對該份信息的認知終將收斂到一致。微信

這裏的「特定信息」通常就是指集羣狀態、各節點的狀態以及其餘元數據等。Gossip協議是徹底符合 BASE 原則,能夠用在任何要求最終一致性的領域,好比分佈式存儲和註冊中心。另外,它能夠很方便地實現彈性集羣,容許節點隨時上下線,提供快捷的失敗檢測和動態負載均衡等。markdown

此外,Gossip 協議的最大的好處是,即便集羣節點的數量增長,每一個節點的負載也不會增長不少,幾乎是恆定的。這就容許 Redis Cluster 或者 Consul 集羣管理的節點規模能橫向擴展到數千個。網絡

Redis Cluster 的 Gossip 通訊機制

Redis Cluster 是在 3.0 版本引入集羣功能。爲了讓讓集羣中的每一個實例都知道其餘全部實例的狀態信息,Redis 集羣規定各個實例之間按照 Gossip 協議來通訊傳遞信息。

上圖展現了主從架構的 Redis Cluster 示意圖,其中實線表示節點間的主從複製關係,而虛線表示各個節點之間的 Gossip 通訊。

Redis Cluster 中的每一個節點都維護一份本身視角下的當前整個集羣的狀態,主要包括:

  1. 當前集羣狀態
  2. 集羣中各節點所負責的 slots信息,及其migrate狀態
  3. 集羣中各節點的master-slave狀態
  4. 集羣中各節點的存活狀態及懷疑Fail狀態

也就是說上面的信息,就是集羣中Node相互八卦傳播流言蜚語的內容主題,並且比較全面,既有本身的更有別人的,這麼一來你們都相互傳,最終信息就全面並且一致了。

Redis Cluster 的節點之間會相互發送多種消息,較爲重要的以下所示:

  • MEET:經過「cluster meet ip port」命令,已有集羣的節點會向新的節點發送邀請,加入現有集羣,而後新節點就會開始與其餘節點進行通訊;
  • PING:節點按照配置的時間間隔向集羣中其餘節點發送 ping 消息,消息中帶有本身的狀態,還有本身維護的集羣元數據,和部分其餘節點的元數據;
  • PONG:  節點用於迴應 PING 和 MEET 的消息,結構和 PING 消息相似,也包含本身的狀態和其餘信息,也能夠用於信息廣播和更新;
  • FAIL: 節點 PING 不通某節點後,會向集羣全部節點廣播該節點掛掉的消息。其餘節點收到消息後標記已下線。

Redis 的源碼中 cluster.h 文件定義了所有的消息類型,代碼爲 redis 4.0版本。

// 注意,PING 、 PONG 和 MEET 其實是同一種消息。
// PONG 是對 PING 的回覆,它的實際格式也爲 PING 消息,
// 而 MEET 則是一種特殊的 PING 消息,用於強制消息的接收者將消息的發送者添加到集羣中(若是節點還沒有在節點列表中的話)
#define CLUSTERMSG_TYPE_PING 0          /* Ping 消息 */
#define CLUSTERMSG_TYPE_PONG 1          /* Pong 用於回覆Ping */
#define CLUSTERMSG_TYPE_MEET 2          /* Meet 請求將某個節點添加到集羣中 */
#define CLUSTERMSG_TYPE_FAIL 3          /* Fail 將某個節點標記爲 FAIL */
#define CLUSTERMSG_TYPE_PUBLISH 4       /* 經過發佈與訂閱功能廣播消息 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 請求進行故障轉移操做,要求消息的接收者經過投票來支持消息的發送者 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* 消息的接收者贊成向消息的發送者投票 */
#define CLUSTERMSG_TYPE_UPDATE 7        /* slots 已經發生變化,消息發送者要求消息接收者進行相應的更新 */
#define CLUSTERMSG_TYPE_MFSTART 8       /* 爲了進行手動故障轉移,暫停各個客戶端 */
#define CLUSTERMSG_TYPE_COUNT 9         /* 消息總數 */

經過上述這些消息,集羣中的每個實例都能得到其它全部實例的狀態信息。這樣一來,即便有新節點加入、節點故障、Slot 變動等事件發生,實例間也能夠經過 PING、PONG 消息的傳遞,完成集羣狀態在每一個實例上的同步。下面,咱們依次來看看幾種常見的場景。

定時 PING/PONG 消息

Redis Cluster 中的節點都會定時地向其餘節點發送 PING 消息,來交換各個節點狀態信息,檢查各個節點狀態,包括在線狀態、疑似下線狀態 PFAIL 和已下線狀態 FAIL。

Redis 集羣的定時 PING/PONG 的工做原理能夠歸納成兩點:

  • 一是,每一個實例之間會按照必定的頻率,從集羣中隨機挑選一些實例,把 PING 消息發送給挑選出來的實例,用來檢測這些實例是否在線,並交換彼此的狀態信息。PING 消息中封裝了發送消息的實例自身的狀態信息、部分其它實例的狀態信息,以及 Slot 映射表。
  • 二是,一個實例在接收到 PING 消息後,會給發送 PING 消息的實例,發送一個 PONG 消息。PONG 消息包含的內容和 PING 消息同樣。

下圖顯示了兩個實例間進行 PING、PONG 消息傳遞的狀況,其中實例一爲發送節點,實例二是接收節點


新節點上線

Redis Cluster 加入新節點時,客戶端須要執行 CLUSTER MEET 命令,以下圖所示。


節點一在執行 CLUSTER MEET 命令時會首先爲新節點建立一個 clusterNode 數據,並將其添加到本身維護的 clusterState 的 nodes 字典中。有關 clusterState 和 clusterNode 關係,咱們在最後一節會有詳盡的示意圖和源碼來說解。

而後節點一會根據據 CLUSTER MEET 命令中的 IP 地址和端口號,向新節點發送一條 MEET 消息。新節點接收到節點一發送的MEET消息後,新節點也會爲節點一建立一個 clusterNode 結構,並將該結構添加到本身維護的 clusterState 的 nodes 字典中。

接着,新節點向節點一返回一條PONG消息。節點一接收到節點B返回的PONG消息後,得知新節點已經成功的接收了本身發送的MEET消息。

最後,節點一還會向新節點發送一條 PING 消息。新節點接收到該條 PING 消息後,能夠知道節點A已經成功的接收到了本身返回的P ONG消息,從而完成了新節點接入的握手操做。

MEET 操做成功以後,節點一會經過稍早時講的定時 PING 機制將新節點的信息發送給集羣中的其餘節點,讓其餘節點也與新節點進行握手,最終,通過一段時間後,新節點會被集羣中的全部節點認識。

節點疑似下線和真正下線

Redis Cluster 中的節點會按期檢查已經發送 PING 消息的接收方節點是否在規定時間 ( cluster-node-timeout ) 內返回了 PONG 消息,若是沒有則會將其標記爲疑似下線狀態,也就是 PFAIL 狀態,以下圖所示。


而後,節點一會經過 PING 消息,將節點二處於疑似下線狀態的信息傳遞給其餘節點,例如節點三。節點三接收到節點一的 PING 消息得知節點二進入 PFAIL 狀態後,會在本身維護的 clusterState 的 nodes 字典中找到節點二所對應的 clusterNode 結構,並將主節點一的下線報告添加到 clusterNode 結構的 fail_reports 鏈表中。


隨着時間的推移,若是節點十 (舉個例子) 也由於 PONG 超時而認爲節點二疑似下線了,而且發現本身維護的節點二的 clusterNode 的 fail_reports 中有半數以上的主節點數量的未過期的將節點二標記爲 PFAIL 狀態報告日誌,那麼節點十將會把節點二將被標記爲已下線 FAIL 狀態,而且節點十會馬上向集羣其餘節點廣播主節點二已經下線的 FAIL 消息,全部收到 FAIL 消息的節點都會當即將節點二狀態標記爲已下線。以下圖所示。


須要注意的是,報告疑似下線記錄是由時效性的,若是超過 cluster-node-timeout *2 的時間,這個報告就會被忽略掉,讓節點二又恢復成正常狀態。

Redis Cluster 通訊源碼實現

綜上,咱們瞭解了 Redis Cluster 在定時 PING/PONG、新節點上線、節點疑似下線和真正下線等環節的原理和操做流程,下面咱們來真正看一下 Redis 在這些環節的源碼實現和具體操做。

涉及的數據結構體

首先,咱們先來說解一下其中涉及的數據結構,也就是上文提到的 ClusterNode 等結構。

每一個節點都會維護一個 clusterState 結構,表示當前集羣的總體狀態,它的定義以下所示。

typedef struct clusterState {
   clusterNode *myself;  /* 當前節點的clusterNode信息 */
   ....
   dict *nodes;          /* name到clusterNode的字典 */
   ....
   clusterNode *slots[CLUSTER_SLOTS]; /* slot 和節點的對應關係*/
   ....
} clusterState;

它有三個比較關鍵的字段,具體示意圖以下所示:

  • myself 字段,是一個 clusterNode 結構,用來記錄本身的狀態;
  • nodes 字典,記錄一個 name 到 clusterNode 結構的映射,以此來記錄其餘節點的狀態;
  • slot 數組,記錄slot 對應的節點 clusterNode結構。

clusterNode 結構保存了一個節點的當前狀態,好比節點的建立時間、節點的名字、節點 當前的配置紀元、節點的IP地址和端口號等等。除此以外,clusterNode結構的 link 屬性是一個clusterLink結構,該結構保存了鏈接節點所需的有關信息**,好比**套接字描述符,輸入緩衝區和輸出緩衝區。clusterNode 還有一個 fail_report 的列表,用來記錄疑似下線報告。具體定義以下所示。

typedef struct clusterNode {
    mstime_t ctime; /* 建立節點的時間 */
    char name[CLUSTER_NAMELEN]; /* 節點的名字 */
    int flags;      /* 節點標識,標記節點角色或者狀態,好比主節點從節點或者在線和下線 */
    uint64_t configEpoch; /* 當前節點已知的集羣統一epoch */
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    int numslots;   /* Number of slots handled by this node */
    int numslaves;  /* Number of slave nodes, if this is a master */
    struct clusterNode **slaves; /* pointers to slave nodes */
    struct clusterNode *slaveof; /* pointer to the master node. Note that it
                                    may be NULL even if the node is a slave
                                    if we don't have the master node in our
                                    tables. */

    mstime_t ping_sent;      /* 當前節點最後一次向該節點發送 PING 消息的時間 */
    mstime_t pong_received;  /* 當前節點最後一次收到該節點 PONG 消息的時間 */
    mstime_t fail_time;      /* FAIL 標誌位被設置的時間 */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    long long repl_offset;      /* 當前節點的repl便宜 */
    char ip[NET_IP_STR_LEN];  /* 節點的IP 地址 */
    int port;                   /* 端口 */
    int cport;                  /* 通訊端口,通常是端口+1000 */
    clusterLink *link;          /* 和該節點的 tcp 鏈接 */
    list *fail_reports;         /* 下線記錄列表 */
} clusterNode;

clusterNodeFailReport 是記錄節點下線報告的結構體, node 是報告節點的信息,而 time 則表明着報告時間。

typedef struct clusterNodeFailReport {
    struct clusterNode *node;  /* 報告當前節點已經下線的節點 */
    mstime_t time;             /* 報告時間 */
} clusterNodeFailReport;

消息結構體

瞭解了 Reids 節點維護的數據結構體後,咱們再來看節點進行通訊的消息結構體。通訊消息最外側的結構體爲 clusterMsg,它包括了不少消息記錄信息,包括 RCmb 標誌位,消息總長度,消息協議版本,消息類型;它還包括了發送該消息節點的記錄信息,好比節點名稱,節點負責的slot信息,節點ip和端口等;最後它包含了一個 clusterMsgData 來攜帶具體類型的消息。

typedef struct {
    char sig[4];        /* 標誌位,"RCmb" (Redis Cluster message bus). */
    uint32_t totlen;    /* 消息總長度 */
    uint16_t ver;       /* 消息協議版本 */
    uint16_t port;      /* 端口 */
    uint16_t type;      /* 消息類型 */
    uint16_t count;     /*  */
    uint64_t currentEpoch;  /* 表示本節點當前記錄的整個集羣的統一的epoch,用來決策選舉投票等,與configEpoch不一樣的是:configEpoch表示的是master節點的惟一標誌,currentEpoch是集羣的惟一標誌。 */
    uint64_t configEpoch;   /* 每一個master節點都有一個惟一的configEpoch作標誌,若是和其餘master節點衝突,會強制自增使本節點在集羣中惟一 */
    uint64_t offset;    /* 主從複製偏移相關信息,主節點和從節點含義不一樣 */
    char sender[CLUSTER_NAMELEN]; /* 發送節點的名稱 */
    unsigned char myslots[CLUSTER_SLOTS/8]; /* 本節點負責的slots信息,16384/8個char數組,一共爲16384bit */
    char slaveof[CLUSTER_NAMELEN]; /* master信息,假如本節點是slave節點的話,協議帶有master信息 */
    char myip[NET_IP_STR_LEN];    /* IP */
    char notused1[34];  /* 保留字段 */
    uint16_t cport;      /* 集羣的通訊端口 */
    uint16_t flags;      /* 本節點當前的狀態,好比 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */
    unsigned char state; /* Cluster state from the POV of the sender */
    unsigned char mflags[3]; /* 本條消息的類型,目前只有兩類:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */
    union clusterMsgData data;
} clusterMsg;

clusterMsgData 是一個 union 結構體,它能夠爲 PING,MEET,PONG 或者 FAIL 等消息體。其中當消息爲 PING、MEET 和 PONG 類型時,ping 字段是被賦值的,而是 FAIL 類型時,fail 字段是被賦值的。

// 注意這是 union 關鍵字
union clusterMsgData {
    /* PING, MEET 或者 PONG 消息時,ping 字段被賦值 */
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;
    /*  FAIL 消息時,fail 被賦值 */
    struct {
        clusterMsgDataFail about;
    } fail;
    // .... 省略 publish 和 update 消息的字段
};

clusterMsgDataGossip 是 PING、PONG 和 MEET 消息的結構體,它會包括髮送消息節點維護的其餘節點信息,也就是上文中 clusterState 中 nodes 字段包含的信息,具體代碼以下所示,你也會發現兩者的字段是相似的。

typedef struct {
 /* 節點的名字,默認是隨機的,MEET消息發送並獲得回覆後,集羣會爲該節點設置正式的名稱*/
    char nodename[CLUSTER_NAMELEN]; 
    uint32_t ping_sent; /* 發送節點最後一次給接收節點發送 PING 消息的時間戳,收到對應 PONG 回覆後會被賦值爲0 */
    uint32_t pong_received; /* 發送節點最後一次收到接收節點發送 PONG 消息的時間戳 */
    char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */
    uint16_t port;       /* IP*/       
    uint16_t cport;      /* 端口*/  
    uint16_t flags;      /* 標識*/ 
    uint32_t notused1;   /* 對齊字符*/
} clusterMsgDataGossip;

typedef struct {
    char nodename[CLUSTER_NAMELEN]; /* 下線節點的名字 */
} clusterMsgDataFail;

看完了節點維護的數據結構體和發送的消息結構體後,咱們就來看看 Redis 的具體行爲源碼了。

隨機週期性發送PING消息

Redis 的 clusterCron 函數會被定時調用,每被執行10次,就會準備向隨機的一個節點發送 PING 消息。

它會先隨機的選出 5 個節點,而後從中選擇最久沒有與之通訊的節點,調用 clusterSendPing 函數發送類型爲 CLUSTERMSG_TYPE_PING 的消息

// cluster.c 文件 
// clusterCron() 每執行 10 次(至少間隔一秒鐘),就向一個隨機節點發送 gossip 信息
if (!(iteration % 10)) {
    int j;

    /* 隨機 5 個節點,選出其中一個 */
    for (j = 0; j < 5; j++) {
        de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* 不要 PING 鏈接斷開的節點,也不要 PING 最近已經 PING 過的節點 */
        if (this->link == NULL || this->ping_sent != 0continue;
        if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        /* 對比 pong_received 字段,選出更長時間未收到其 PONG 消息的節點(表示很久沒有接受到該節點的PONG消息了) */
        if (min_pong_node == NULL || min_pong > this->pong_received) {
            min_pong_node = this;
            min_pong = this->pong_received;
        }
    }
    /* 向最久沒有收到 PONG 回覆的節點發送 PING 命令 */
    if (min_pong_node) {
        serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
        clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
    }
}

clusterSendPing 函數的具體行爲咱們後續再瞭解,由於該函數在其餘環節也會常常用到

節點加入集羣

當節點執行 CLUSTER MEET 命令後,會在自身給新節點維護一個 clusterNode 結構體,該結構體的 link 也就是TCP鏈接字段是 null,表示是新節點還沒有創建鏈接。

clusterCron 函數中也會處理這些未創建鏈接的新節點,調用 createClusterLink 創立鏈接,而後調用 clusterSendPing 函數來發送 MEET 消息

/* cluster.c clusterCron 函數部分,爲未建立鏈接的節點建立鏈接 */
if (node->link == NULL) {
    int fd;
    mstime_t old_ping_sent;
    clusterLink *link;
    /* 和該節點創建鏈接 */
    fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
        node->cport, NET_FIRST_BIND_ADDR);
    /* .... fd 爲-1時的異常處理 */
    /* 創建 link */
    link = createClusterLink(node);
    link->fd = fd;
    node->link = link;
    aeCreateFileEvent(server.el,link->fd,AE_READABLE,
            clusterReadHandler,link);
    /* 向新鏈接的節點發送 PING 命令,防止節點被識進入下線 */
    /* 若是節點被標記爲 MEET ,那麼發送 MEET 命令,不然發送 PING 命令 */
    old_ping_sent = node->ping_sent;
    clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
            CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
    /* .... */
    /* 若是當前節點(發送者)沒能收到 MEET 信息的回覆,那麼它將再也不向目標節點發送命令。*/
    /* 若是接收到回覆的話,那麼節點將再也不處於 HANDSHAKE 狀態,並繼續向目標節點發送普通 PING 命令*/
    node->flags &= ~CLUSTER_NODE_MEET;
}

防止節點假超時及狀態過時

防止節點假超時和標記疑似下線標記也是在 clusterCron 函數中,具體以下所示。它會檢查當前全部的 nodes 節點列表,若是發現某個節點與本身的最後一個 PONG 通訊時間超過了預約的閾值的一半時,爲了防止節點是假超時,會主動釋放掉與之的 link 鏈接,而後會主動向它發送一個 PING 消息。

/* cluster.c clusterCron 函數部分,遍歷節點來檢查 fail 的節點*/
while((de = dictNext(di)) != NULL) {
    clusterNode *node = dictGetVal(de);
    now = mstime(); /* Use an updated time at every iteration. */
    mstime_t delay;

    /* 若是等到 PONG 到達的時間超過了 node timeout 一半的鏈接 */
    /* 由於儘管節點依然正常,但鏈接可能已經出問題了 */
    if (node->link && /* is connected */
        now - node->link->ctime >
        server.cluster_node_timeout && /* 還未重連 */
        node->ping_sent && /* 已經發過ping消息 */
        node->pong_received < node->ping_sent && /* 還在等待pong消息 */
        /* 等待pong消息超過了 timeout/2 */
        now - node->ping_sent > server.cluster_node_timeout/2)
    {
        /* 釋放鏈接,下次 clusterCron() 會自動重連 */
        freeClusterLink(node->link);
    }

    /* 若是目前沒有在 PING 節點*/
    /* 而且已經有 node timeout 一半的時間沒有從節點那裏收到 PONG 回覆 */
    /* 那麼向節點發送一個 PING ,確保節點的信息不會太舊,有可能一直沒有隨機中 */
    if (node->link &&
        node->ping_sent == 0 &&
        (now - node->pong_received) > server.cluster_node_timeout/2)
    {
        clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
        continue;
    }
    /* .... 處理failover和標記遺失下線 */
}

處理failover和標記疑似下線

若是防止節點假超時處理後,節點依舊未收到目標節點的 PONG 消息,而且時間已經超過了 cluster_node_timeout,那麼就將該節點標記爲疑似下線狀態。

/* 若是這是一個主節點,而且有一個從服務器請求進行手動故障轉移,那麼向從服務器發送 PING*/
if (server.cluster->mf_end &&
    nodeIsMaster(myself) &&
    server.cluster->mf_slave == node &&
    node->link)
{
    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
    continue;
}

/* 後續代碼只在節點發送了 PING 命令的狀況下執行*/
if (node->ping_sent == 0continue;

/* 計算等待 PONG 回覆的時長 */ 
delay = now - node->ping_sent;
/* 等待 PONG 回覆的時長超過了限制值,將目標節點標記爲 PFAIL (疑似下線)*/
if (delay > server.cluster_node_timeout) {
    /* 超時了,標記爲疑似下線 */
    if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
        redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
            node->name);
        // 打開疑似下線標記
        node->flags |= REDIS_NODE_PFAIL;
        update_state = 1;
    }
}

實際發送Gossip消息

如下是前方屢次調用過的clusterSendPing()方法的源碼,代碼中有詳細的註釋,你們能夠自行閱讀。主要的操做就是將節點自身維護的 clusterState 轉換爲對應的消息結構體,。

/* 向指定節點發送一條 MEET 、 PING 或者 PONG 消息 */
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    clusterMsg *hdr;
    int gossipcount = 0/* Number of gossip sections added so far. */
    int wanted; /* Number of gossip sections we want to append if possible. */
    int totlen; /* Total packet length. */
    // freshnodes 是用於發送 gossip 信息的計數器
    // 每次發送一條信息時,程序將 freshnodes 的值減一
    // 當 freshnodes 的數值小於等於 0 時,程序中止發送 gossip 信息
    // freshnodes 的數量是節點目前的 nodes 表中的節點數量減去 2 
    // 這裏的 2 指兩個節點,一個是 myself 節點(也便是發送信息的這個節點)
    // 另外一個是接受 gossip 信息的節點
    int freshnodes = dictSize(server.cluster->nodes)-2;

    
    /* 計算要攜帶多少節點的信息,最少3個,最多 1/10 集羣總節點數量*/
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

    /* .... 省略 totlen 的計算等*/

    /* 若是發送的信息是 PING ,那麼更新最後一次發送 PING 命令的時間戳 */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    /* 將當前節點的信息(好比名字、地址、端口號、負責處理的槽)記錄到消息裏面 */
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    /* 每一個節點有 freshnodes 次發送 gossip 信息的機會
       每次向目標節點發送 2 個被選中節點的 gossip 信息(gossipcount 計數) */

    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        /* 從 nodes 字典中隨機選出一個節點(被選中節點) */
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* 如下節點不能做爲被選中節點:
         * Myself:節點自己。
         * PFAIL狀態的節點
         * 處於 HANDSHAKE 狀態的節點。
         * 帶有 NOADDR 標識的節點
         * 由於不處理任何 Slot 而被斷開鏈接的節點 
         */

        if (this == myself) continue;
        if (this->flags & CLUSTER_NODE_PFAIL) continue;
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Tecnically not correct, but saves CPU. */
            continue;
        }

        // 檢查被選中節點是否已經在 hdr->data.ping.gossip 數組裏面
        // 若是是的話說明這個節點以前已經被選中了
        // 不要再選中它(不然就會出現重複)
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

        /* 這個被選中節點有效,計數器減一 */
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }

    /* .... 若是有 PFAIL 節點,最後添加 */


    /* 計算信息長度 */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    /* 將被選中節點的數量(gossip 信息中包含了多少個節點的信息)記錄在 count 屬性裏面*/
    hdr->count = htons(gossipcount);
    /* 將信息的長度記錄到信息裏面 */
    hdr->totlen = htonl(totlen);
    /* 發送網絡請求 */
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}


void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    /* 指向 gossip 信息結構 */
    gossip = &(hdr->data.ping.gossip[i]);
    /* 將被選中節點的名字記錄到 gossip 信息 */   
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    /* 將被選中節點的 PING 命令發送時間戳記錄到 gossip 信息 */
    gossip->ping_sent = htonl(n->ping_sent/1000);
    /* 將被選中節點的 PONG 命令回覆的時間戳記錄到 gossip 信息 */
    gossip->pong_received = htonl(n->pong_received/1000);
    /* 將被選中節點的 IP 記錄到 gossip 信息 */
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    /* 將被選中節點的端口號記錄到 gossip 信息 */
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    /* 將被選中節點的標識值記錄到 gossip 信息 */
    gossip->flags = htons(n->flags);
    gossip->notused1 = 0;
}

下面是 clusterBuildMessageHdr 函數,它主要負責填充消息結構體中的基礎信息和當前節點的狀態信息。

/* 構建消息的 header */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* 若是當前節點是salve,則master爲其主節點,若是當前節點是master節點,則master就是當前節點 */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    memset(hdr,0,sizeof(*hdr));
    /* 初始化協議版本、標識、及類型, */
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    /* 消息頭設置當前節點id */
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* 消息頭設置當前節點ip */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* 基礎端口及集羣內節點通訊端口 */
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : server.port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (server.port + CLUSTER_PORT_INCR);
    /* 設置當前節點的槽信息 */
    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* 設置 currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* 設置複製偏移量 */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* 計算並設置消息的總長度 */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
}

後記

原本只想寫一下 Redis Cluster 的 Gossip 協議,沒想到文章越寫,內容越多,最後源碼分析也是有點有始無終,你們就湊合看一下,也但願你們繼續關注我後續的問題。


本文分享自微信公衆號 - 碼農沉思錄(code-thinker)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索