翻譯自Eli Bendersky的系列博客,已得到原做者受權。git
本文是系列文章中的第一部分,本系列文章旨在介紹Raft分佈式一致性協議及其Go語言實現。文章的完整列表以下:github
在這一部分,咱們會添加持久性和一些優化來完善Raft的基礎實現。全部代碼已上傳到這個目錄。算法
相似Raft這樣的一致性算法的目標,就是經過在獨立的服務器之間複製任務來建立一個更具高可用性的系統。在此以前,咱們主要關注的是網絡分區的故障,也就是集羣中一些服務器與其它服務器(或客戶端)斷開了鏈接。還有另外一種失敗模式就是崩潰,也就是一臺服務器中止工做並重啓。數據庫
對於其它服務器,這看起來很像網絡分區——服務器暫時斷開鏈接,可是對於崩潰服務器自身來講,狀況就徹底不一樣了,由於重啓以後其內部全部的易失性存儲狀態都丟失了。編程
正是因爲這個緣由,Raft論文中的圖2中清楚地標註了哪些狀態應該持久化,持久化的狀態在每次更新的時候都須要刷新到非易失性存儲中。在服務器發起下一次RPC或響應正在進行的RPC以前,全部須要持久化的狀態都須要保存好。安全
Raft能夠經過僅持久化其狀態的一個子集來實現,也就是:服務器
currentTerm
- 此服務器觀察到的最新任期votedFor
- 在最新任期中,此服務器投贊同票的服務器IDlog
- Raft日誌條目Q:爲何commitIndex
和lastApplied
是易失性的?網絡
A:commitIndex
字段是易失性的,由於在重啓以後,Raft只根據持久化狀態就能夠獲得正確的值。一旦領導者成功提交了一條新指令,它也就知道在此以前的全部指令都已經提交了。若是一個追隨者崩潰又從新接入集羣中,當前領導者向其發送AE請求時,會告訴其正確的commitIndex
。app
重啓以後,lastApplied
是從0開始的,由於基本的Raft算法假定了服務(如鍵-值數據庫)不會保存任何持久化狀態。所以,須要經過重放日誌條目來從新建立它的狀態。固然,這是至關低效的,也有不少可行的優化方法。Raft支持在日誌變大時對其進行快照,這在Raft論文的第6章節有描述,不過這也超出了本系列的討論範圍。分佈式
在Raft中,根據不一樣狀況,一條指令會屢次發給客戶端。有幾種場景會出現這樣的狀況,包括崩潰和重啓(重放日誌恢復服務)。
在消息傳遞語義方面,Raft站在至少一次陣營。一旦一條指令被呈遞,它最終會被複制給全部的客戶端,可是有些客戶端可能會屢次看到同一條指令。
所以,建議指令須要攜帶惟一的ID,而客戶端要忽略已經收到的指令。這個在Raft論文的第8節有更詳細的描述。
爲了實現持久性,咱們在代碼中添加了以下的接口:
type Storage interface {
Set(key string, value []byte)
Get(key string) ([]byte, bool)
// HasData returns true if any Sets were made on this Storage.
HasData() bool
}
複製代碼
你能夠將它看做是字符串到通用字節切片的映射,由持久性存儲實現。
如今CM構造函數要接受Storage
做爲參數並進行調用:
if cm.storage.HasData() {
cm.restoreFromStorage(cm.storage)
}
複製代碼
這裏的restoreFromStorage
方法也是新加的,該方法會從存儲中加載持久化的狀態變量,使用標準的encoding/go
包對其進行反序列化:
func (cm *ConsensusModule) restoreFromStorage(storage Storage) {
if termData, found := cm.storage.Get("currentTerm"); found {
d := gob.NewDecoder(bytes.NewBuffer(termData))
if err := d.Decode(&cm.currentTerm); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("currentTerm not found in storage")
}
if votedData, found := cm.storage.Get("votedFor"); found {
d := gob.NewDecoder(bytes.NewBuffer(votedData))
if err := d.Decode(&cm.votedFor); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("votedFor not found in storage")
}
if logData, found := cm.storage.Get("log"); found {
d := gob.NewDecoder(bytes.NewBuffer(logData))
if err := d.Decode(&cm.log); err != nil {
log.Fatal(err)
}
} else {
log.Fatal("log not found in storage")
}
}
複製代碼
鏡像方法是persistToStorage
——將全部的狀態變量編碼並保存到提供的存儲介質中。
func (cm *ConsensusModule) persistToStorage() {
var termData bytes.Buffer
if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {
log.Fatal(err)
}
cm.storage.Set("currentTerm", termData.Bytes())
var votedData bytes.Buffer
if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {
log.Fatal(err)
}
cm.storage.Set("votedFor", votedData.Bytes())
var logData bytes.Buffer
if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {
log.Fatal(err)
}
cm.storage.Set("log", logData.Bytes())
}
複製代碼
咱們只須要在這些狀態值每次變化時都調用pesistToStorage
方法便可實現持久化。若是你比對一下第二部分和本部分的CM代碼,就能看的該方法的調用散步在少數幾個地方。
固然,這並非最有效的持久化方式,可是它簡單有效,所以也能夠知足咱們的須要。效率最低的部分是保存整個日誌,日誌在實際的應用中可能會很大。爲了真正解決整個問題,Raft論文的第7節提出了一個日誌壓縮機制。咱們不會實現壓縮,可是能夠將其做爲聯繫添加到已有的實現中。
實現持久性以後,咱們的Raft集羣能夠在必定程度上應對崩潰。只要是集羣中的少數服務器崩潰並在以後的某個時間重啓,集羣對於客戶端都是一直可用的(若是領導者是崩潰的服務器之一,可能還須要等集羣選舉出新的領導者)。提醒一下,擁有2N+1
個服務器的集羣能夠容忍N臺服務器出現故障,而且只要其它N+1
臺機器保持互連,集羣就是一直可用的。
咱們在這一部分增強了測試,我想提醒您注意關於系統彈性的另外一個方面——不可靠的RPC傳遞。在此以前,咱們都假設在鏈接的服務器之間的RPC請求都會成功到達,可能會有很小的延時。若是你看一下server.go
,你會注意到其中使用了一個RPCProxy
類型來實現隨機延遲。每一個RPC請求會延遲1-5ms,以模擬真實世界中同一數據中心的同伴服務器通訊延時。
RPCProxy
還幫助咱們實現了可選的不可靠傳遞。若是啓動了RAFT_UNRELIABLE_RPC
系統變量,RPC會偶爾出現明顯的延遲(75ms)或者被直接丟棄,用於模擬真實世界中的網絡故障。
咱們能夠設置RAFT_UNRELIABLE_RPC
以後運行以前的測試,觀察Raft集羣在出現這些故障時的行爲——另外一個強烈推薦的練習。若是您學有餘力,能夠修改一下RPCProxy
,對RPC應答也進行延遲,這應該只須要改幾行代碼。
我在第二部分提到過,目前領導者的實現效率很低。領導者在leaderSendHeartbeats
方法中發送AE請求,而該方法由定時器每50ms觸發一次。假設有一天新的指令被呈遞,領導者不會當即通知全部的追隨者,而是等待下一個50ms的觸發。更糟糕的是,要通知追隨者某條指令被提交須要經過兩次AE請求。下圖展現了目前的工做流程: tries-50ms-boundary.png)
以後有一條新指令被提交(假設是35ms後)。
領導者等到下一個50ms計時結束,也就是時刻2再向追隨者發送更新後的日誌。
在時刻3,追隨者回復指令已經成功添加到本地的日誌中。此時,領導者已經修改了它的commit index
(假定已獲得多數服務器確認)並能夠當即通知追隨者。可是,領導者一直等到下一個50ms邊界(時刻4)才這樣作。
最後,當追隨者收到更新後的leaderCommit
時,將最新的提交指令通知到客戶端。
咱們的實現中,領導者Submit(X)
和追隨者commitChan <-X
之間等待的大部分時間都是沒必要要的。
咱們真正想要的執行順利應該像下面這樣:
這正是本部分代碼所作的。咱們先從startLeader
開始看一下實現中的新代碼:
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
for _, peerId := range cm.peerIds {
cm.nextIndex[peerId] = len(cm.log)
cm.matchIndex[peerId] = -1
}
cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log)
/*********如下代碼是新增部分********/
/* 該goroutine在後臺運行並向同伴服務器發送AE請求: - triggerAEChan通道發送任何內容時 - 若是triggerAEChan通道沒有內容時,每50ms執行一次 */
go func(heartbeatTimeout time.Duration) {
// Immediately send AEs to peers.
cm.leaderSendAEs()
t := time.NewTimer(heartbeatTimeout)
defer t.Stop()
for {
doSend := false
select {
case <-t.C:
doSend = true
// Reset timer to fire again after heartbeatTimeout.
t.Stop()
t.Reset(heartbeatTimeout)
case _, ok := <-cm.triggerAEChan:
if ok {
doSend = true
} else {
return
}
// Reset timer for heartbeatTimeout.
if !t.Stop() {
<-t.C
}
t.Reset(heartbeatTimeout)
}
if doSend {
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
cm.leaderSendAEs()
}
}
}(50 * time.Millisecond)
}
複製代碼
startLeader
方法中的循環不僅是等待50ms的觸發器,並且等待兩種可能的狀況之一:
cm.triggerAEChan
通道上的數據發送咱們稍後會看到是什麼觸發了cm.triggerAEChan
,這個信號表示如今要發送一條AE請求。不管什麼時候觸發該通道,定時器都會重置,從而實現心跳邏輯——若是領導者沒有新信息須要發送,最多會等待50ms。
還要注意,真正發送AE請求的方法名改成了leaderSendAEs
,以便更好地反映新代碼的設計意圖。
如咱們所料,觸發cm.triggerAEChan
的方法之一 就是Submit
:
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.persistToStorage()
cm.dlog("... log=%v", cm.log)
cm.mu.Unlock()
cm.triggerAEChan <- struct{}{}
return true
}
cm.mu.Unlock()
return false
}
複製代碼
更改以下:
cm.persistToStorage
對新的日誌條目進行持久化。這與心跳請求的優化無關,可是我仍是要在這裏說明一下,由於第2部分的代碼沒有實現該功能,並且該功能是在本文的前面描述的。cm.persistToStorage
上發送空結構體。這會通知領導者goroutine中的循環。cm.persistToStorage
發送數據時持有鎖,由於在某些狀況下會致使死鎖。你能猜到代碼中還有什麼地方會通知triggerAEChan
嗎?
就是在領導者處理AE應答並修改commit index
的代碼中,我這裏就不貼出整個方法,只複製了修改的部分:
if cm.commitIndex != savedCommitIndex {
cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
// Commit index改變:代表領導者認爲新指令能夠被提交了。
// 經過commit channel向領導者的客戶端發送新指令。
// 發送AE請求通知全部的追隨者
cm.newCommitReadyChan <- struct{}{}
cm.triggerAEChan <- struct{}{}
}
複製代碼
這是一個重要的優化,可讓咱們的代碼對新指令的響應速度更快。
上一節中的代碼看起來可能會讓你有些不舒服,如今有不少行爲是每次調用Submit
時觸發的——領導者當即向追隨者廣播RPC請求。若是咱們一次提交多條命令時會怎樣?鏈接Raft集羣的網絡可能被RPC請求淹沒。
儘管看起來效率低下,但其實是安全的。Raft的RPC請求都是冪等的,意味着屢次收到包含相同信息的RPC請求不會形成什麼危害。
若是你擔憂同時提交多條指令時致使的網絡擁塞,批處理是很容易實現的。最簡單的方法就是提供一種將整個指令片斷髮送給Submit
的方式。所以,Raft實現中只有不多的代碼須要修改,而後客戶端就能夠呈遞一組指令而不會產生太多RPC通訊。做爲練習試試看!
我想在這篇文章中討論的另外一個優化,是在一些場景中減小領導者更新追隨者日誌時被拒絕的AE請求數量。回想一下,nextIndex
機制從日誌的尾端開始,而且每次追隨者拒絕AE請求時都減1。在極少數狀況下,追隨者會出現嚴重過期,由於每次RPC請求只會回退一條指令索引,因此更新該追隨者日誌會花費很長時間。
論文在5.3節的最後提到了這種優化,可是沒有提供任何的實現細節。爲了實現它,咱們在AE應答中擴展了新的字段:
type AppendEntriesReply struct {
Term int
Success bool
// Faster conflict resolution optimization (described near the end of section
// 5.3 in the paper.)
ConflictIndex int
ConflictTerm int
}
複製代碼
你能夠在本部分的代碼中看到其它改動。有兩個地方作了改動:
AppendEntries
是AE請求處理方法,當追隨者拒絕AE請求時,會填入ConflictIndex
和ConflictTerm
。leaderSendAEs
方法在收到AE應答時進行更新,並經過ConflictIndex
和ConflictTerm
更有效地回溯nextIndex
。Raft論文中寫:
在實踐中,咱們懷疑這種優化是否必要,由於失敗不多發生,並且不大可能有不少不一致的條目。
我徹底贊成。爲了測試這個優化點,我不得不想出一個至關刻意的測試。恕我直言,在現實生活中出現這種狀況的機率很是低,並且一次性節省幾百ms並不能保證代碼複雜度。我在這裏只是將它做爲Raft中特殊狀況下的優化案例之一。就編程而言,這是一個很好的例子,說明在某些特定狀況下,能夠對Raft算法進行輕微修改來調整其行爲邏輯。
Raft的設計意圖是保證在普通狀況下有至關快的處理速度,而且以犧牲特殊狀況下的性能爲代價(實際發生故障的狀況)。我相信這是絕對正確的設計選擇。在上一節中說到的快速發送AE請求優化是頗有必要的,由於這會直接影響公共路徑。
另外一方面,像快速回溯衝突索引這樣的優化,雖然在技術上頗有趣,可是在實踐中並不重要,由於它們只是在集羣生命週期中出現時間<0.01%
的特殊場景中作出了有限的優化。
至此,咱們結束了有關Raft分佈式一致性算法的4篇文章,感謝閱讀!
若是您對於文章內容或代碼有任何問題,能夠給我發送郵件或者在Github發佈issue。
若是您對工業級、有實戰經驗的Raft實現感興趣,我向您推薦:
它們都實現了Raft論文中的全部特性,包括: