翻譯自Eli Bendersky的系列博客,已得到原做者受權。git
本文是系列文章中的第一部分,本系列文章旨在介紹Raft分佈式一致性協議及其Go語言實現。文章的完整列表以下:github
在這一部分,咱們會大幅強化Raft的實現,作到可以實際處理客戶端提交的指令,並在Raft集羣中複製它們。代碼結構與第一部分相同,會有一些新的結構體和函數定義,對舊代碼也會有一些改動——我會對這些作簡短的解釋。算法
本部分的全部代碼都在這個目錄。數據庫
咱們在序言中對客戶端交互進行了簡短的討論,我強烈建議您返回去從新讀一下對應章節。接下來,咱們不會關注客戶端如何找到領導者,相反,咱們討論的是當他已經找到領導者時,會發生什麼。數組
首先說明一下術語。如前所述,客戶端使用Raft協議來複制一系列的指令,這些指令能夠視爲通用狀態機的輸入。就咱們的Raft實現而言,這些指令能夠是徹底任意的,咱們使用Go中的空指針類型(interface{})來進行表示。在Raft的一致性歷程中,一條指令會經歷如下步驟:安全
注意在呈遞和提交指令之間的不對稱性——在檢查咱們即將討論的實現策略時,牢記這一點很重要。一條指令會呈遞給單個Raft服務器,可是一段時候後多個服務器(特別是已鏈接/活動的同伴服務器)都會提交這個指令並通知各自的客戶端。服務器
回顧一下序言中的示意圖:網絡
狀態機表明使用Raft協議進行復制的任意服務,如鍵值數據庫。已提交的指令會改變服務的狀態(如:在數據庫中新增一個鍵值對)。app
當咱們在Raft ConsensusModule
上下文中討論客戶端時,一般指的是(上面說的)服務,由於這也是提交所通知的對象。換句話說,上圖中的一致性模塊指向服務狀態機的黑色箭頭就是這裏所說的通知。分佈式
客戶端還有另外的含義,就是該服務的客戶端(好比鍵值數據庫的用戶)。服務與其客戶端之間的交互是服務自身的業務,在本文中,咱們只關注Raft與服務之間的交互。
譯者注:做者在這裏對於指令的階段分別使用了submit和commit進行描述,在一般的翻譯中,這兩個詞都表示提交。我我的理解,
submit
表示提交時,傾向於對象A向對象B提交某些內容,而commit
表示提交時,傾向於本地記錄的確認。爲了不歧義,這裏將submit
翻譯爲呈遞
,表示服務向Raft一致性模塊發送了指令;commit
仍譯爲提交
,表示Raft一致性模塊確認本地的指令記錄。在這裏特別解釋一下,若有更好的建議,能夠聯繫我進行修改。
在咱們的實現中,在新建ConsensusModule
時會接收一個commit channel
做爲參數——CM可使用該通道向調用方發送已提交的指令:commitChan chan<- CommitEntry
。
CommitEntry
定義以下:
/* CommitEntry就是Raft向提交通道發送的數據。每一條提交的條目都會通知客戶端, 代表指令已知足一致性,能夠應用到客戶端的狀態機上。 */
type CommitEntry struct {
// Command 是被提交的客戶端指令
Command interface{}
// Index 是被提交的客戶端指令對應的日誌索引
Index int
// Term 是被提交的客戶端指令對應的任期
Term int
}
複製代碼
使用channel是一種設計選擇,可是這不是惟一的解決方法。咱們也能夠改用回調;在建立ConsensusModule
時調用方會註冊一個回調函數,一旦咱們須要提交指令,就能夠執行這個回調函數。
咱們很快會看到經過channel發送日誌條目的代碼,在此以前,咱們必須討論Raft服務器如何複製命令並決定是否提交。
在這個系列中,Raft日誌已經被說起不少次了,可是咱們尚未對此進行過多的介紹。日誌就是要應用於狀態機的指令的線性序列,若是須要的話,日誌要可以從某個起始狀態開始」重放「狀態機。正常運行時,全部Raft同伴的日誌的相同的。當領導者收到新指令時,會先加入本身的日誌中,而後將其複製給全部的追隨者。追隨者將命令放在日誌中並向領導者確認,後者會記錄已安全複製到集羣中多數服務器的最新日誌索引。
Raft論文中有一些日誌的示意圖,相似:
每一個方格是一條日誌條目。方格頂部的數字是該 條目添加到日誌時的任期(也就是第一部分所說的任期);方格底部是日誌條目包含的鍵-值指令。每一個日誌條目都有一個線性索引[2],方格的顏色用另外一種方式體現了任期。
若是上面的日誌應用到一個空的鍵-值存儲中,最終的結果就是x = 4, y = 7
。
在咱們的實現中,日誌條目的結構以下:
type LogEntry struct {
Command interface{}
Term int
}
複製代碼
每一個ConsensusModule
的日誌屬性就是數組log []LogEntry
。客戶端一般不在意任期,可是,任期對於Raft的正確性相當重要,所以在閱讀代碼時請務必牢記。
咱們首先看一下新加的方法Submit
,客戶端經過該方法呈遞新指令:
/* Submit方法會向CM呈遞一條新的指令。這個函數是非阻塞的; 客戶端讀取構造函數中傳入的commit channel,以得到新提交條目的通知。 若是當前CM是領導者返回true——表示指令被接受了。 若是返回false,客戶端會尋找新的服務器呈遞該指令。 */
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.dlog("... log=%v", cm.log)
return true
}
return false
}
複製代碼
邏輯很簡單,若是CM是領導者,則將新指令添加到日誌中,並返回true
。不然,忽略請求並返回false
。
Q:Submit
方法返true
是否足以代表客戶端已經將指令呈遞到領導者?
A:很遺憾並非。在極少數狀況下,領導者可能會與其它Raft服務器之間出現網絡分區,而其它服務器很快會從新選舉新的領導者,可是客戶端可能仍然鏈接在舊的領導者。客戶端對於其呈遞的指令應該等待一段合理的時間,檢查該指令是否出如今commit channel中;若是沒有的話,就代表它鏈接的是錯誤的領導者,應該鏈接其它領導者進行重試。
咱們剛剛看到呈遞給領導者的指令被追加到了日誌的末尾,可是這條新指令如何傳給追隨者呢?領導者的執行步驟在Raft論文的Figure 2
中的服務器規則
部分有詳細描述(能夠返回第一部分的附言查看詳細內容)。咱們在leaderSendHeartbeats
方法中完成該邏輯,這是一個新方法[3]:
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
go func(peerId int) {
cm.mu.Lock()
ni := cm.nextIndex[peerId]
prevLogIndex := ni - 1
prevLogTerm := -1
if prevLogIndex >= 0 {
prevLogTerm = cm.log[prevLogIndex].Term
}
entries := cm.log[ni:]
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: cm.commitIndex,
}
cm.mu.Unlock()
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
if cm.state == Leader && savedCurrentTerm == reply.Term {
if reply.Success {
cm.nextIndex[peerId] = ni + len(entries)
cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)
savedCommitIndex := cm.commitIndex
for i := cm.commitIndex + 1; i < len(cm.log); i++ {
if cm.log[i].Term == cm.currentTerm {
matchCount := 1
for _, peerId := range cm.peerIds {
if cm.matchIndex[peerId] >= i {
matchCount++
}
}
if matchCount*2 > len(cm.peerIds)+1 {
cm.commitIndex = i
}
}
}
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
} else {
cm.nextIndex[peerId] = ni - 1
cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
}
}
}
}(peerId)
}
}
複製代碼
這確實比第一部分的邏輯複雜得多,但它實際上也就是按照論文的圖2所寫的。關於這段代碼的幾點說明:
success
字段,用於告知領導者,其請求的追隨者是否獲得了匹配的prevLogIndex
和prevLogTerm
。根據該字段,領導者會更新追隨者對應的nextIndex
。commitIndex
是根據已複製某天日誌條目的追隨者數量來更新的,若是某條索引已複製到集羣中的多數服務器,commitIndex
就會修改成該索引值。代碼的這一部分對於前面討論發客戶端交互很是重要:
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
複製代碼
newCommitReadyChan
是CM內部使用的一個通道,用來通知在commit channel
上有新條目能夠發生給客戶端。它是經過下面的方法起做用的,CM啓動時會在goroutine運行該方法:
/* commitChanSender負責在cm.commitChan上發送已提交的日誌條目。 它會監聽newCommitReadyChan的通知並檢查哪些條目能夠發送(給客戶端)。 該方法應該在單獨的後臺goroutine中運行;cm.commitChan可能會有緩衝來限制客戶端消費已提交指令的速度。 當newCommitReadyChan關閉時方法結束。 */
func (cm *ConsensusModule) commitChanSender() {
for range cm.newCommitReadyChan {
// 查找須要執行哪些指令
cm.mu.Lock()
savedTerm := cm.currentTerm
savedLastApplied := cm.lastApplied
var entries []LogEntry
if cm.commitIndex > cm.lastApplied {
entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
cm.lastApplied = cm.commitIndex
}
cm.mu.Unlock()
cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)
for i, entry := range entries {
cm.commitChan <- CommitEntry{
Command: entry.Command,
Index: savedLastApplied + i + 1,
Term: savedTerm,
}
}
}
cm.dlog("commitChanSender done")
}
複製代碼
該方法會更新lastApplied
狀態變量,以瞭解哪些條目已發送到客戶端,並保證只發送新的條目。
咱們已經討論過領導者如何處理新日誌條目,如今來查看一下追隨者的代碼,尤爲是其中的AppendEntries
方法。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
// 請求中的任期大於本地任期,轉換爲追隨者狀態
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
// 若是當前狀態不是追隨者,則變爲追隨者
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
// 如下代碼爲第二部分新增
// 檢查本地的日誌在索引PrevLogIndex處是否包含任期與PrevLogTerm匹配的記錄?
// 注意在PrevLogIndex=-1的極端狀況下,這裏應該是true
if args.PrevLogIndex == -1 ||
(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
reply.Success = true
// 找到插入點 —— 索引從PrevLogIndex+1開始的本地日誌與RPC發送的新條目間出現任期不匹配的位置。
logInsertIndex := args.PrevLogIndex + 1
newEntriesIndex := 0
for {
if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
break
}
if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
break
}
logInsertIndex++
newEntriesIndex++
}
/* 循環結束時: - logInsertIndex指向本地日誌結尾,或者是與領導者發送日誌間存在任期衝突的索引位置 - newEntriesIndex指向請求條目的結尾,或者是與本地日誌存在任期衝突的索引位置 */
if newEntriesIndex < len(args.Entries) {
cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
cm.dlog("... log is now: %v", cm.log)
}
// Set commit index.
if args.LeaderCommit > cm.commitIndex {
cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
cm.dlog("... setting commitIndex=%d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
}
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
複製代碼
這段代碼嚴格遵循了論文圖2中的算法(AppendEntries的Received implementation
部分),並且也給出了很好的註釋。
注意代碼當中,若是領導者的LeaderCommit
大於自身的cm.commitIndex
時,會在cm.newCommitReadyChan
通道發送數據。這就是追隨者從領導者處知道新增的日誌條目能夠提交的時間。
當領導者在AE請求中發送新的日誌條目時,會出現如下狀況:
success=true
matchIndex
。當有足夠的追隨者的matchIndex
指向下一索引時,領導者會更新commitIndex
並在下一次AE請求中發給全部追隨者(在leaderCommit
字段中)leaderCommit
,它們會意識到有新的日誌條目被提交了,它們就會經過commit channel把這些指令發給其客戶端。**Q:**提交一條新指令須要多少次RPC往返?
**A:**2次。第一次請求中,領導者發送下一條日誌條目給追隨者,追隨者進行確認。領導者處理AE應答時,可能會根據返回結果更新commit index。第二次RPC請求中,領導者會發送更新後的commit index給追隨者,以後追隨者會將這些日記條目標記爲已提交併經過commit channel將它們發送給客戶端。做爲練習,請回到上面的示例代碼中,找到這些步驟對應的片斷。
到目前爲止,咱們已經研究了爲支持日誌複製而添加的新代碼。可是,日誌也會對Raft選舉產生影響。Raft論文中在5.4.1小節(選舉約束)中進行了描述。除非候選人的日誌與集羣中多數同伴服務器同樣新,不然Raft的選舉程序會阻止其勝選[4]。
所以,RV請求中包含lastLogIndex
和lastLogTerm
字段。當候選人發送RV請求時,它會填入其最新日誌條目的相關信息。追隨者會與自己的屬性進行對比,並決定該候選人是否有資格當選。
下面是最新的startElection
代碼:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
var votesReceived int32 = 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
/*---------如下代碼爲新增--------*/
cm.mu.Lock()
savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
cm.mu.Unlock()
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
LastLogIndex: savedLastLogIndex,
LastLogTerm: savedLastLogTerm,
}
/*---------以上代碼爲新增--------*/
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
var reply RequestVoteReply
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&votesReceived, 1))
if votes*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
複製代碼
其中是lastLogIndexAndTerm
是一個新的輔助方法:
// lastLogIndexAndTerm方法返回服務器最新的日誌索引及最新的日誌條目對應的任期
// (若是沒有日誌返回-1)要求cm.mu鎖定
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
if len(cm.log) > 0 {
lastIndex := len(cm.log) - 1
return lastIndex, cm.log[lastIndex].Term
} else {
return -1, -1
}
}
複製代碼
提醒一下,咱們實現中的索引是從0開始的,而不像Raft論文中是從1開始的,所以-1常常做爲一個標記值。
下面是更新後的RV處理邏輯,實現了選舉安全性檢查:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
// 任期相同,未投票或已投票給當前請求同伴,且候選人的日誌知足安全性要求, 則返回同意投票;
// 不然,返回反對投票。
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
複製代碼
在第一部分中,咱們討論了一個場景。在具備三臺服務器的集羣中,服務器B斷開鏈接幾秒鐘,致使其變爲候選人而且每隔150-300ms就發起一輪選舉。當它從新連入集羣中時,其任期要比留在集羣中不知道有新一輪選舉的同伴服務器高不少。
如今正好能夠回顧這個場景,並考慮一下,若是鏈接正常的同伴服務器在此期間複製了新的日誌條目,將會發生什麼。
雖然B重返集羣時會引起從新選舉(領導者會在AE的應答中看到更高的任期而轉換爲追隨者),可是由於它的日誌不如A和C完整,因此B不可能勝選。這就是由於上一節所說的選舉安全性檢查。A或C會贏得新一輪的選舉,所以(此次選舉)對集羣的破壞力相對較小。
若是您仍然擔憂這個沒必要要的影響(爲何要改選?),Ongaro的論文在Preventing disruptions when a server rejoins a cluster
一節討論了這個確切的問題。這個問題的經常使用解決方案就是」預投票「,即服務器在成爲候選人以前先執行一些檢查。
由於這是真的很是規情形的優化,我就不在這個主題上花費太多時間。你們能夠去查看論文——Raft網站提供了連接。
結束本節以前,咱們看一些在學習和實現Raft時常見的問題。若是你有其它問題,請隨時給我發郵件——我會收集最多見的問題並更新文章。
Q:爲何commitIndex
和lastApplied
是分開的?咱們能不能只記錄由於RPC請求(或RPC響應)致使commitIndex
變化了多少,而後只將這些變化的指令發給客戶端?
A:這二者分開是爲了將快速操做(RPC處理)與較慢的操做(向客戶端發送命令)進行解耦。考慮一下,當追隨者收到AE請求,發現領導者的commitIndex
比本身大時,會發生什麼?此時,它能夠向commit channel發送一下日誌指令。可是在channel發送數據(或執行回調函數)多是一個潛在的阻塞操做,而咱們但願儘量快地應答RPC請求。lastApplied
就能夠幫助咱們將兩者進行解耦,RPC方法只須要更新commitIndex
,後臺的commitChanSender
goroutine會觀察這些變化,並在空閒時把新提交的指令發送給客戶端。
那你可能會問對於newCommitReadyChan
通道的操做是否是也存在這個問題?觀察很仔細,可是通道是有緩衝的,並且因爲通道兩邊都是咱們控制的,咱們能夠設置一個小的緩衝區,來保證在絕大多數狀況下不會阻塞。儘管如此,在極少數狀況下,由於Raft代碼中不可能提供無限的緩衝區,很是慢的客戶端會拖延RPC請求。這未必是一件壞事,由於它會造成一種天然的背壓機制。
Q:咱們在領導者中須要爲每一個同伴都保存nextIndex
和matchIndex
嗎?
A:只有matchIndex
時算法也仍然是有效的,可是在有些狀況下效率會很低。考慮一個領導改變的狀況,新的領導者不能假設任何關於其同伴的最新狀況,因此將matchIndex
初始化爲-1,所以就會嘗試向每一個追隨者都發送整個日誌。可是追隨者(至少大部分)極可能擁有幾乎相同的日誌條目;nextIndex
幫助領導者從日誌末尾開始探查追隨者(所需的日誌),而沒必要複製大量的日誌。
我再一次強烈建議您研究一下代碼——運行測試用例,觀察輸出日誌。
到目前爲止,咱們已經有了一個基本可使用的Raft實現,除了尚未處理持久性。這意味着咱們的實現難以應對崩潰故障,即服務器崩潰並重啓。
Raft算法對此作了規定,這是第三部分將討論的內容。增長持久性會使咱們可以應對更嚴格的測試,包括最壞狀況下的服務器崩潰。
此外,第三部分會討論這裏提到的一些優化。更重要的是,若是領導者有新消息要發送給追隨者時,應該更及時地發送AE請求,可是如今領導者只會在每50ms發送一次AE。這個也會在下一部分被修正。
舉例來講,在規模爲5的集羣中,領導者指望獲得2個追隨者的確認回覆,這樣總數就是3個(2個追隨者加領導者自身),也就知足了多數的要求。 ↩︎
這裏須要注意,雖然Raft論文中的日誌索引是從1開始的,在咱們的實現中索引是從0開始的,由於這樣的代碼感受更天然。這些索引對於ConsensusModule
的客戶端/用戶沒有任何實質性影響。 ↩︎
在這裏將這個方法命名爲leaderSendHeartbeats
有點不恰當,由於它不僅是發送心跳。可是,由於在這一部分中,該方法每隔50ms都須要發送AE請求,因此保留了這個名字。在第三部分中咱們會修正。 ↩︎
這裏用了一個很是簡單的解釋,實際狀況很複雜。這裏對於正確性的推理至關複雜,我建議閱讀論文以獲取更多詳細信息。若是你是形式主義的擁躉,Ongaro的畢業論文有章節TLA+ spec of Raft來證實這些不變式的正確性。 ↩︎