Raft在etcd中的實現

做者:喵叔
原文:blog.betacat.io/post/raft-i…node

Raft是近年來比較流行的一個一致性算法。它的原理比較容易理解,網上也有不少相關的介紹,所以這裏我就再也不囉嗦原理了,而是打算以raft在etcd中的實現[1]爲例,從工程的角度來說講這個算法的一個具體實現,畢竟瞭解原理只算是「紙上談兵」,離真正能把它應用起來還有很長一段距離。git

若是你還不熟悉raft,這個經典的動畫演示它的論文以及這個lecture可能會對你有幫助。或者你也能夠直接觀看下面的視頻,這是我做的一次技術分享,講的是etcd中raft模塊的源碼解析。說句題外話,不少Conference和Meetup都會把視頻錄像上傳到YouTube上,YouTube簡直就是程序員的衣櫃,每逛一次都有新收穫。www.youtube.com/watch?v=sL0…程序員

Overview

Etcd將raft協議實現爲一個library,而後自己做爲一個應用使用它。固然,多是爲了推廣它所實現的這個library,etcd還額外提供了一個叫raftexample的示例程序,向用戶展現怎樣在它所提供的raft library的基礎上構建出一個分佈式的KV存儲引擎。github

在etcd中,raft做爲底層的共識模塊,運行在一個goroutine裏,經過channel接受上層(etcdserver)傳來的消息,並將處理後的結果經過另外一個channel返回給上層應用,他們的交互過程大概是這樣的:算法

Raft stack

這種全異步的交互方式好處就是它提升了性能,但壞處就是難以調試,代碼看起來會很繞。拿etcd舉例,不少時候你只看到它把一個消息push到一個slice/channel裏面,而後這部分函數調用鏈就結束了,你沒法直觀的追蹤到,究竟是誰最後處理了這個消息。數組

Code Breakdown

咱們來看一下這個raft library裏面都有哪些文件:bash

$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go
複製代碼

下面按功能模塊依次介紹:網絡

raftpb

Raft中的序列化是藉助於Protocol Buffer來實現的,這個文件夾就定義了須要序列化的幾個數據結構,咱們先從EntryMessage開始看起:數據結構

Entry

從總體上來講,一個集羣中的每一個節點都是一個狀態機,而raft管理的就是對這個狀態機進行更改的一些操做,這些操做在代碼中被封裝爲一個個Entryapp

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L203
type Entry struct {
    Term             uint64
    Index            uint64
    Type             EntryType
    Data             []byte
}
複製代碼
  • Term:選舉任期,每次選舉以後遞增1。它的主要做用是標記信息的時效性,比方說當一個節點發出來的消息中攜帶的term是2,而另外一個節點攜帶的term是3,那咱們就認爲第一個節點的信息過期了。
  • Index:當前這個entry在整個raft日誌中的位置索引。有了TermIndex以後,一個log entry就能被惟一標識。
  • Type:當前entry的類型,目前etcd支持兩種類型:EntryNormalEntryConfChange,EntryNormal表明當前Entry是對狀態機的操做,EntryConfChange則表明對當前集羣配置進行更改的操做,好比增長或者減小節點。
  • Data:一個被序列化後的byte數組,表明當前entry真正要執行的操做,比方說若是上面的TypeEntryNormal,那這裏的Data就多是具體要更改的key-value pair,若是TypeEntryConfChange,那Data就是具體的配置更改項ConfChange。raft算法自己並不關心這個數據是什麼,它只是把這段數據當作log同步過程當中的payload來處理,具體對這個數據的解析則有上層應用來完成。

Message

Raft集羣中節點之間的通信都是經過傳遞不一樣的Message來完成的,這個Message結構就是一個很是general的大容器,它涵蓋了各類消息所需的字段。

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raftpb/raft.pb.go#L239
type Message struct {
    Type             MessageType
    To               uint64
    From             uint64
    Term             uint64
    LogTerm          uint64
    Index            uint64
    Entries          []Entry
    Commit           uint64
    Snapshot         Snapshot
    Reject           bool
    RejectHint       uint64
    Context          []byte
}
複製代碼
  • Type:當前傳遞的消息類型,它的取值有不少種,好比用來請求投票的MsgVote、用來處理網絡分區的MsgPreVote[2]、用來發給leader節點,讓它在日誌中增長數據的MsgProp(ose)、用來複制日誌的MsgApp(end)、用來安裝snapshot的MsgSnap。不一樣類型的消息也會用到下面不一樣的字段。
  • To, From分別表明了這個消息的接受者和發送者。
  • Term:這個消息發出時整個集羣所處的任期。
  • LogTerm:消息發出者所保存的日誌中最後一條的任期號,通常MsgVote會用到這個字段。
  • Index:日誌索引號。若是當前消息是MsgVote的話,表明這個candidate最後一條日誌的索引號,它跟上面的LogTerm一塊兒表明這個candidate所擁有的最新日誌信息,這樣別人就能夠比較本身的日誌是否是比candidata的日誌要新,從而決定是否投票。
  • Entries:須要存儲的日誌。
  • Commit:已經提交的日誌的索引值,用來向別人同步日誌的提交信息。
  • Snapshot:通常跟MsgSnap合用,用來放置具體的Snapshot值。
  • Reject,RejectHint:表明對方節點拒絕了當前節點的請求(MsgVote/MsgApp/MsgSnap...)

log_unstable.go

顧名思義,unstable數據結構用於尚未被用戶層持久化的數據,它維護了兩部份內容snapshotentries

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log_unstable.go#L23
type unstable struct {
    // the incoming unstable snapshot, if any.
    snapshot *pb.Snapshot
    // all entries that have not yet been written to storage.
    entries []pb.Entry
    offset  uint64

    logger Logger
}
複製代碼

entries表明的是要進行操做的日誌,但日誌不可能無限增加,在特定的狀況下,某些過時的日誌會被清空。那這就引入一個新問題了,若是此後一個新的follower加入,而leader只有一部分操做日誌,那這個新follower不是無法跟別人同步了嗎?因此這個時候snapshot就登場了 - 我沒法給你以前的日誌,但我給你全部以前日誌應用後的結果,以後的日誌你再以這個snapshot爲基礎進行應用,那咱們的狀態就能夠同步了。所以它們的結構關係能夠用下圖表示[3]

log_unstable

這裏的前半部分是快照數據,然後半部分是日誌條目組成的數組entries,另外unstable.offset成員保存的是entries數組中的第一條數據在raft日誌中的索引,即第i條entries在raft日誌中的索引爲i + unstable.offset

storage.go

這個文件定義了一個Storage接口,由於etcd中的raft實現並不負責數據的持久化,因此它但願上面的應用層能實現這個接口,以便提供給它查詢log的能力。

另外,這個文件也提供了Storage接口的一個內存版本的實現MemoryStorage,這個實現一樣也維護了snapshotentries這兩部分,他們的排列跟unstable中的相似,也是snapshot在前,entries在後。從代碼中看來etcdserverraftexample都是直接用的這個實現來提供log的查詢功能的。

log.go

有了以上的介紹unstable、Storage的準備以後,下面能夠來介紹raftLog的實現,這個結構體承擔了raft日誌相關的操做。

raftLog由如下成員組成:

  • storage Storage:前面提到的存放已經持久化數據的Storage接口。
  • unstable unstable:前面分析過的unstable結構體,用於保存應用層尚未持久化的數據。
  • committed uint64:保存當前提交的日誌數據索引。
  • applied uint64:保存當前傳入狀態機的數據最高索引。

須要說明的是,一條日誌數據,首先須要被提交(committed)成功,而後才能被應用(applied)到狀態機中。所以,如下不等式一直成立:applied <= committed

raftLog結構體中,幾部分數據的排列以下圖所示[3:1]

RaftLog Layout

這個數據排布的狀況,能夠從raftLog的初始化函數中看出來:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/log.go#L45
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
    if storage == nil {
        log.Panic("storage must not be nil")
    }
    log := &raftLog{
        storage: storage,
        logger:  logger,
    }
    firstIndex, err := storage.FirstIndex()
    if err != nil {
        panic(err) // TODO(bdarnell)
    }
    lastIndex, err := storage.LastIndex()
    if err != nil {
        panic(err) // TODO(bdarnell)
    }
    log.unstable.offset = lastIndex + 1
    log.unstable.logger = logger
    // Initialize our committed and applied pointers to the time of the last compaction.
    log.committed = firstIndex - 1
    log.applied = firstIndex - 1

    return log
}
複製代碼

所以,從這裏的代碼能夠看出,raftLog的兩部分,持久化存儲和非持久化存儲,它們之間的分界線就是lastIndex,在此以前都是Storage管理的已經持久化的數據,而在此以後都是unstable管理的尚未持久化的數據。

以上分析中還有一個疑問,爲何並無初始化unstable.snapshot成員,也就是unstable結構體的快照數據?緣由在於,上面這個是初始化函數,也就是節點剛啓動的時候調用來初始化存儲狀態的函數,而unstable.snapshot數據,是在啓動以後同步數據的過程當中,若是須要同步快照數據時纔會去進行賦值修改的數據,所以在這裏並無對它進行操做的地方。

progress.go

Leader經過Progress這個數據結構來追蹤一個follower的狀態,並根據Progress裏的信息來決定每次同步的日誌項。這裏介紹三個比較重要的屬性:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/progress.go#L37
// Progress represents a follower’s progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
    Match, Next uint64

    State ProgressStateType

    ins *inflights
}
複製代碼
  1. 用來保存當前follower節點的日誌狀態的屬性:

    • Match:保存目前爲止,已複製給該follower的日誌的最高索引值。若是leader對該follower上的日誌狀況一無所知的話,這個值被設爲0。
    • Next:保存下一次leader發送append消息給該follower的日誌索引,即下一次複製日誌時,leader會從Next開始發送日誌。

    在正常狀況下,Next = Match + 1,也就是下一個要同步的日誌應當是對方已有日誌的下一條。

  2. State屬性用來保存該節點當前的同步狀態,它會有一下幾種取值[4]

    • ProgressStateProbe

    探測狀態,當follower拒絕了最近的append消息時,那麼就會進入探測狀態,此時leader會試圖繼續往前追溯該follower的日誌從哪裏開始丟失的。在probe狀態時,leader每次最多append一條日誌,若是收到的迴應中帶有RejectHint信息,則回退Next索引,以便下次重試。在初始時,leader會把全部follower的狀態設爲probe,由於它並不知道各個follower的同步狀態,因此須要慢慢試探。

    • ProgressStateReplicate

    當leader確認某個follower的同步狀態後,它就會把這個follower的state切換到這個狀態,而且用pipeline的方式快速複製日誌。leader在發送複製消息以後,就修改該節點的Next索引爲發送消息的最大索引+1。

    • ProgressStateSnapshot

    接收快照狀態。當leader向某個follower發送append消息,試圖讓該follower狀態跟上leader時,發現此時leader上保存的索引數據已經對不上了,好比leader在index爲10以前的數據都已經寫入快照中了,可是該follower須要的是10以前的數據,此時就會切換到該狀態下,發送快照給該follower。當快照數據同步追上以後,並非直接切換到Replicate狀態,而是首先切換到Probe狀態。

  3. ins屬性用來作流量控制,由於若是同步請求很是多,再碰上網絡分區時,leader可能會累積不少待發送消息,一旦網絡恢復,可能會有很是大流量發送給follower,因此這裏要作flow control。它的實現有點相似TCP的滑動窗口,這裏再也不贅述。

綜上,Progress其實也是個狀態機,下面是它的狀態轉移圖:

Progress State Machine

raft.go

前面鋪設了一大堆概念,如今終於輪到實現邏輯了。從名字也能夠看出,raft協議的具體實現就在這個文件裏。這其中,大部分的邏輯是由Step函數驅動的。

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/raft.go#L752
func (r *raft) Step(m pb.Message) error {
    //...
    switch m.Type {
        case pb.MsgHup:
        //...
        case pb.MsgVote, pb.MsgPreVote:
        //...
        default:
            r.step(r, m)
    }
}
複製代碼

Step的主要做用是處理不一樣的[消息]({{< relref "#message" >}}),因此之後當咱們想知道raft對某種消息的處理邏輯時,到這裏找就對了。在函數的最後,有個default語句,即全部上面不能處理的消息都落入這裏,由一個小寫的step函數處理,這個設計的緣由是什麼呢?

實際上是由於這裏的raft也被實現爲一個狀態機,它的step屬性是一個函數指針,根據當前節點的不一樣角色,指向不一樣的消息處理函數:stepLeader/stepFollower/stepCandidate。與它相似的還有一個tick函數指針,根據角色的不一樣,也會在tickHeartbeattickElection之間來回切換,分別用來觸發定時心跳和選舉檢測。這裏的函數指針感受像實現了OOP裏的多態。

Raft State Machine

node.go

node的主要做用是應用層(etcdserver)和共識模塊(raft)的銜接。將應用層的消息傳遞給底層共識模塊,並將底層共識模塊共識後的結果反饋給應用層。因此它的初始化函數建立了不少用來通訊的channel,而後就在另外一個goroutine裏面開始了事件循環,不停的在各類channel中倒騰數據(貌似這種由for-select-channel組成的事件循環在Go裏面很受歡迎)。

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L286
for {
  select {
    case m := <-propc:
        r.Step(m)
    case m := <-n.recvc:
        r.Step(m)
    case cc := <-n.confc:
        // Add/remove/update node according to cc.Type
    case <-n.tickc:
        r.tick()
    case readyc <- rd:
        // Cleaning after result is consumed by application
    case <-advancec:
        // Stablize logs
    case c := <-n.status:
        // Update status
    case <-n.stop:
        close(n.done)
        return
    }
}
複製代碼

propcrecvc中拿到的是從上層應用傳進來的消息,這個消息會被交給raft層的Step函數處理,具體處理邏輯我上面有過介紹。

下面來解釋下readyc的做用。在etcd的這個實現中,node並不負責數據的持久化、網絡消息的通訊、以及將已經提交的log應用到狀態機中,因此node使用readyc這個channel對外通知有數據要處理了,並將這些須要外部處理的數據打包到一個Ready結構體中:

// https://github.com/etcd-io/etcd/blob/v3.3.10/raft/node.go#L52
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
    // The current volatile state of a Node.
    // SoftState will be nil if there is no update.
    // It is not required to consume or store SoftState.
    *SoftState

    // The current state of a Node to be saved to stable storage BEFORE
    // Messages are sent.
    // HardState will be equal to empty state if there is no update.
    pb.HardState

    // ReadStates can be used for node to serve linearizable read requests locally
    // when its applied index is greater than the index in ReadState.
    // Note that the readState will be returned when raft receives msgReadIndex.
    // The returned is only valid for the request that requested to read.
    ReadStates []ReadState

    // Entries specifies entries to be saved to stable storage BEFORE
    // Messages are sent.
    Entries []pb.Entry

    // Snapshot specifies the snapshot to be saved to stable storage.
    Snapshot pb.Snapshot

    // CommittedEntries specifies entries to be committed to a
    // store/state-machine. These have previously been committed to stable
    // store.
    CommittedEntries []pb.Entry

    // Messages specifies outbound messages to be sent AFTER Entries are
    // committed to stable storage.
    // If it contains a MsgSnap message, the application MUST report back to raft
    // when the snapshot has been received or has failed by calling ReportSnapshot.
    Messages []pb.Message

    // MustSync indicates whether the HardState and Entries must be synchronously
    // written to disk or if an asynchronous write is permissible.
    MustSync bool
}
複製代碼

應用程序獲得這個Ready以後,須要:

  1. 將HardState, Entries, Snapshot持久化到storage。
  2. 將Messages廣播給其餘節點。
  3. 將CommittedEntries(已經commit尚未apply)應用到狀態機。
  4. 若是發現CommittedEntries中有成員變動類型的entry,調用node.ApplyConfChange()方法讓node知道。
  5. 最後再調用node.Advance()告訴raft,這批狀態更新處理完了,狀態已經演進了,能夠給我下一批Ready讓我處理。

Life of a Request

前面咱們把整個包的結構過了一遍,下面來結合具體的代碼看看raft對一個請求的處理過程是怎樣的。我一直以爲,若是能從代碼的層面追蹤到一個請求的處理過程,那不管是從宏觀仍是微觀的角度,對理解整個系統都是很是有幫助的。

Life of a Vote Request

  1. 首先,在node的大循環裏,有一個會定時輸出的tick channel,它來觸發raft.tick()函數,根據上面的介紹可知,若是當前節點是follower,那它的tick函數會指向tickElectiontickElection的處理邏輯是給本身發送一個MsgHup的內部消息,Step函數看到這個消息後會調用campaign函數,進入競選狀態。

    // tickElection is run by followers and candidates after r.electionTimeout.
    func (r *raft) tickElection() {
        r.electionElapsed++
    
        if r.promotable() && r.pastElectionTimeout() {
            r.electionElapsed = 0
            r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
        }
    }
    
    func (r *raft) Step(m pb.Message) error {
        //...
        switch m.Type {
        case pb.MsgHup:
            r.campaign(campaignElection)
        }
    }
    複製代碼
  2. campaign則會調用becomeCandidate把本身切換到candidate模式,並遞增Term值。而後再將本身的Term及日誌信息發送給其餘的節點,請求投票。

    func (r *raft) campaign(t CampaignType) {
        //...
        r.becomeCandidate()
        // Get peer id from progress
        for id := range r.prs {
            //...
            r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
        }
    }
    複製代碼
  3. 另外一方面,其餘節點在接受到這個請求後,會首先比較接收到的Term是否是比本身的大,以及接受到的日誌信息是否是比本身的要新,從而決定是否投票。這個邏輯咱們仍是能夠從Step函數中找到:

    func (r *raft) Step(m pb.Message) error {
        //...
        switch m.Type {
        case pb.MsgVote, pb.MsgPreVote:
            // We can vote if this is a repeat of a vote we've already cast...
            canVote := r.Vote == m.From ||
                // ...we haven't voted and we don't think there's a leader yet in this term...
                (r.Vote == None && r.lead == None) ||
                // ...or this is a PreVote for a future term...
                (m.Type == pb.MsgPreVote && m.Term > r.Term)
            // ...and we believe the candidate is up to date.
            if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            } else {
                r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
            }
        }
    }
    複製代碼
  4. 最後當candidate節點收到投票回覆後,就會計算收到的選票數目是否大於全部節點數的一半,若是大於則本身成爲leader,並昭告天下,不然將本身置爲follower:

    func (r *raft) Step(m pb.Message) error {
        //...
        switch m.Type {
        case myVoteRespType:
            gr := r.poll(m.From, m.Type, !m.Reject)
            switch r.quorum() {
            case gr:
                if r.state == StatePreCandidate {
                    r.campaign(campaignElection)
                } else {
                    r.becomeLeader()
                    r.bcastAppend()
                }
            case len(r.votes) - gr:
                r.becomeFollower(r.Term, None)
        }
    }
    複製代碼

Life of a Write Request

  1. 一個寫請求通常會經過調用node.Propose開始,Propose方法將這個寫請求封裝到一個MsgProp消息裏面,發送給本身處理。

  2. 消息處理函數Step沒法直接處理這個消息,它會調用那個小寫的step函數,來根據當前的狀態進行處理。

    • 若是當前是follower,那它會把這個消息轉發給leader。
    func stepFollower(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgProp:
            //...
            m.To = r.lead
            r.send(m)
        }
    }
    複製代碼
  3. Leader收到這個消息後(無論是follower轉發過來的仍是本身內部產生的)會有兩步操做:

    1. 將這個消息添加到本身的log裏
    2. 向其餘follower廣播這個消息
    func stepLeader(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgProp:
            //...
            if !r.appendEntry(m.Entries...) {
                return ErrProposalDropped
            }
            r.bcastAppend()
            return nil
        }
    }
    複製代碼
  4. 在follower接受完這個log後,會返回一個MsgAppResp消息。

  5. 當leader確認已經有足夠多的follower接受了這個log後,它首先會commit這個log,而後再廣播一次,告訴別人它的commit狀態。這裏的實現就有點像兩階段提交了。

    func stepLeader(r *raft, m pb.Message) error {
        switch m.Type {
        case pb.MsgAppResp:
            //...
            if r.maybeCommit() {
                r.bcastAppend()
            }
        }
    }
    
    // maybeCommit attempts to advance the commit index. Returns true if
    // the commit index changed (in which case the caller should call
    // r.bcastAppend).
    func (r *raft) maybeCommit() bool {
        //...
        mis := r.matchBuf[:len(r.prs)]
        idx := 0
        for _, p := range r.prs {
            mis[idx] = p.Match
            idx++
        }
        sort.Sort(mis)
        mci := mis[len(mis)-r.quorum()]
        return r.raftLog.maybeCommit(mci, r.Term)
    }
    複製代碼

Conclusion

Etcd裏的raft模塊只實現了raft共識算法,而像消息的網絡傳輸,數據存儲都由上層應用來完成。這篇文章先介紹了基本的數據結構,而後在這些數據結構的基礎上引入了raft算法。同時,這裏還以一個投票請求和寫請求爲例,介紹了一個請求從接受到應答的完整處理過程。

但到目前爲止,咱們還有不少細節沒有涉及,好比說Linearizable Read,snapshot機制,WAL的存儲與回放,因此但願你能以這篇文章爲基礎,順藤摸瓜,繼續深刻研究下去。


  1. 到寫這篇文章爲止,etcd的最新版本爲v3.3.10,因此這裏的分析都是以v3.3.10爲基礎。 ↩︎

  2. Raft的PreVote實現機制 ↩︎

  3. etcd Raft庫解析 ↩︎ ↩︎

  4. Design spec for Raft Progress ↩︎

相關文章
相關標籤/搜索