Redis cluster 是 redis 官方提出的分佈式集羣解決方案,在此以前,有一些第三方的可選方案,如 codis、Twemproxy等。cluster 內部使用了 gossip 協議進行通訊,以達到數據的最終一致性。詳細介紹可參考官網 Redis cluster tutorial。
本文試圖藉着cluster meet
命令的實現來對其中的一些通訊細節一探究竟。
咱們都知道,當 redis server 以 cluster mode 啓動時,節點 A 想加入節點 B 所在的集羣,只須要執行 CLUSTER MEET ip port
這個命令便可,經過 gossip 通訊,最終 B 所在集羣的其餘節點也都會認識到 A。大概流程圖以下:html
當 redis server 以 cluster mode 啓動時,即配置文件中的 cluster-enabled
選項設置爲 true
,此時在服務啓動時,會有一個 cluster 初始化的流程,這個在以前的文章 《Redis 啓動流程》中有提到過,即執行函數 clusterInit
。在 cluster 中有三個數據結構很重要, clusterState
、 clusterNode
和 clusterLink
。
每一個節點都保存着一個 clusterState
結構,這個結構記錄了在當前節點的視角下,集羣目前所處的狀態,即「我看到的世界是什麼樣子」。
每一個節點都會使用一個 clusterNode
結構來記錄本身的狀態, 併爲集羣中的全部其餘節點(包括主節點和從節點)都建立一個相應的 clusterNode
結構, 以此來記錄其餘節點的狀態。clusterNode
結構的 link
屬性是一個 clusterLink
結構, 該結構保存了鏈接節點所需的有關信息, 好比套接字描述符, 輸入緩衝區和輸出緩衝區。
更多的細節能夠經過網頁 《redis 設計與實現 - 節點》進行了解。
該初始化很簡單,首先是建立一個 clusterState
結構,並初始化一些成員,以下:node
server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; server.cluster->currentEpoch = 0; // 新節點的 currentEpoch = 0 server.cluster->state = CLUSTER_FAIL; // 初始狀態置爲 FAIL server.cluster->size = 1; server.cluster->todo_before_sleep = 0; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; server.cluster->failover_auth_rank = 0; server.cluster->failover_auth_epoch = 0; server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; server.cluster->lastVoteEpoch = 0; server.cluster->stats_bus_messages_sent = 0; server.cluster->stats_bus_messages_received = 0; memset(server.cluster->slots,0, sizeof(server.cluster->slots)); clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots
而後給 node.conf 文件加鎖,確保每一個節點使用本身的 cluster 配置文件。redis
if (clusterLockConfig(server.cluster_configfile) == C_ERR) exit(1);
藉着這個機會學習下 redis 如何使用的文件鎖。數據庫
int fd = open(filename,O_WRONLY|O_CREAT,0644); if (fd == -1) { serverLog(LL_WARNING, "Can't open %s in order to acquire a lock: %s", filename, strerror(errno)); return C_ERR; } if (flock(fd,LOCK_EX|LOCK_NB) == -1) { if (errno == EWOULDBLOCK) { serverLog(LL_WARNING, "Sorry, the cluster configuration file %s is already used " "by a different Redis Cluster node. Please make sure that " "different nodes use different cluster configuration " "files.", filename); } else { serverLog(LL_WARNING, "Impossible to lock %s: %s", filename, strerror(errno)); } close(fd); return C_ERR; }
而後加載 node.conf 文件,這個過程還會檢查這個文件是否合理。服務器
若是加載失敗(或者配置文件不存在),則以 REDIS_NODE_MYSELF|REDIS_NODE_MASTER
爲標記,建立一個clusterNode 結構表示本身自己,置爲主節點,並設置本身的名字爲一個40字節的隨機串;而後將該節點添加到server.cluster->nodes中,這說明這是個新啓動的節點,生成的配置文件進行刷盤。數據結構
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) { myself = server.cluster->myself = createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER); serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); // 新節點,將配置刷到配置文件中,fsync
接下來,調用 listenToPort
函數,在集羣 gossip 通訊端口上建立 socket fd 進行監聽。集羣內 gossip 通訊端口是在 Redis 監聽端口基礎上加 10000,好比若是Redis監聽客戶端的端口爲 6379,則集羣監聽端口就是16379,該監聽端口用於接收其餘集羣節點發送過來的 gossip 消息。app
而後註冊監聽端口上的可讀事件,事件回調函數爲 clusterAcceptHandler
。dom
#define CLUSTER_PORT_INCR 10000 if (listenToPort(server.port+CLUSTER_PORT_INCR, server.cfd,&server.cfd_count) == C_ERR) { exit(1); } else { int j; for (j = 0; j < server.cfd_count; j++) { if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL) == AE_ERR) serverPanic("Unrecoverable error creating Redis Cluster " "file event."); } }
當前節點收到其餘集羣節點發來的TCP建鏈請求以後,就會調用 clusterAcceptHandler
函數 accept 鏈接。在 clusterAcceptHandler
函數中,對於每一個已經 accept 的連接,都會建立一個clusterLink
結構表示該連接,並註冊 socket fd上的可讀事件,事件回調函數爲 clusterReadHandler
。socket
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; int max = MAX_CLUSTER_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; clusterLink *link; ... ... // 若是服務器正在啓動,不要接受其餘節點的鏈接, 由於 UPDATE 消息可能會干擾數據庫內容 if (server.masterhost == NULL && server.loading) return; while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_VERBOSE, "Error accepting cluster node: %s", server.neterr); return; } anetNonBlock(NULL,cfd); anetEnableTcpNoDelay(NULL,cfd); ... ... // 建立一個 link 結構來處理鏈接 // 剛開始的時候, link->node 被設置成 null,由於如今咱們不知道是哪一個節點 link = createClusterLink(NULL); link->fd = cfd; aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); } }
最後是 reset mf 相關的參數。tcp
A 節點在cluster.c
-> clusterCommand
函數中,接收到 CLUSTER MEET
命令,即
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) { long long port; // CLUSTER MEET <ip> <port> if (getLongLongFromObject(c->argv[3], &port) != C_OK) { addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr); return; } if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL) { addReplyErrorFormat(c,"Invalid node address specified: %s:%s", (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr); } else { addReply(c,shared.ok); } }
能夠看到重點在 clusterStartHandshake
這個函數。
int clusterStartHandshake(char *ip, int port) { clusterNode *n; char norm_ip[NET_IP_STR_LEN]; struct sockaddr_storage sa; /* IP and Port sanity check */ ... ... // 檢查節點(flag) norm_ip:port 是否正在握手 if (clusterHandshakeInProgress(norm_ip,port)) { errno = EAGAIN; return 0; } // 建立一個含隨機名字的 node,type 爲 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET // 相關信息會在 handshake 過程當中被修復 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET); memcpy(n->ip,norm_ip,sizeof(n->ip)); n->port = port; clusterAddNode(n); return 1; }
clusterNode *createClusterNode(char *nodename, int flags) { clusterNode *node = zmalloc(sizeof(*node)); if (nodename) memcpy(node->name, nodename, CLUSTER_NAMELEN); else // 在本地新建一個 nodename 節點,節點名字隨機,跟它通訊時它會告訴我真實名字 getRandomHexChars(node->name, CLUSTER_NAMELEN); node->ctime = mstime(); // mstime node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->slaveof = NULL; ... ... node->link = NULL; // link 爲空, 在 clusterCron 中能檢查的到 memset(node->ip,0,sizeof(node->ip)); node->port = 0; node->fail_reports = listCreate(); ... ... listSetFreeMethod(node->fail_reports,zfree); return node; }
這個函數會首先進行一些 ip 和 port 的合理性檢查,而後去遍歷所看到的 nodes,這個 ip:port 對應的 node 是否是正處於 CLUSTER_NODE_HANDSHAKE
狀態,是的話,就說明這是重複 meet,不必往下走。以後,經過 createClusterNode
函數建立一個帶有 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
標記的節點,名字爲一個隨機的 40 字節字符串(由於此時對 A 來講,B 是一個陌生的節點,信息除了 ip 和 port,其餘都不知道),經過 clusterAddNode
函數加到本身的 nodes 中。
這個過程成功後,就返回給客戶端 OK 了,其餘事情須要經過 gossip 通訊去作。
A 節點在定時任務 clusterCron
中,會作一些事情。
handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; // 檢查是否有 disconnected nodes 而且從新創建鏈接 di = dictGetSafeIterator(server.cluster->nodes); // 遍歷全部節點 while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); // 忽略掉 myself 和 noaddr 狀態的節點 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; // 節點處於 handshake 狀態,且狀態維持時間超過 handshake_timeout,那麼從 nodes中刪掉它 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { clusterDelNode(node); continue; } // 剛剛收到 cluster meet 命令建立的新 node ,或是 server 剛啓動,或是因爲某種緣由斷開了 if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link; // 對端 gossip 通訊端口爲 node 端口 + 10000,建立 tcp 鏈接, 本節點至關於 client fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR); ... ... link = createClusterLink(node); link->fd = fd; node->link = link; // 註冊 link->fd 上的可讀事件,事件回調函數爲 clusterReadHandler aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); ... ... // 若是 node 帶有 MEET flag,咱們發送一個 MEET 包而不是 PING, // 這是爲了強制讓接收者把咱們加到它的 nodes 中 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); ... ... node->flags &= ~CLUSTER_NODE_MEET; ... ... } } dictReleaseIterator(di);
能夠看到,遍歷本身看到的 nodes,當遍歷到 B 節點時,因爲 node->link == NULL
,所以會監聽 B 的啓動端口號+10000,即 gossip 通訊端口,而後註冊可讀事件,處理函數爲 clusterReadHandler
。接着會發送 CLUSTER_NODE_MEET 消息給 B 節點,消除掉 B 節點的 meet 狀態。
當 B 節點接收到 A 節點發送 gossip 時,回調函數 clusterAcceptHandler
進行處理,而後會 accept 對端的 connect(B 做爲 server,對端做爲 client),註冊可讀事件,回調函數爲 clusterReadHandler
,基本邏輯以下,
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; int max = MAX_CLUSTER_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; clusterLink *link; UNUSED(el); UNUSED(mask); UNUSED(privdata); // 若是服務器正在啓動,不要接受其餘節點的連接,由於 UPDATE 消息可能會干擾數據庫內容 if (server.masterhost == NULL && server.loading) return; while(max--) { // 1000 個請求 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_VERBOSE, "Error accepting cluster node: %s", server.neterr); return; } anetNonBlock(NULL,cfd); anetEnableTcpNoDelay(NULL,cfd); serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport); // 建立一個 link 結構來處理鏈接 // 剛開始的時候, link->node 被設置成 null,由於如今咱們不知道是哪一個節點 link = createClusterLink(NULL); link->fd = cfd; aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); } }
能夠看到每次 accept 對端connect時,都會建立一個 clusterLink
結構用來接收數據,
typedef struct clusterLink { mstime_t ctime; /* Link creation time */ int fd; /* TCP socket file descriptor */ sds sndbuf; /* Packet send buffer */ sds rcvbuf; /* Packet reception buffer */ struct clusterNode *node; /* Node related to this link if any, or NULL */ } clusterLink;
clusterLink
有一個指針是指向 node 自身的。
B 節點接收到 A 節點發送過來的信息,放到 clusterLink
的 rcvbuf
字段,而後使用 clusterProcessPacket
函數來處理(接收數據過程很簡單,不作分析)。
因此 clusterProcessPacket
函數的做用是處理別人發過來的 gossip 包。
if (!sender && type == CLUSTERMSG_TYPE_MEET) { clusterNode *node; // 建立一個帶有 CLUSTER_NODE_HANDSHAKE 標記的 cluster node,名字隨機 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE); nodeIp2String(node->ip,link); // ip 和 port 信息均從 link 中得到 node->port = ntohs(hdr->port); clusterAddNode(node); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } ..... clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
因爲這時 B 節點還不認識 A 節點,所以 B 節點從本身的 nodes 中找 A 節點是找不到的,因此 sender 是空,所以會走進如上的這段邏輯。一樣以隨機的名字,CLUSTER_NODE_HANDSHAKE 爲 flag 建立一個 node,加入本身的 nodes 中。
在這個邏輯末尾會給 A 節點回復一個 PONG 消息。
一樣是在 clusterProcessPacket
中處理 gossip 消息。
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { ... ... if (link->node) { if (nodeInHandshake(link->node)) { // node 處於握手狀態 ... ... clusterRenameNode(link->node, hdr->sender); // 修正節點名 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 消除 handshake 狀態 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } }
這個時候 A 節點會根據 B 節點發來的消息,更正 A 節點 nodes 中關於 B 節點的名字,以及消除 handshake 狀態。
當 B 節點在作 clusterCron
時,發現本身看到的 A 節點中的 link 爲空,即 node->link == NULL
,這與上面講的 A 節點給 B 節點發 MEET 消息相似,不過在 B 節點看了 A 節點沒有 meet flag,所以發送的是 PING 消息。
作一些邏輯,不過跟此次要討論的事情無關,後面會詳寫。
對於 PING 和 MEET 消息,不管如何都是會回覆一個 PONG 消息的。
邏輯同上,將 B 節點的 nodes 中 A 節點的名字進行更正,而後去掉 A 節點的 handshake flag。
至此,一個 cluster meet
命令執行的完整過程就解釋清楚了,畫了一個流程圖能夠幫助更好的理解這個流程。