在 Redis cluster 中故障轉移是個很重要的功能,下面就從故障發現到故障轉移整個流程作一下詳細分析。html
集羣中每一個節點都會按期向其餘節點發送 PING 消息,以此來檢測對方是否在線,若是接收 PING 消息的節點 B 沒有在規定時間(cluster_node_timeout)內迴應節點 A PONG 消息,那麼節點 A 就會將節點 B 標記爲疑似下線(probable fail, PFAIL)。node
void clusterCron(void) { // ... di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ // ... delay = now - node->ping_sent; if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { node->flags |= CLUSTER_NODE_PFAIL; update_state = 1; } } } dictReleaseIterator(di); // ... }
能夠看到,在 clusterCron
函數中若是對節點 B 發出 PING 消息,在 server.cluster_node_timeout 時間內沒有收到其返回的 PONG 消息,若是節點 B 如今沒有被標記成 CLUSTER_NODE_PFAIL 狀態,那麼如今就作下這個標記。redis
能夠根據 ping_sent 參數進行判斷的依據以下,算法
int clusterProcessPacket(clusterLink *link) { // ... if (link->node && type == CLUSTERMSG_TYPE_PONG) { link->node->pong_received = mstime(); link->node->ping_sent = 0; // ... } // ... }
當節點 A 接收到節點 B 的 PONG 消息時,會把 ping_sent 更新成 0,同時記下收到本次 PONG 消息的時間。less
上面提到的 clusterNode 與 clusterLink 有以下關聯關係:dom
能夠看出, clusterLink 就是爲了接收對端 gossip 消息而設置的。 ide
另外,咱們發現, 在上面的 clusterCron
函數中將節點標記成 PFAIL 時,會將 update_state 變量置爲 1,這會引起後面更改集羣狀態的邏輯。函數
if (update_state || server.cluster->state == CLUSTER_FAIL) clusterUpdateState();
集羣有兩個狀態,CLUSTER_OK 和 CLUSTER_FAIL,若是集羣目前狀態是 CLUSTER_FAIL,且設置了參數 cluster-require-full-coverage yes
,那麼此時訪問集羣會返回錯誤,意思是可能有某些 slot 沒有被 server 接管。測試
clusterUpdateState
函數負責更新集羣狀態,該部分邏輯與本篇博文要講的主邏輯關係不大,因此放到了後面的補充章節中了。ui
被節點 A 標記成 FAIL/ PFAIL 的節點如何讓節點 C 知道呢?這主要是經過日常發送的 PING/PONG 消息實現的,在 3.x 的版本時,會盡最大努力把這樣的節點放到 gossip 消息的流言部分,到後面的 4.x 版本的代碼中每次的 PING/PONG 消息都會把 PFAIL 節點都帶上。
clusterProcessGossipSection
函數用來處理 gossip 消息的流言部分。
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { uint16_t count = ntohs(hdr->count); clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); while(count--) { // ... node = clusterLookupNode(g->nodename); if (node) { if (sender && nodeIsMaster(sender) && node != myself) { if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node,sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); } markNodeAsFailingIfNeeded(node); } else { // ... } } // ... } // ... } // ... }
該函數依次處理 gossip 消息流言部分攜帶的各節點信息(總節點數的1/10)。當發現帶有 CLUSTER_NODE_FAIL 或者 CLUSTER_NODE_PFAIL 時會調用 clusterNodeAddFailureReport
函數。
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) { list *l = failing->fail_reports; listNode *ln; listIter li; clusterNodeFailReport *fr; /* If a failure report from the same sender already exists, just update * the timestamp. */ listRewind(l,&li); while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (fr->node == sender) { fr->time = mstime(); return 0; } } /* Otherwise create a new report. */ fr = zmalloc(sizeof(*fr)); fr->node = sender; fr->time = mstime(); listAddNodeTail(l,fr); return 1; }
每個節點都有一個名爲 fail_reports 的 list 結構的變量,用來蒐集該異常節點得到了集羣中哪些節點的 PFAIL 狀態投票。fail_reports 每一個成員都是一個 clusterNodeFailReport 結構。
typedef struct clusterNodeFailReport { struct clusterNode *node; /* Node reporting the failure condition. */ mstime_t time; /* Time of the last report from this node. */ } clusterNodeFailReport;
clusterNodeFailReport 中帶有時間戳,標記這個節點上一次被報上來處於異常狀態的時間。
每次調用 clusterNodeAddFailureReport
函數時,先會檢查sender 是否已經爲該異常節點投票過了,若是有,更新時間戳,若是沒有,把 sender 加入到投票節點中。
簡單點說就是,在 A 節點看來 B 節點是 PFAIL 狀態,在 gossip 通訊中把它告訴了 C 節點,C 節點發現這個異常狀態的節點,檢查一下爲 B 節點投過票的節點中有沒有 A 節點,若是沒有就加進去。
而後下面就是判斷 PFAIL 狀態是否是要轉變成 FAIL 狀態的關鍵。
void markNodeAsFailingIfNeeded(clusterNode *node) { int failures; int needed_quorum = (server.cluster->size / 2) + 1; if (!nodeTimedOut(node)) return; /* We can reach it. */ if (nodeFailed(node)) return; /* Already FAILing. */ failures = clusterNodeFailureReportsCount(node); /* Also count myself as a voter if I'm a master. */ if (nodeIsMaster(myself)) failures++; if (failures < needed_quorum) return; /* No weak agreement from masters. */ serverLog(LL_NOTICE, "Marking node %.40s as failing (quorum reached).", node->name); /* Mark the node as failing. */ node->flags &= ~CLUSTER_NODE_PFAIL; node->flags |= CLUSTER_NODE_FAIL; node->fail_time = mstime(); /* Broadcast the failing node name to everybody, forcing all the other * reachable nodes to flag the node as FAIL. */ if (nodeIsMaster(myself)) clusterSendFail(node->name); /* 廣播這個節點的 fail 消息 */ clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); }
C 節點收到消息,檢查下 A 報過來的異常節點 B,在本身看來是否也是 PFAIL 狀態的,若是不是,那麼不理會 A 節點本次 report。若是在節點 C 看來,節點 B 已經被標記成 FAIL 了,那麼就不須要進行下面的斷定了。
在函數 clusterNodeFailureReportsCount
中會判斷計算出把 B 節點標記成 PFAIL 狀態的節點的數量 sum,若是 sum 值小於集羣 size 的一半,爲防止誤判,忽略掉這條信息。在函數 clusterNodeFailureReportsCount
中會檢查關於 B 節點的 clusterNodeFailReport,清理掉那些過時的投票,過時時間爲 2 倍的 server.cluster_node_timeout。
若是知足條件,節點 C 將節點 B 的 PFAIL 狀態消除,標記成 FAIL,同時記下 fail_time,若是 C 節點是個 master,那麼將 B 節點 FAIL 的消息廣播出去,以便讓集羣中其餘節點儘快知道。
void clusterSendFail(char *nodename) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN); clusterBroadcastMessage(buf,ntohl(hdr->totlen)); }
發送的 gossip 消息類型爲 CLUSTERMSG_TYPE_FAIL,廣播的節點排除自身和處於 HANDSHAKE 狀態節點。
前面說過,gossip 消息的處理函數爲 clusterProcessPacket
,下面看 CLUSTERMSG_TYPE_FAIL 類型的消息如何處理。
int clusterProcessPacket(clusterLink *link) { // ... uint16_t type = ntohs(hdr->type); // ... if (type == CLUSTERMSG_TYPE_FAIL) { // fail clusterNode *failing; if (sender) { failing = clusterLookupNode(hdr->data.fail.about.nodename); if (failing && !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF))) { serverLog(LL_NOTICE, "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= CLUSTER_NODE_FAIL; failing->fail_time = mstime(); failing->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); } } else { serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); } } // ... }
集羣中另外一個節點 D 收到節點 B 廣播過來的消息:B 節點 FAIL 了。若是 D 尚未把 B 標記成 FAIL,那麼標記成 CLUSTER_NODE_FAIL,並取消 CLUSTER_NODE_PFAIL 標記;不然,忽略,由於D已經知道 B 是 FAIL 節點了。
failover 分爲兩類,主動 failover(主動切主從)以及被動 failover(被動切主從),下面挨個進行分析。
void clusterCron(void) { // ... if (nodeIsSlave(myself)) { clusterHandleSlaveFailover(); // ... } // ... }
是否要作被動主從切換,在 clusterHandleSlaveFailover
函數中有以下的判斷邏輯,
if (nodeIsMaster(myself) || myself->slaveof == NULL || (!nodeFailed(myself->slaveof) && !manual_failover) || myself->slaveof->numslots == 0) { /* There are no reasons to failover, so we set the reason why we * are returning without failing over to NONE. */ server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; return; }
只有知足以下條件的節點纔有資格作 failover:
假設,如今 B 節點的 slave Bx 節點檢測到 B 節點掛掉了,經過了以上的條件測試,接下來就會進行 failover。
那麼下面 Bx 節點就開始在集羣中進行拉票,該邏輯也在 clusterHandleSlaveFailover
函數中。
mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; mstime_t auth_timeout, auth_retry_time; auth_timeout = server.cluster_node_timeout*2; if (auth_timeout < 2000) auth_timeout =2000 ; auth_retry_time = auth_timeout*2;
cluster 的 failover_auth_time 屬性,表示 slave 節點開始進行故障轉移的時刻。集羣初始化時該屬性置爲 0,一旦知足 failover 的條件後,該屬性就置爲將來的某個時間點(不是立馬執行),在該時間點,slave 節點纔開始進行拉票。
auth_age 變量表示從發起 failover 流程開始到如今,已通過去了多長時間。
needed_quorum 變量表示當前 slave 節點必須至少得到多少選票,才能成爲新的 master。
auth_timeout 變量表示當前 slave 發起投票後,等待迴應的超時時間,至少爲 2s。若是超過該時間尚未得到足夠的選票,那麼表示本次 failover 失敗。
auth_retry_time 變量用來判斷是否能夠開始發起下一次 failover 的時間間隔。
if (server.repl_state == REPL_STATE_CONNECTED) { data_age = (mstime_t)(server.unixtime - server.master->lastinteraction) * 1000; } else { data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000; } if (data_age > server.cluster_node_timeout) data_age -= server.cluster_node_timeout;
data_age 變量表示距離上一次與個人 master 節點交互過去了多長時間。通過 cluster_node_timeout 時間尚未收到 PONG 消息纔會將節點標記爲 PFAIL 狀態。實際上 data_age 表示在 master 節點下線以前,當前 slave 節點有多長時間沒有與其交互過了。
data_age 主要用於判斷當前 slave 節點的數據新鮮度;若是 data_age 超過了必定時間,表示當前 slave 節點的數據已經太老了,不能替換掉下線 master 節點,所以在不是手動強制故障轉移的狀況下,直接返回。
void clusterHandleSlaveFailover(void) { // ... if (auth_age > auth_retry_time) { server.cluster->failover_auth_time = mstime() + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ random() % 500; /* Random delay between 0 and 500 milliseconds. */ server.cluster->failover_auth_count = 0; server.cluster->failover_auth_sent = 0; server.cluster->failover_auth_rank = clusterGetSlaveRank(); /* We add another delay that is proportional to the slave rank. * Specifically 1 second * rank. This way slaves that have a probably * less updated replication offset, are penalized. * */ server.cluster->failover_auth_time += server.cluster->failover_auth_rank * 1000; if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; } // ... clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES); return; } // ... }
知足條件(auth_age > auth_retry_time)後,發起故障轉移流程。
首先設置故障轉移發起時刻,即設置 failover_auth_time 時間。
mstime() + 500 + random()%500 + rank*1000
固定延時 500ms 是爲了讓 master fail 的消息可以普遍傳播到集羣,這樣集羣中的其餘節點纔可能投票。
隨機延時是爲了不多個你 slave 節點同時發起 failover 流程。
rank 表示 slave 節點的排名,計算方式以下,
int clusterGetSlaveRank(void) { long long myoffset; int j, rank = 0; clusterNode *master; serverAssert(nodeIsSlave(myself)); master = myself->slaveof; if (master == NULL) return 0; /* Never called by slaves without master. */ myoffset = replicationGetSlaveOffset(); for (j = 0; j < master->numslaves; j++) if (master->slaves[j] != myself && master->slaves[j]->repl_offset > myoffset) rank++; return rank; }
能夠看出,排名主要是根據複製數據量來定,複製數據量越多,排名越靠前(rank 值越小)。這樣作是爲了作 failover 時儘可能選擇一個複製數據量較多的 slave,以盡最大努力保留數據。在沒有開始拉選票以前,每隔一段時間(每次調用clusterHandleSlaveFailover
函數,也就是每次 cron 的時間)就會調用一次 clusterGetSlaveRank
函數,以更新當前 slave 節點的排名。
注意,若是是 mf,那麼 failover_auth_time 和 failover_auth_rank 都置爲 0,表示該 slave 節點如今就能夠執行故障轉移。
最後向該 master 的全部 slave 廣播 PONG 消息,主要是爲了更新複製偏移量,以便其餘 slave 計算出 failover 時間點。
這時,函數返回,就此開始了一輪新的故障轉移,當已經處在某一輪故障轉移時,執行接下來的邏輯。
首先對於一些不合理的 failover 要過濾掉。
/* Return ASAP if we can't still start the election. */ if (mstime() < server.cluster->failover_auth_time) { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY); return; } /* Return ASAP if the election is too old to be valid. * failover 超時 */ if (auth_age > auth_timeout) { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED); return; }
而後開始拉選票。
if (server.cluster->failover_auth_sent == 0) { server.cluster->currentEpoch++; // 增長當前節點的currentEpoch的值,表示要開始新一輪選舉了 server.cluster->failover_auth_epoch = server.cluster->currentEpoch; serverLog(LL_WARNING,"Starting a failover election for epoch %llu.", (unsigned long long) server.cluster->currentEpoch); /* 向全部節點發送 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 消息,開始拉票*/ clusterRequestFailoverAuth(); server.cluster->failover_auth_sent = 1; // 表示已經發起了故障轉移流程 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); return; /* Wait for replies. */ }
若是 failover_auth_sent 爲 0,表示沒有發起過投票,那麼將 currentEpoch 加 1,記錄 failover_auth_epoch 爲 currentEpoch,函數 clusterRequestFailoverAuth
用來發起投票,failover_auth_sent 置 1,表示該 slave 已經發起過投票了。
void clusterRequestFailoverAuth(void) { unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit * in the header to communicate the nodes receiving the message that * they should authorized the failover even if the master is working. */ if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); clusterBroadcastMessage(buf,totlen); }
clusterRequestFailoverAuth
函數向集羣廣播 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 類型的 gossip 信息,這類型的信息就是向集羣中的 master 節點索要本輪選舉中的選票。另外,若是是 mf,那麼會在 gossip hdr 中帶上 CLUSTERMSG_FLAG0_FORCEACK 信息。
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ clusterSendFailoverAuthIfNeeded(sender,hdr); }
在 clusterProcessPacket
函數中處理 gossip 消息,當接收到 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 類型的消息時,調用 clusterSendFailoverAuthIfNeeded
函數處理,在知足條件的基礎上,給 sender 投票。
注:如下若不進行特殊說明,都是 clusterSendFailoverAuthIfNeeded
函數處理邏輯。
if (nodeIsSlave(myself) || myself->numslots == 0) return;
slave 節點或者不負責 slot 的 master 節點
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch); if (requestCurrentEpoch < server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)", node->name, (unsigned long long) requestCurrentEpoch, (unsigned long long) server.cluster->currentEpoch); return; }
sender 節點集羣信息過舊。
正常來講,若是 receiver 在接收到 sender 的 CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 消息以前接收了 PING/PONG 消息,會更新本身的 currentEpoch,這時 currentEpoch 會增長,由於 sender 發起選舉以前,會先增長自身的currentEpoch;不然的話,receiver 的 currentEpoch 應該小於 sender。所以 sender 的 currentEpoch 應該 >= receiver 的。有可能 sender 是個長時間下線的節點剛剛上線,這樣的節點不能給他投票,由於它的集羣信息過舊。
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) { serverLog(LL_WARNING, "Failover auth denied to %.40s: already voted for epoch %llu", node->name, (unsigned long long) server.cluster->currentEpoch); return; }
receiver 節點在本輪選舉中已經投過票了,避免兩個 slave 節點同時贏得本界選舉。
lastVoteEpoch 記錄了在本輪投票中 receiver 投過票的 sender 的 currentEpoch。各 slave 節點獨立發起選舉,currentEpoch 是相同的,都在原來的基礎上加 1。
clusterNode *master = node->slaveof; if (nodeIsMaster(node) || master == NULL || (!nodeFailed(master) && !force_ack)) { if (nodeIsMaster(node)) { serverLog(LL_WARNING, "Failover auth denied to %.40s: it is a master node", node->name); } else if (master == NULL) { serverLog(LL_WARNING, "Failover auth denied to %.40s: I don't know its master", node->name); } else if (!nodeFailed(master)) { serverLog(LL_WARNING, "Failover auth denied to %.40s: its master is up", node->name); } return; }
sender 是個 master。
sender 是個沒有 master 的 slave。
sender 的 master 沒有 fail,且不是個 mf。
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) { serverLog(LL_WARNING, "Failover auth denied to %.40s: " "can't vote about this master before %lld milliseconds", node->name, (long long) ((server.cluster_node_timeout*2) - (mstime() - node->slaveof->voted_time))); return; }
兩次投票時間間隔不能少於 2 倍 的 cluster_node_timeout。
這個裕量時間,使得得到贏得選舉的 slave 將新的主從關係周知集羣其餘節點,避免其餘 slave 發起新一輪的投票。
uint64_t requestConfigEpoch = ntohu64(request->configEpoch); unsigned char *claimed_slots = request->myslots; for (j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(claimed_slots, j) == 0) continue; if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) { continue; } /* If we reached this point we found a slot that in our current slots * is served by a master with a greater configEpoch than the one claimed * by the slave requesting our vote. Refuse to vote for this slave. */ serverLog(LL_WARNING, "Failover auth denied to %.40s: " "slot %d epoch (%llu) > reqEpoch (%llu)", node->name, j, (unsigned long long) server.cluster->slots[j]->configEpoch, (unsigned long long) requestConfigEpoch); return; }
sender 節點聲稱要接管的 slots,在 receiver 節點看來其中有個別 slot 原來負責節點的 configEpoch 要比 sender 的大,這說明 sender 看到的集羣消息太舊了,這多是一個長時間下線又從新上線的節點。
clusterSendFailoverAuth(node); server.cluster->lastVoteEpoch = server.cluster->currentEpoch; node->slaveof->voted_time = mstime(); // 更新投票時間
clusterSendFailoverAuth
函數中發送 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 類型的 gossip 消息,這就算在本輪選舉中投票了,並記錄本輪投票的 epoch以及投票時間。
slave 接收到 CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 類型的 gossip 消息,就算統計到一票。
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { // slave 統計票數 if (!sender) return 1; /* We don't know that node. */ /* We consider this vote only if the sender is a master serving * a non zero number of slots, and its currentEpoch is greater or * equal to epoch where this node started the election. */ if (nodeIsMaster(sender) && sender->numslots > 0 && senderCurrentEpoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } }
sender 是個負責 slot 的 master 而且知足 currentEpoch 的要求,那麼這張選票有效。出現 senderCurrentEpoch < server.cluster->failover_auth_epoch
的狀況時有可能的,若是這張選票是上一輪選舉的得到選票,就不能做數。
failover_auth_count 變量中記錄了 slave 在本輪選舉中得到選票數目。
void clusterHandleSlaveFailover(void) { // ... int needed_quorum = (server.cluster->size / 2) + 1; if (server.cluster->failover_auth_count >= needed_quorum) { /* We have the quorum, we can finally failover the master. */ serverLog(LL_WARNING, "Failover election won: I'm the new master."); /* Update my configEpoch to the epoch of the election. */ if (myself->configEpoch < server.cluster->failover_auth_epoch) { myself->configEpoch = server.cluster->failover_auth_epoch; serverLog(LL_WARNING, "configEpoch set to %llu after successful failover", (unsigned long long) myself->configEpoch); } /* Take responsability for the cluster slots. */ clusterFailoverReplaceYourMaster(); } else { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES); } }
slave 節點得到足夠多選票後, 成爲新的 master 節點。
更新本身的 configEpoch 爲選舉協商的 failover_auth_epoch,這是本節點就得到了最新當前集羣最大的 configEpoch,代表它看到的集羣信息如今是最新的。
最後調用 clusterFailoverReplaceYourMaster
函數取代下線主節點,成爲新的主節點,並向其餘節點廣播這種變化。
void clusterFailoverReplaceYourMaster(void) { int j; clusterNode *oldmaster = myself->slaveof; if (nodeIsMaster(myself) || oldmaster == NULL) return; /* 1) Turn this node into a master. */ /* 把 myself 標記爲 master,並從原 master 裏刪掉,更新原 master 的涉及 slave 的參數, * 若是 slave 數量爲0,去掉它的 CLUSTER_NODE_MIGRATE_TO 標記 */ clusterSetNodeAsMaster(myself); /* 取消主從複製過程,將當前節點升級爲主節點 *、 replicationUnsetMaster(); /* 2) Claim all the slots assigned to our master. * 接手老的 master 節點負責的槽位 */ for (j = 0; j < CLUSTER_SLOTS; j++) { if (clusterNodeGetSlotBit(oldmaster,j)) { clusterDelSlot(j); clusterAddSlot(myself,j); } } /* 3) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(1); /* 4) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ clusterBroadcastPong(CLUSTER_BROADCAST_ALL); /* 5) If there was a manual failover in progress, clear the state. */ resetManualFailover(); }
進行必要的 flag 設置和 slots 交接,向集羣廣播 PONG 消息,並進行善後處理。
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { // ... /* Check for role switch: slave -> master or master -> slave. */ if (sender) { if (!memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof))) { /* Node is a master. set master flag for sender */ clusterSetNodeAsMaster(sender); } // ... } clusterNode *sender_master = NULL; /* Sender or its master if slave. */ int dirty_slots = 0; /* Sender claimed slots don't match my view? */ if (sender) { sender_master = nodeIsMaster(sender) ? sender : sender->slaveof; if (sender_master) { dirty_slots = memcmp(sender_master->slots, hdr->myslots, sizeof(hdr->myslots)) != 0; } } if (sender && nodeIsMaster(sender) && dirty_slots) clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots); // ... }
集羣中其餘節點接收到 PONG 消息後,對 sender 進行正確的 role 標記,以某節點 D 爲例。
對於剛剛作完故障轉移的 slave,也即如今 master,在節點 D 看來它負責的 slot 是空的,因此 dirty_slots 爲 1。
以後調用 clusterUpdateSlotsConfigWith
函數處理 slots 的 dirty diff 信息。
至此 failover 的邏輯就已經基本完成。
除了上面的發現故障後集羣自動 failover,也能夠進行主動的主從切換。
主動 failover 是經過 redis 命令實現的,命令格式爲 CLUSTER FAILOVER [FORCE|TAKEOVER]
,該命令使用詳情能夠參考這篇文檔。
#define CLUSTER_MF_TIMEOUT 5000 else if (!strcasecmp(c->argv[1]->ptr,"failover") && (c->argc == 2 || c->argc == 3)){ /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ int force = 0, takeover = 0; if (c->argc == 3) { /* 不與 master 溝通,主節點也不會阻塞其客戶端,須要通過選舉 */ if (!strcasecmp(c->argv[2]->ptr,"force")) { force = 1; /* 不與 master 溝通,不通過選舉 */ } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) { takeover = 1; force = 1; /* Takeover also implies force. */ /* 與 master 溝通,須要通過選舉 */ } else { addReply(c,shared.syntaxerr); return; } } // ... server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; // mf 的超時時間爲 5s }
cluster failover 命令有三種不一樣的選項,各有不一樣的含義,如上面註釋所說。takeover 變量標記是否要通過選舉, force 變量標記是否須要與 master 溝通。
另外,mf 過程有一個過時時間,目前定義爲 5s,同時, mf_end 也表示如今正在作 mf。
不一樣的選項有不一樣的處理方式,以下,
if (takeover) { // takeover 不會作任何初始化校驗。 // 不通過其餘節點選舉協商,直接將該節點的 current epoch 加 1,而後廣播這個新的配置 serverLog(LL_WARNING,"Taking over the master (user request)."); clusterBumpConfigEpochWithoutConsensus(); clusterFailoverReplaceYourMaster(); } else if (force) { /* If this is a forced failover, we don't need to talk with our * master to agree about the offset. We just failover taking over * it without coordination. */ serverLog(LL_WARNING,"Forced failover user request accepted."); server.cluster->mf_can_start = 1;// 能夠直接開始選舉過程 } else { serverLog(LL_WARNING,"Manual failover user request accepted."); clusterSendMFStart(myself->slaveof); // 發送帶有 CLUSTERMSG_TYPE_MFSTART 標記的 gossip 包(只有消息頭)給個人 master }
takeover 方式最爲粗暴,slave 節點不發起選舉,而是直接將本身升級爲master,接手原主節點的槽位,增長本身的 configEpoch 後更新配置。clusterFailoverReplaceYourMaster
的邏輯在前面講過,只有在本輪選舉中得到足夠多的選票纔會調用該函數。
force 方式表示能夠直接開始選舉過程,選舉過程也在前面說過了。
如今來看看默認方式,處理邏輯爲 clusterSendMFStart
函數。該函數主要邏輯就是發送向要作 failover 的 slave 的 master 發送 CLUSTERMSG_TYPE_MFSTART
類型的 gossip 消息。
else if (type == CLUSTERMSG_TYPE_MFSTART) { /* This message is acceptable only if I'm a master and the sender * is one of my slaves. */ if (!sender || sender->slaveof != myself) return 1; /* Manual failover requested from slaves. * Initialize the state accordingly. * master 收到消息,重置 mf 狀態 */ resetManualFailover(); server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2)); // 阻塞客戶端 10s serverLog(LL_WARNING,"Manual failover requested by slave %.40s.", sender->name); }
resetManualFailover
函數中重置與 mf 相關的參數,表示這是一次新的 mf。
設置 mf_end,將它的 master 指向 sender(就是那個搞事情的 slave),同時阻塞 client 10s 鍾。
隨後,標記在作 mf 的 master 發送 PING 信息時 hdr 會帶上 CLUSTERMSG_FLAG0_PAUSED 標記。
void clusterBuildMessageHdr(clusterMsg *hdr, int type) { // ... /* Set the message flags. */ if (nodeIsMaster(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; // ... }
mflags 記錄與 mf 相關的 flag。
slave 節點處理帶有 CLUSTERMSG_FLAG0_PAUSED 標記的 gossip 消息。
int clusterProcessPacket(clusterLink *link) { // ... sender = clusterLookupNode(hdr->sender); if (sender && !nodeInHandshake(sender)) { // ... if (server.cluster->mf_end && // 處於 mf 狀態 nodeIsSlave(myself) && // 我是 slave myself->slaveof == sender && // 個人 master 是 sender hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED && server.cluster->mf_master_offset == 0) // 尚未正式開始時,mf_master_offset 設置爲 0 { server.cluster->mf_master_offset = sender->repl_offset; // 從 sender 得到 repl_offset serverLog(LL_WARNING, "Received replication offset for paused " "master manual failover: %lld", server.cluster->mf_master_offset); } } // ... }
對於那個發起 failover 的 slave,記下其 master 的 repl_offset,若是以前尚未記錄下的話。
void clusterCron(void) { // ... if (nodeIsSlave(myself)) { clusterHandleManualFailover(); // ... } // ... } void clusterHandleManualFailover(void) { /* Return ASAP if no manual failover is in progress. */ if (server.cluster->mf_end == 0) return; /* If mf_can_start is non-zero, the failover was already triggered so the * next steps are performed by clusterHandleSlaveFailover(). */ if (server.cluster->mf_can_start) return; if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */ if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) { /* Our replication offset matches the master replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; serverLog(LL_WARNING, "All master replication stream processed, " "manual failover can start."); } }
在 clusterCron
函數裏有 clusterHandleManualFailover
的邏輯。
mf_end 爲 0,說明此時沒有 mf 發生。
mf_can_start 非 0 值,表示如今能夠此 slave 能夠發起選舉了。
mf_master_offset 爲 0,說明如今尚未得到 master 的複製偏移量,須要等一下子。當 mf_master_offset 值等於 replicationGetSlaveOffset
函數的返回值時,把 mf_can_start 置爲 1。另外,應該記得,使用帶有 force 選項的 CLUSTER FAILOVER
命令,直接就會把 mf_can_start 置爲 1,而 replicationGetSlaveOffset
函數的做用就是檢查當前的主從複製偏移量,也就是說主從複製偏移量必定要達到 mf_master_offset 時,slave 纔會發起選舉,即默認選項有一個追平 repl offset 的過程。
其餘一些選舉什麼的流程跟被動 failover 沒有區別。
主從節點在週期性的clusterCron
中都有一個檢查本次 mf 是否過時的函數。
void manualFailoverCheckTimeout(void) { if (server.cluster->mf_end && server.cluster->mf_end < mstime()) { serverLog(LL_WARNING,"Manual failover timed out."); resetManualFailover(); } } void resetManualFailover(void) { if (server.cluster->mf_end && clientsArePaused()) { server.clients_pause_end_time = 0; clientsArePaused(); /* Just use the side effect of the function. */ } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; server.cluster->mf_slave = NULL; server.cluster->mf_master_offset = 0; }
若是過時沒有作 mf ,那麼就會重置它的相關參數。
在 Redis cluster 裏 epoch 是個很是重要的概念,相似於 raft 算法中的 term 概念。Redis cluster 裏主要是兩種:currentEpoch 和 configEpoch。
這是一個集羣狀態相關的概念,能夠當作記錄集羣狀態變動的遞增版本號。每一個集羣節點,都會經過server.cluster->currentEpoch 記錄當前的 currentEpoch。集羣節點建立時,不論是主節點仍是從節點,都置currentEpoch 爲 0。當前節點接收到來自其餘節點的包時,若是發送者的currentEpoch(消息頭部會包含發送者的currentEpoch)大於當前節點的currentEpoch,那麼當前節點會更新 currentEpoch 爲發送者的 currentEpoch。所以,集羣中全部節點的currentEpoch最終會達成一致,至關於對集羣狀態的認知達成了一致。
currentEpoch 做用在於,集羣狀態發生改變時,某節點會先增長自身 currentEpoch 的值,而後向集羣中其餘節點徵求贊成,以便執行某些動做。目前,僅用於 slave 節點的故障轉移流程,在上面分析中也看到了,在發起選舉以前,slave 會增長本身的 currentEpoch,而且獲得的 currentEpoch 表示這一輪選舉的 voteEpoch,當得到了足夠多的選票後纔會執行故障轉移。
這是一個集羣節點配置相關的概念,每一個集羣節點都有本身獨一無二的 configepoch。所謂的節點配置,其實是指節點所負責的 slot 信息。
configEpoch 主要用於解決不一樣的節點就 slot 歸屬認知發生衝突的狀況。公說公有理婆說婆有理,到底聽誰的,configEpoch 越大,看到的集羣節點配置信息越新,就越有話語權。對於衝突的狀況,後面會有博客進行詳細分析。
如下幾種狀況 configEpoch 會更新:
遞增 node epoch 稱爲 bump epoch。
關於 configEpoch 有三個原則:
#define CLUSTER_MAX_REJOIN_DELAY 5000 #define CLUSTER_MIN_REJOIN_DELAY 500 #define CLUSTER_WRITABLE_DELAY 2000 void clusterUpdateState(void) { // ... static mstime_t among_minority_time; static mstime_t first_call_time = 0; server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE; /* 時間從第一次調用該函數算起,是爲了跳過 DB load 時間。 * cluster 啓動時,狀態爲 CLUSTER_FAIL, * 這裏要等待必定的時間(2s)讓 cluster 變爲 CLUSTER_OK 狀態。 */ if (first_call_time == 0) first_call_time = mstime(); if (nodeIsMaster(myself) && server.cluster->state == CLUSTER_FAIL && mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return; /* 先假設集羣狀態爲 CLUSTER_OK, * 而後遍歷 16384 個 slot,若是發現有 slot 被有被接管, * 或者接管某 slot 的 node 是 fail 狀態,那麼把集羣設置爲 CLUSTER_FAIL,退出循環 */ new_state = CLUSTER_OK; if (server.cluster_require_full_coverage) { for (j = 0; j < CLUSTER_SLOTS; j++) { if (server.cluster->slots[j] == NULL || server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL)) { new_state = CLUSTER_FAIL; break; } } } { /* 計算 cluster size,計數的是那些至少負責一個 slot 的 node * 計算 reachable_masters,計數基於 cluster size, * 加入篩選條件(不帶有 CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) 標記 */ dictIterator *di; dictEntry *de; server.cluster->size = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); if (nodeIsMaster(node) && node->numslots) { server.cluster->size++; if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0) reachable_masters++; } } dictReleaseIterator(di); } { /* 若是 reachable_masters 不到 cluster size 一半(a minority partition), * 就將集羣標記爲 CLUSTER_FAIL */ int needed_quorum = (server.cluster->size / 2) + 1; if (reachable_masters < needed_quorum) { new_state = CLUSTER_FAIL; among_minority_time = mstime(); } } if (new_state != server.cluster->state) { mstime_t rejoin_delay = server.cluster_node_timeout; if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY) rejoin_delay = CLUSTER_MAX_REJOIN_DELAY; if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY) rejoin_delay = CLUSTER_MIN_REJOIN_DELAY; /* 處於 minority partition 的時間沒有超過 cluster_node_timeout, * 那麼這次不更新集羣狀態。 */ if (new_state == CLUSTER_OK && nodeIsMaster(myself) && mstime() - among_minority_time < rejoin_delay) { return; } /* Change the state and log the event. */ serverLog(LL_WARNING,"Cluster state changed: %s", new_state == CLUSTER_OK ? "ok" : "fail"); server.cluster->state = new_state; }