Redis源碼閱讀(三)集羣-鏈接初始化

Redis源碼閱讀(三)集羣-鏈接創建

 

  對於併發請求很高的生產環境,單個Redis知足不了性能要求,一般都會配置Redis集羣來提升服務性能。3.0以後的Redis支持了集羣模式。node

  Redis官方提供的集羣功能是無中心的,命令請求能夠發送到任意一個Redis節點,若是該請求的key不是由該節點負責處理,則會返回給客戶端MOVED錯誤,提示客戶端須要轉向到該key對應的處理節點上。支持集羣模式的redis客戶端會自動進行轉向,普通模式客戶端則只返回MOVED錯誤。redis

  先看下常見的Redis集羣結構:服務器

  節點兩兩之間都有鏈接,只有主節點能夠處理客戶端的命令請求;從節點複製主節點數據,並在主節點下線後,升級爲主節點。每一個主節點能夠掛多個從節點,在主節點下線後從節點須要競爭,只有一個從節點會被選舉爲主節點。數據結構

考慮如下幾個關鍵點:併發

  1. 節點是如何互發現的,請求又是如何分配到各個節點的?
  2. 其中部分節點出現故障,其餘節點是如何發現的又是怎樣恢復的?
  3. 主節點下線後從節點是如何競爭的?
  4. 是否能夠不中斷Redis服務進行動態的擴容?

  接下來幾篇會從這幾個關鍵問題入手來分析Redis集羣源碼;首先先看集羣的基本數據結構,以及節點之間是如何創建鏈接的函數

1. 數據結構

   Redis集羣是無中心的,每一個節點會存儲整個集羣各個節點的信息。咱們看下Redis源碼中存儲集羣節點信息的數據結構性能

struct clusterNode { //clusterState->nodes結構  集羣數據交互接收的地方在clusterProcessPacket
    mstime_t ctime; /* Node object creation time. */
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* REDIS_NODE_... */ //取值能夠參考clusterGenNodeDescription uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ struct clusterNode **slaves; /* pointers to slave nodes */ struct clusterNode *slaveof; /* pointer to the master node */ //注意ClusterNode.slaveof與clusterMsg.slaveof的關聯 mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ long long repl_offset; /* Last known repl offset for this node. */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ A節點 B節點 clusterNode-B(link1) ---> link2(該link不屬於任何clusterNode) (A發起meet到B) 步驟1 link4 <---- clusterNode-A(link3) (該link不屬於任何clusterNode) (B收到meet後,再下一個clusterCron中向A發起鏈接) 步驟2 */ //clusterCron若是節點的link爲NULL,則須要進行重連,在freeClusterLink中若是和集羣中某個節點異常掛掉,則本節點經過讀寫事件而感知到,
//而後在freeClusterLink置爲NULL
clusterLink *link; /* TCP/IP link with this node */ //還有個賦值的地方在clusterCron,當主動和對端創建鏈接的時候賦值
list *fail_reports; /* List of nodes signaling this as failing */ //鏈表中成員類型爲clusterNodeFailReport }; typedef struct clusterNode clusterNode;

 

  clusterNode結構體存儲了一個節點的基本信息,包括節點的IP,port,鏈接信息等;Redis節點每次和其餘節點創建鏈接都會建立一個clusterNode用來記錄其餘節點的信息, 這些clusterNode都會存儲到clusterState結構中,每一個節點自身只擁有一個clusterState,用來存儲整個集羣系統的狀態和信息。ui

typedef struct clusterState { //數據源頭在server.cluster   //集羣相關配置加載在clusterLoadConfig

    clusterNode *myself;  /* This node */

    uint64_t currentEpoch; 
    
    int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

    int size;             /* Num of master nodes with at least one slot */ //默認從1開始,而不是從0開始

    dict *nodes;          /* Hash table of name -> clusterNode structures */

  ......
// 例如 slots[i] = clusterNode_A 表示槽 i 由節點 A 處理 clusterNode *slots[REDIS_CLUSTER_SLOTS];
zskiplist *slots_to_keys; /* The following fields are used to take the slave state on elections. */ ...... } clusterState;

  clusterState結構中還有不少是故障遷移時須要用到的成員,與集羣鏈接初始化關係不大,能夠先不關注,後面再分析。nodes* 存儲的就是本節點所知的集羣全部節點的信息。this

2 鏈接創建

  集羣節點在初始化前都是孤立的Redis服務節點,尚未連成一個總體。其餘節點的信息是如何被該節點獲取的,整個集羣是如何鏈接起來的呢?spa

  這裏有兩種途徑:

  1)人爲干預指定讓節點和其餘節點鏈接,也就是經過cluster meet命令來指定要連入的其餘節點;

  2)集羣自發傳播,靠集羣內部的gossip協議自發擴散其餘節點的信息。想象下若是沒有集羣內部的自發傳播,任意兩個節點間的鏈接都須要人爲輸入命令來創建;節點數若是爲n, 整個集羣創建的總鏈接數量會達到n*(n-1);要想創建起整個集羣,讓每一個節點都知道完整的集羣信息,須要的cluster meet指令數量是O(n2),節點多起來的話初始化的成本會很高。因此說內部自發的傳播是頗有必要的。

  下面來看兩種方式的源碼實現:

Meet指令

CLUSTER MEET <ip> <port>

  該指令會指定另外一個節點的ip和port,讓接收到MEET命令的Redis節點去和該ip和端口創建鏈接;

struct redisCommand redisCommandTable[] = {  //sentinelcmds  redisCommandTable  配置文件加載見loadServerConfigFromString 全部配置文件加載見loadServerConfigFromStringsentinel
    {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
   ......
   {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
   ......
}

  能夠看出Redis服務處理cluster meet指令的函數是clusterCommand。

//CLUSTER 命令的實現
void clusterCommand(redisClient *c) {   
// 不能在非集羣模式下使用該命令
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        /* CLUSTER MEET <ip> <port> */
         // 將給定地址的節點添加到當前節點所處的集羣裏面
        long long port;
        // 檢查 port 參數的合法性
        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }
        //A經過cluster meet bip bport  B後,B端在clusterAcceptHandler接收鏈接,A端經過clusterCommand->clusterStartHandshake鏈接服務器
        // 嘗試與給定地址的節點進行鏈接
        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
             // 鏈接失敗
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
             // 鏈接成功
            addReply(c,shared.ok);
        }
   ......
}

  A節點收到cluster meet B指令後,A進入處理函數clusterCommand,並在該函數中調用clusterStartHandshake鏈接B服務器。這個函數實質上也只是建立一個記錄了B節點信息的clusterNode(B),並將clusterNode(B)的link置爲空。真正發起鏈接的是集羣的時間事件處理函數clusterCron。clusterCron會遍歷A節點上全部的nodes,並向link爲空的節點發起鏈接。這裏的鏈接又用到前面介紹的文件事件機制,再也不贅述。

Gossip消息擴散

  Gossip消息的擴散是利用節點之間的ping消息,在經過meet創建鏈接以後爲了對節點在線狀態進行檢測,每一個節點都要對本身已知集羣節點發送ping消息,若是在超時時間內返回了pong則認爲節點正常在線。

  假定對於A、B、C三個節點,初始只向A節點發送了以下兩條meet指令:

    Cluster meet B

    Cluster meet C

  對於A來說,B和C都是已知的節點信息;A會向B、C分別發送ping消息;在A發送ping消息給B時,發送方A會在gossip消息體中隨機帶上已知的節點信息(假設包含C節點);接收到ping消息的B節點會解析這gossip消息體中的節點信息,發現C節點是未知節點,那麼就會向C節點進行握手,並創建鏈接。那麼對B來說,C也成爲了已知節點。

  看下接收gossip消息並處理未知節點的函數實現:

*/ //解釋 MEET 、 PING 或 PONG 消息中和 gossip 協議有關的信息。
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {

     // 記錄這條消息中包含了多少個節點的信息
    uint16_t count = ntohs(hdr->count);
    // 指向第一個節點的信息
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    // 取出發送者
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
    // 遍歷全部節點的信息
    while(count--) {
        sds ci = sdsempty();

        // 分析節點的 flag
        uint16_t flags = ntohs(g->flags);

        // 信息節點
        clusterNode *node;

        // 取出節點的 flag
        if (flags == 0) ci = sdscat(ci,"noflags,");
        if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
        if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
        if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
        if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
        if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
        if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
        if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
        if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';

        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
        sdsfree(ci);

        /* Update our state accordingly to the gossip sections */
        // 使用消息中的信息對節點進行更新
        node = clusterLookupNode(g->nodename);
        // 節點已經存在於當前節點
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {//發送端每隔1s會從集羣挑選一個節點來發送PING,參考CLUSTERMSG_TYPE_PING
                    // 添加 sender 對 node 的下線報告
                    if (clusterNodeAddFailureReport(node,sender)) { 
                    //clusterProcessGossipSection->clusterNodeAddFailureReport把接收的fail或者pfail添加到本地fail_reports
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name); //sender節點告訴本節點node節點異常了
                    }
                    // 嘗試將 node 標記爲 FAIL
                    markNodeAsFailingIfNeeded(node);
                 // 節點處於正常狀態
                } else {
                     // 若是 sender 曾經發送過對 node 的下線報告      
                     // 那麼清除該報告
                    if (clusterNodeDelFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section, start an
             * handshake with the (possibly) new address: this will result
             * into a node address update if the handshake will be
             * successful. */
            // 若是節點以前處於 PFAIL 或者 FAIL 狀態         
            // 而且該節點的 IP 或者端口號已經發生變化       
            // 那麼多是節點換了新地址,嘗試對它進行握手
            if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
                (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }

         // 當前節點不認識 node
        } else {
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename)) 
            //若是本節點經過cluster forget把某個節點刪除本節點集羣的話,那麼這個被刪的節點須要等黑名單過時後本節點才能發送handshark
            {
                clusterStartHandshake(g->ip,ntohs(g->port)); //這樣本地就會建立這個不存在的node節點了,本地也就有了sender裏面有,本地沒有的節點了
            }
        }

        /* Next node */
        // 處理下個節點的信息
        g++;
    }
}

  Gossip協議的原理通俗來說就是一傳十,十傳百;互相之間傳遞集羣節點信息,最終能夠達到系統中全部節點都能獲取到完整的集羣節點。在ping消息中附加集羣節點信息,帶來的額外負擔就是每次接收到ping消息都要預先遍歷下gossip消息中全部節點信息,並判斷是否有包含自身未知的節點,還要創建鏈接。爲了減輕接收方的負擔,gossip消息能夠不附帶全部節點信息,附帶隨機節點也能夠最終達到全部節點都取到完整集羣信息的目的。

相關文章
相關標籤/搜索