Redis 源碼分析之 cluster meet

Redis cluster 是 redis 官方提出的分佈式集羣解決方案,在此以前,有一些第三方的可選方案,如 codis、Twemproxy等。cluster 內部使用了 gossip 協議進行通訊,以達到數據的最終一致性。詳細介紹可參考官網 Redis cluster tutorial
本文試圖藉着cluster meet 命令的實現來對其中的一些通訊細節一探究竟。
咱們都知道,當 redis server 以 cluster mode 啓動時,節點 A 想加入節點 B 所在的集羣,只須要執行 CLUSTER MEET ip port 這個命令便可,經過 gossip 通訊,最終 B 所在集羣的其餘節點也都會認識到 A。大概流程圖以下:html

cluster 初始化

當 redis server 以 cluster mode 啓動時,即配置文件中的 cluster-enabled 選項設置爲 true,此時在服務啓動時,會有一個 cluster 初始化的流程,這個在以前的文章 《Redis 啓動流程》中有提到過,即執行函數 clusterInit。在 cluster 中有三個數據結構很重要, clusterStateclusterNodeclusterLink
每一個節點都保存着一個 clusterState 結構,這個結構記錄了在當前節點的視角下,集羣目前所處的狀態,即「我看到的世界是什麼樣子」。
每一個節點都會使用一個 clusterNode 結構來記錄本身的狀態, 併爲集羣中的全部其餘節點(包括主節點和從節點)都建立一個相應的 clusterNode 結構, 以此來記錄其餘節點的狀態。
clusterNode 結構的 link 屬性是一個 clusterLink 結構, 該結構保存了鏈接節點所需的有關信息, 好比套接字描述符, 輸入緩衝區和輸出緩衝區。
更多的細節能夠經過網頁 《redis 設計與實現 - 節點》進行了解。
該初始化很簡單,首先是建立一個 clusterState 結構,並初始化一些成員,以下:node

server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;     // 新節點的 currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; // 初始狀態置爲 FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots

而後給 node.conf 文件加鎖,確保每一個節點使用本身的 cluster 配置文件。redis

if (clusterLockConfig(server.cluster_configfile) == C_ERR)
    exit(1);

藉着這個機會學習下 redis 如何使用的文件鎖。數據庫

int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
    serverLog(LL_WARNING,
              "Can't open %s in order to acquire a lock: %s",
              filename, strerror(errno));
    return C_ERR;
}

if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
    if (errno == EWOULDBLOCK) {
        serverLog(LL_WARNING,
                  "Sorry, the cluster configuration file %s is already used "
                  "by a different Redis Cluster node. Please make sure that "
                  "different nodes use different cluster configuration "
                  "files.", filename);
    } else {
        serverLog(LL_WARNING,
                  "Impossible to lock %s: %s", filename, strerror(errno));
    }
    close(fd);
    return C_ERR;
}

而後加載 node.conf 文件,這個過程還會檢查這個文件是否合理。服務器

若是加載失敗(或者配置文件不存在),則以 REDIS_NODE_MYSELF|REDIS_NODE_MASTER 爲標記,建立一個clusterNode 結構表示本身自己,置爲主節點,並設置本身的名字爲一個40字節的隨機串;而後將該節點添加到server.cluster->nodes中,這說明這是個新啓動的節點,生成的配置文件進行刷盤。數據結構

if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
    myself = server.cluster->myself =
        createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
              myself->name);
    clusterAddNode(myself);
    saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); // 新節點,將配置刷到配置文件中,fsync

接下來,調用 listenToPort 函數,在集羣 gossip 通訊端口上建立 socket fd 進行監聽。集羣內 gossip 通訊端口是在 Redis 監聽端口基礎上加 10000,好比若是Redis監聽客戶端的端口爲 6379,則集羣監聽端口就是16379,該監聽端口用於接收其餘集羣節點發送過來的 gossip 消息。app

而後註冊監聽端口上的可讀事件,事件回調函數爲 clusterAcceptHandlerdom

#define CLUSTER_PORT_INCR 10000

if (listenToPort(server.port+CLUSTER_PORT_INCR,
                 server.cfd,&server.cfd_count) == C_ERR)
{
    exit(1);
} else {
    int j;
    for (j = 0; j < server.cfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, 
                              clusterAcceptHandler, NULL) == AE_ERR)
            serverPanic("Unrecoverable error creating Redis Cluster "
                        "file event.");
    }
}

當前節點收到其餘集羣節點發來的TCP建鏈請求以後,就會調用 clusterAcceptHandler 函數 accept 鏈接。在 clusterAcceptHandler函數中,對於每一個已經 accept 的連接,都會建立一個clusterLink 結構表示該連接,並註冊 socket fd上的可讀事件,事件回調函數爲 clusterReadHandlersocket

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    ... ...
    // 若是服務器正在啓動,不要接受其餘節點的鏈接, 由於 UPDATE 消息可能會干擾數據庫內容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        ... ...
        // 建立一個 link 結構來處理鏈接
        // 剛開始的時候, link->node 被設置成 null,由於如今咱們不知道是哪一個節點
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

最後是 reset mf 相關的參數。tcp

CLUSTER MEET

A 節點接收 CLUSTER MEET 命令

A 節點在cluster.c -> clusterCommand 函數中,接收到 CLUSTER MEET 命令,即

if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    long long port;

    // CLUSTER MEET <ip> <port>
    if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
        addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
        return;
    }
    if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
    {
        addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
    } else {
        addReply(c,shared.ok);
    }
}

能夠看到重點在 clusterStartHandshake 這個函數。

int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP and Port sanity check */
    ... ...
        
    // 檢查節點(flag) norm_ip:port 是否正在握手
    if (clusterHandshakeInProgress(norm_ip,port)) { 
        errno = EAGAIN;
        return 0;
    }
    // 建立一個含隨機名字的 node,type 爲 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
    // 相關信息會在 handshake 過程當中被修復
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));
    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        // 在本地新建一個 nodename 節點,節點名字隨機,跟它通訊時它會告訴我真實名字
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime(); // mstime
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slaveof = NULL;
    ... ...
    node->link = NULL; // link 爲空, 在 clusterCron 中能檢查的到
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->fail_reports = listCreate();
    ... ...
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

這個函數會首先進行一些 ip 和 port 的合理性檢查,而後去遍歷所看到的 nodes,這個 ip:port 對應的 node 是否是正處於 CLUSTER_NODE_HANDSHAKE 狀態,是的話,就說明這是重複 meet,不必往下走。以後,經過 createClusterNode 函數建立一個帶有 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET 標記的節點,名字爲一個隨機的 40 字節字符串(由於此時對 A 來講,B 是一個陌生的節點,信息除了 ip 和 port,其餘都不知道),經過 clusterAddNode 函數加到本身的 nodes 中。
這個過程成功後,就返回給客戶端 OK 了,其餘事情須要經過 gossip 通訊去作。

A 節點發送 MEET gossip 消息給 B 節點

A 節點在定時任務 clusterCron 中,會作一些事情。

handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

// 檢查是否有 disconnected nodes 而且從新創建鏈接
di = dictGetSafeIterator(server.cluster->nodes); // 遍歷全部節點
while((de = dictNext(di)) != NULL) {
    clusterNode *node = dictGetVal(de);
    
     // 忽略掉 myself 和 noaddr 狀態的節點
    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; 
    
    // 節點處於 handshake 狀態,且狀態維持時間超過 handshake_timeout,那麼從 nodes中刪掉它
    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
        clusterDelNode(node);
        continue;
    }

    // 剛剛收到 cluster meet 命令建立的新 node ,或是 server 剛啓動,或是因爲某種緣由斷開了
    if (node->link == NULL) { 
        int fd;
        mstime_t old_ping_sent;
        clusterLink *link;

        // 對端 gossip 通訊端口爲 node 端口 + 10000,建立 tcp 鏈接, 本節點至關於 client
        fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
        ... ...
        link = createClusterLink(node);
        link->fd = fd;
        node->link = link;

        // 註冊 link->fd 上的可讀事件,事件回調函數爲 clusterReadHandler
        aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
        ... ...

        // 若是 node 帶有 MEET flag,咱們發送一個 MEET 包而不是 PING,
        // 這是爲了強制讓接收者把咱們加到它的 nodes 中
        clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
        ... ...
        node->flags &= ~CLUSTER_NODE_MEET;
        ... ...
    }
}
dictReleaseIterator(di);

能夠看到,遍歷本身看到的 nodes,當遍歷到 B 節點時,因爲 node->link == NULL,所以會監聽 B 的啓動端口號+10000,即 gossip 通訊端口,而後註冊可讀事件,處理函數爲 clusterReadHandler。接着會發送 CLUSTER_NODE_MEET 消息給 B 節點,消除掉 B 節點的 meet 狀態。

B 節點處理 A 發來的 MEET gossip 消息

當 B 節點接收到 A 節點發送 gossip 時,回調函數 clusterAcceptHandler 進行處理,而後會 accept 對端的 connect(B 做爲 server,對端做爲 client),註冊可讀事件,回調函數爲 clusterReadHandler,基本邏輯以下,

void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // 若是服務器正在啓動,不要接受其餘節點的連接,由於 UPDATE 消息可能會干擾數據庫內容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) { // 1000 個請求
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
   
        // 建立一個 link 結構來處理鏈接
        // 剛開始的時候, link->node 被設置成 null,由於如今咱們不知道是哪一個節點
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

能夠看到每次 accept 對端connect時,都會建立一個 clusterLink 結構用來接收數據,

typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    int fd;                     /* TCP socket file descriptor */
    sds sndbuf;                 /* Packet send buffer */
    sds rcvbuf;                 /* Packet reception buffer */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;

clusterLink 有一個指針是指向 node 自身的。
B 節點接收到 A 節點發送過來的信息,放到 clusterLinkrcvbuf 字段,而後使用 clusterProcessPacket 函數來處理(接收數據過程很簡單,不作分析)。
因此 clusterProcessPacket 函數的做用是處理別人發過來的 gossip 包。

if (!sender && type == CLUSTERMSG_TYPE_MEET) {
    clusterNode *node;

    // 建立一個帶有 CLUSTER_NODE_HANDSHAKE 標記的 cluster node,名字隨機
    node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
    nodeIp2String(node->ip,link); // ip 和 port 信息均從 link 中得到
    node->port = ntohs(hdr->port);

    clusterAddNode(node);
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);

因爲這時 B 節點還不認識 A 節點,所以 B 節點從本身的 nodes 中找 A 節點是找不到的,因此 sender 是空,所以會走進如上的這段邏輯。一樣以隨機的名字,CLUSTER_NODE_HANDSHAKE 爲 flag 建立一個 node,加入本身的 nodes 中。
在這個邏輯末尾會給 A 節點回復一個 PONG 消息。

A 節點處理 B 節點回復的 PONG gossip 消息

一樣是在 clusterProcessPacket 中處理 gossip 消息。

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
    ... ...
    if (link->node) {
        if (nodeInHandshake(link->node)) { // node 處於握手狀態
            ... ...
            clusterRenameNode(link->node, hdr->sender); // 修正節點名
            link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 消除 handshake 狀態
            link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
}

這個時候 A 節點會根據 B 節點發來的消息,更正 A 節點 nodes 中關於 B 節點的名字,以及消除 handshake 狀態。

B 節點發送 PING gossip 消息給 A 節點

當 B 節點在作 clusterCron 時,發現本身看到的 A 節點中的 link 爲空,即 node->link == NULL,這與上面講的 A 節點給 B 節點發 MEET 消息相似,不過在 B 節點看了 A 節點沒有 meet flag,所以發送的是 PING 消息。

A 節點處理 B 節點發來的 PING 消息

作一些邏輯,不過跟此次要討論的事情無關,後面會詳寫。

對於 PING 和 MEET 消息,不管如何都是會回覆一個 PONG 消息的

B 節點處理 A 節點回復的 PONG 消息

邏輯同上,將 B 節點的 nodes 中 A 節點的名字進行更正,而後去掉 A 節點的 handshake flag。

小結

至此,一個 cluster meet 命令執行的完整過程就解釋清楚了,畫了一個流程圖能夠幫助更好的理解這個流程。

cluster meet

相關文章
相關標籤/搜索