做者:喵叔
原文:blog.betacat.io/post/raft-i…node
Raft是近年來比較流行的一個一致性算法。它的原理比較容易理解,網上也有不少相關的介紹,所以這裏我就再也不囉嗦原理了,而是打算以raft在etcd中的實現[1]爲例,從工程的角度來說講這個算法的一個具體實現,畢竟瞭解原理只算是「紙上談兵」,離真正能把它應用起來還有很長一段距離。git
若是你還不熟悉raft,這個經典的動畫演示、它的論文以及這個lecture可能會對你有幫助。或者你也能夠直接觀看下面的視頻,這是我做的一次技術分享,講的是etcd中raft模塊的源碼解析。說句題外話,不少Conference和Meetup都會把視頻錄像上傳到YouTube上,YouTube簡直就是程序員的衣櫃,每逛一次都有新收穫。www.youtube.com/watch?v=sL0…程序員
Etcd將raft協議實現爲一個library,而後自己做爲一個應用使用它。固然,多是爲了推廣它所實現的這個library,etcd還額外提供了一個叫raftexample的示例程序,向用戶展現怎樣在它所提供的raft library的基礎上構建出一個分佈式的KV存儲引擎。github
在etcd中,raft做爲底層的共識模塊,運行在一個goroutine
裏,經過channel
接受上層(etcdserver)傳來的消息,並將處理後的結果經過另外一個channel
返回給上層應用,他們的交互過程大概是這樣的:算法
這種全異步的交互方式好處就是它提升了性能,但壞處就是難以調試,代碼看起來會很繞。拿etcd舉例,不少時候你只看到它把一個消息push到一個slice/channel裏面,而後這部分函數調用鏈就結束了,你沒法直觀的追蹤到,究竟是誰最後處理了這個消息。數組
咱們來看一下這個raft library裏面都有哪些文件:bash
$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go
複製代碼
下面按功能模塊依次介紹:網絡
Raft中的序列化是藉助於Protocol Buffer來實現的,這個文件夾就定義了須要序列化的幾個數據結構,咱們先從Entry
和Message
開始看起:數據結構
從總體上來講,一個集羣中的每一個節點都是一個狀態機,而raft管理的就是對這個狀態機進行更改的一些操做,這些操做在代碼中被封裝爲一個個Entry
。app
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L203
type Entry struct {
Term uint64
Index uint64
Type EntryType
Data []byte
}
複製代碼
Term
和Index
以後,一個log entry就能被惟一標識。Type
是EntryNormal
,那這裏的Data就多是具體要更改的key-value pair,若是Type
是EntryConfChange
,那Data就是具體的配置更改項ConfChange。raft算法自己並不關心這個數據是什麼,它只是把這段數據當作log同步過程當中的payload來處理,具體對這個數據的解析則有上層應用來完成。Raft集羣中節點之間的通信都是經過傳遞不一樣的Message
來完成的,這個Message
結構就是一個很是general的大容器,它涵蓋了各類消息所需的字段。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L239
type Message struct {
Type MessageType
To uint64
From uint64
Term uint64
LogTerm uint64
Index uint64
Entries []Entry
Commit uint64
Snapshot Snapshot
Reject bool
RejectHint uint64
Context []byte
}
複製代碼
MsgVote
會用到這個字段。MsgVote
的話,表明這個candidate最後一條日誌的索引號,它跟上面的LogTerm
一塊兒表明這個candidate所擁有的最新日誌信息,這樣別人就能夠比較本身的日誌是否是比candidata的日誌要新,從而決定是否投票。MsgSnap
合用,用來放置具體的Snapshot值。顧名思義,unstable數據結構用於尚未被用戶層持久化的數據,它維護了兩部份內容snapshot
和entries
:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log_unstable.go#L23
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
logger Logger
}
複製代碼
entries
表明的是要進行操做的日誌,但日誌不可能無限增加,在特定的狀況下,某些過時的日誌會被清空。那這就引入一個新問題了,若是此後一個新的follower
加入,而leader
只有一部分操做日誌,那這個新follower
不是無法跟別人同步了嗎?因此這個時候snapshot
就登場了 - 我沒法給你以前的日誌,但我給你全部以前日誌應用後的結果,以後的日誌你再以這個snapshot
爲基礎進行應用,那咱們的狀態就能夠同步了。所以它們的結構關係能夠用下圖表示[3]:
這裏的前半部分是快照數據,然後半部分是日誌條目組成的數組entries,另外unstable.offset成員保存的是entries數組中的第一條數據在raft日誌中的索引,即第i條entries在raft日誌中的索引爲i + unstable.offset
。
這個文件定義了一個Storage接口,由於etcd中的raft實現並不負責數據的持久化,因此它但願上面的應用層能實現這個接口,以便提供給它查詢log的能力。
另外,這個文件也提供了Storage
接口的一個內存版本的實現MemoryStorage,這個實現一樣也維護了snapshot
和entries
這兩部分,他們的排列跟unstable
中的相似,也是snapshot
在前,entries
在後。從代碼中看來etcdserver
和raftexample
都是直接用的這個實現來提供log的查詢功能的。
有了以上的介紹unstable、Storage的準備以後,下面能夠來介紹raftLog的實現,這個結構體承擔了raft日誌相關的操做。
raftLog由如下成員組成:
須要說明的是,一條日誌數據,首先須要被提交(committed)成功,而後才能被應用(applied)到狀態機中。所以,如下不等式一直成立:applied <= committed
。
raftLog結構體中,幾部分數據的排列以下圖所示[3:1]:
這個數據排布的狀況,能夠從raftLog的初始化函數中看出來:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log.go#L45
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
return log
}
複製代碼
所以,從這裏的代碼能夠看出,raftLog的兩部分,持久化存儲和非持久化存儲,它們之間的分界線就是lastIndex,在此以前都是Storage
管理的已經持久化的數據,而在此以後都是unstable
管理的尚未持久化的數據。
以上分析中還有一個疑問,爲何並無初始化unstable.snapshot成員,也就是unstable結構體的快照數據?緣由在於,上面這個是初始化函數,也就是節點剛啓動的時候調用來初始化存儲狀態的函數,而unstable.snapshot數據,是在啓動以後同步數據的過程當中,若是須要同步快照數據時纔會去進行賦值修改的數據,所以在這裏並無對它進行操做的地方。
Leader經過Progress
這個數據結構來追蹤一個follower的狀態,並根據Progress
裏的信息來決定每次同步的日誌項。這裏介紹三個比較重要的屬性:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/progress.go#L37
// Progress represents a follower’s progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
Match, Next uint64
State ProgressStateType
ins *inflights
}
複製代碼
用來保存當前follower節點的日誌狀態的屬性:
Next
開始發送日誌。在正常狀況下,Next = Match + 1
,也就是下一個要同步的日誌應當是對方已有日誌的下一條。
State
屬性用來保存該節點當前的同步狀態,它會有一下幾種取值[4]:
探測狀態,當follower拒絕了最近的append消息時,那麼就會進入探測狀態,此時leader會試圖繼續往前追溯該follower的日誌從哪裏開始丟失的。在probe狀態時,leader每次最多append一條日誌,若是收到的迴應中帶有RejectHint
信息,則回退Next
索引,以便下次重試。在初始時,leader會把全部follower的狀態設爲probe,由於它並不知道各個follower的同步狀態,因此須要慢慢試探。
當leader確認某個follower的同步狀態後,它就會把這個follower的state切換到這個狀態,而且用pipeline
的方式快速複製日誌。leader在發送複製消息以後,就修改該節點的Next
索引爲發送消息的最大索引+1。
接收快照狀態。當leader向某個follower發送append消息,試圖讓該follower狀態跟上leader時,發現此時leader上保存的索引數據已經對不上了,好比leader在index爲10以前的數據都已經寫入快照中了,可是該follower須要的是10以前的數據,此時就會切換到該狀態下,發送快照給該follower。當快照數據同步追上以後,並非直接切換到Replicate狀態,而是首先切換到Probe狀態。
ins
屬性用來作流量控制,由於若是同步請求很是多,再碰上網絡分區時,leader可能會累積不少待發送消息,一旦網絡恢復,可能會有很是大流量發送給follower,因此這裏要作flow control。它的實現有點相似TCP的滑動窗口,這裏再也不贅述。
綜上,Progress
其實也是個狀態機,下面是它的狀態轉移圖:
前面鋪設了一大堆概念,如今終於輪到實現邏輯了。從名字也能夠看出,raft協議的具體實現就在這個文件裏。這其中,大部分的邏輯是由Step
函數驅動的。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raft.go#L752
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgHup:
//...
case pb.MsgVote, pb.MsgPreVote:
//...
default:
r.step(r, m)
}
}
複製代碼
Step
的主要做用是處理不一樣的[消息]({{< relref "#message" >}}),因此之後當咱們想知道raft對某種消息的處理邏輯時,到這裏找就對了。在函數的最後,有個default
語句,即全部上面不能處理的消息都落入這裏,由一個小寫的step
函數處理,這個設計的緣由是什麼呢?
實際上是由於這裏的raft也被實現爲一個狀態機,它的step
屬性是一個函數指針,根據當前節點的不一樣角色,指向不一樣的消息處理函數:stepLeader/stepFollower/stepCandidate。與它相似的還有一個tick
函數指針,根據角色的不一樣,也會在tickHeartbeat和tickElection之間來回切換,分別用來觸發定時心跳和選舉檢測。這裏的函數指針感受像實現了OOP
裏的多態。
node
的主要做用是應用層(etcdserver)和共識模塊(raft)的銜接。將應用層的消息傳遞給底層共識模塊,並將底層共識模塊共識後的結果反饋給應用層。因此它的初始化函數建立了不少用來通訊的channel
,而後就在另外一個goroutine
裏面開始了事件循環,不停的在各類channel
中倒騰數據(貌似這種由for-select-channel
組成的事件循環在Go裏面很受歡迎)。
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L286
for {
select {
case m := <-propc:
r.Step(m)
case m := <-n.recvc:
r.Step(m)
case cc := <-n.confc:
// Add/remove/update node according to cc.Type
case <-n.tickc:
r.tick()
case readyc <- rd:
// Cleaning after result is consumed by application
case <-advancec:
// Stablize logs
case c := <-n.status:
// Update status
case <-n.stop:
close(n.done)
return
}
}
複製代碼
propc
和recvc
中拿到的是從上層應用傳進來的消息,這個消息會被交給raft層的Step
函數處理,具體處理邏輯我上面有過介紹。
下面來解釋下readyc
的做用。在etcd的這個實現中,node
並不負責數據的持久化、網絡消息的通訊、以及將已經提交的log應用到狀態機中,因此node
使用readyc
這個channel
對外通知有數據要處理了,並將這些須要外部處理的數據打包到一個Ready
結構體中:
// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L52
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
複製代碼
應用程序獲得這個Ready
以後,須要:
node.ApplyConfChange()
方法讓node
知道。node.Advance()
告訴raft,這批狀態更新處理完了,狀態已經演進了,能夠給我下一批Ready讓我處理。前面咱們把整個包的結構過了一遍,下面來結合具體的代碼看看raft對一個請求的處理過程是怎樣的。我一直以爲,若是能從代碼的層面追蹤到一個請求的處理過程,那不管是從宏觀仍是微觀的角度,對理解整個系統都是很是有幫助的。
首先,在node
的大循環裏,有一個會定時輸出的tick channel
,它來觸發raft.tick()
函數,根據上面的介紹可知,若是當前節點是follower,那它的tick
函數會指向tickElection
。tickElection
的處理邏輯是給本身發送一個MsgHup
的內部消息,Step
函數看到這個消息後會調用campaign
函數,進入競選狀態。
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgHup:
r.campaign(campaignElection)
}
}
複製代碼
campaign
則會調用becomeCandidate
把本身切換到candidate模式,並遞增Term
值。而後再將本身的Term
及日誌信息發送給其餘的節點,請求投票。
func (r *raft) campaign(t CampaignType) {
//...
r.becomeCandidate()
// Get peer id from progress
for id := range r.prs {
//...
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
複製代碼
另外一方面,其餘節點在接受到這個請求後,會首先比較接收到的Term
是否是比本身的大,以及接受到的日誌信息是否是比本身的要新,從而決定是否投票。這個邏輯咱們仍是能夠從Step
函數中找到:
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
}
}
複製代碼
最後當candidate節點收到投票回覆後,就會計算收到的選票數目是否大於全部節點數的一半,若是大於則本身成爲leader,並昭告天下,不然將本身置爲follower:
func (r *raft) Step(m pb.Message) error {
//...
switch m.Type {
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
}
複製代碼
一個寫請求通常會經過調用node.Propose
開始,Propose
方法將這個寫請求封裝到一個MsgProp
消息裏面,發送給本身處理。
消息處理函數Step
沒法直接處理這個消息,它會調用那個小寫的step
函數,來根據當前的狀態進行處理。
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
//...
m.To = r.lead
r.send(m)
}
}
複製代碼
Leader收到這個消息後(無論是follower轉發過來的仍是本身內部產生的)會有兩步操做:
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
//...
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
}
}
複製代碼
在follower接受完這個log後,會返回一個MsgAppResp
消息。
當leader確認已經有足夠多的follower接受了這個log後,它首先會commit這個log,而後再廣播一次,告訴別人它的commit狀態。這裏的實現就有點像兩階段提交了。
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgAppResp:
//...
if r.maybeCommit() {
r.bcastAppend()
}
}
}
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
//...
mis := r.matchBuf[:len(r.prs)]
idx := 0
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
sort.Sort(mis)
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term)
}
複製代碼
Etcd裏的raft模塊只實現了raft共識算法,而像消息的網絡傳輸,數據存儲都由上層應用來完成。這篇文章先介紹了基本的數據結構,而後在這些數據結構的基礎上引入了raft算法。同時,這裏還以一個投票請求和寫請求爲例,介紹了一個請求從接受到應答的完整處理過程。
但到目前爲止,咱們還有不少細節沒有涉及,好比說Linearizable Read,snapshot機制,WAL的存儲與回放,因此但願你能以這篇文章爲基礎,順藤摸瓜,繼續深刻研究下去。