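Back in November 2013, when the raft paper could only be downloaded online as a draft, I wrote a blog post giving it a brief analysis. Four years later, explanations of the raft protocol are everywhere, and raft has indeed become widely used. Its best-known user is probably etcd. etcd implements the raft protocol as a standalone library, located at https://github.com/coreos/etcd/tree/master/raft, and then consumes that library itself as an application.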
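This post does not explain the core of the raft protocol. Instead, it takes the point of view of a user of the etcd raft library and covers what you need to know to actually put the library to use.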
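The library is still somewhat cumbersome to use. There is an official usage example at https://github.com/coreos/etcd/tree/master/contrib/raftexample. Overall, the library implements the core of the raft protocol: log append, leader election, snapshots, membership changes and so on. One thing must be clear: the library does not implement sending or receiving messages over the network. It only keeps outgoing messages in memory; a user-defined transport layer takes them out and sends them. On the receiving side, the application must call a library function to feed each received message into the library (details below). In addition, the library defines a Storage interface that the library's user must provide.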
The Storage interface is as follows:
```go
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}
```
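The library calls these methods, and anyone familiar with the raft protocol will find them straightforward. The official example mentioned above, https://github.com/coreos/etcd/tree/master/contrib/raftexample, uses the library's built-in MemoryStorage together with etcd's wal and snap packages for persistence; on restart it rebuilds the MemoryStorage from the WAL and the snapshot.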
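To make the dummy-entry rules in the Storage comments concrete, here is a toy in-memory log in the spirit of MemoryStorage. It is a sketch under simplifying assumptions, not the library's MemoryStorage: Entry is a hypothetical two-field type, FirstIndex/LastIndex return no error, Entries has no maxSize, Append assumes the new entries directly follow the last index, and ErrCompacted/ErrUnavailable are local errors that merely echo the library's names.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a hypothetical, stripped-down stand-in for raftpb.Entry.
type Entry struct {
	Index, Term uint64
}

// Local errors; the names only echo the library's.
var (
	ErrCompacted   = errors.New("requested index is unavailable due to compaction")
	ErrUnavailable = errors.New("requested entry at index is unavailable")
)

// memStorage keeps ents[0] as a dummy entry that only records the index and
// term of the last compacted entry, so Term(FirstIndex()-1) still works.
type memStorage struct {
	ents []Entry
}

func newMemStorage() *memStorage {
	return &memStorage{ents: []Entry{{Index: 0, Term: 0}}} // dummy entry
}

func (s *memStorage) FirstIndex() uint64 { return s.ents[0].Index + 1 }
func (s *memStorage) LastIndex() uint64  { return s.ents[0].Index + uint64(len(s.ents)) - 1 }

// Term is valid for i in [FirstIndex()-1, LastIndex()].
func (s *memStorage) Term(i uint64) (uint64, error) {
	off := s.ents[0].Index
	if i < off {
		return 0, ErrCompacted
	}
	if i > s.LastIndex() {
		return 0, ErrUnavailable
	}
	return s.ents[i-off].Term, nil
}

// Entries returns the entries in [lo, hi).
func (s *memStorage) Entries(lo, hi uint64) ([]Entry, error) {
	off := s.ents[0].Index
	if lo <= off {
		return nil, ErrCompacted
	}
	if hi > s.LastIndex()+1 {
		return nil, ErrUnavailable
	}
	return s.ents[lo-off : hi-off], nil
}

// Append assumes ents start at LastIndex()+1 (no conflict handling).
func (s *memStorage) Append(ents ...Entry) { s.ents = append(s.ents, ents...) }

// Compact drops entries up to compactIndex; that entry becomes the new dummy.
func (s *memStorage) Compact(compactIndex uint64) error {
	off := s.ents[0].Index
	if compactIndex <= off {
		return ErrCompacted
	}
	if compactIndex > s.LastIndex() {
		return ErrUnavailable
	}
	s.ents = append([]Entry{}, s.ents[compactIndex-off:]...)
	return nil
}

func main() {
	s := newMemStorage()
	s.Append(Entry{1, 1}, Entry{2, 1}, Entry{3, 2})
	s.Compact(2)
	t, _ := s.Term(2)
	fmt.Println(s.FirstIndex(), s.LastIndex(), t) // 3 3 1
	_, err := s.Term(1)
	fmt.Println(err == ErrCompacted) // true
}
```

After compacting through index 2, entry 2 survives only as the dummy, which is why its term is still answerable while its contents are not.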
For something this I/O- and network-intensive, the best way to raise throughput is batching, and that is exactly what the etcd raft library does.
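The payoff of batching is easiest to see by counting fsyncs. The sketch below is a generic illustration, not etcd code: batchWriter and drain are hypothetical names, and the "fsync" is only a counter.

```go
package main

import "fmt"

// batchWriter is a hypothetical log writer that counts simulated fsyncs.
type batchWriter struct {
	entries []string
	syncs   int
}

// writeBatch appends a whole batch and pays for a single simulated fsync.
func (w *batchWriter) writeBatch(batch []string) {
	w.entries = append(w.entries, batch...)
	w.syncs++
}

// drain takes whatever is currently queued in ch (at most max) as one batch,
// without blocking when the channel is empty.
func drain(ch <-chan string, max int) []string {
	var batch []string
	for len(batch) < max {
		select {
		case p := <-ch:
			batch = append(batch, p)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	ch := make(chan string, 100)
	for i := 0; i < 10; i++ {
		ch <- fmt.Sprintf("put k%d", i)
	}
	w := &batchWriter{}
	for len(ch) > 0 {
		w.writeBatch(drain(ch, 4))
	}
	fmt.Println(len(w.entries), w.syncs) // 10 3
}
```

Ten proposals cost three simulated syncs instead of ten. Ready plays a similar role in the library: it hands the application everything that accumulated since the previous round in one go.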
To make this work, etcd provides a core abstraction, the Ready struct:
```go
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
```
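In other words, the Ready struct packages up a batch of updates, which include:

- SoftState: the node's volatile state (nil if unchanged; it need not be stored).
- HardState: state that must be saved to stable storage before Messages are sent.
- Entries: log entries that must be saved to stable storage before Messages are sent.
- Snapshot: a snapshot to be saved to stable storage.
- CommittedEntries: committed entries to be applied to the state machine.
- Messages: outbound messages to send to peers once Entries have been persisted.
- ReadStates: used to serve linearizable reads locally.

MustSync additionally says whether HardState and Entries must be written to disk synchronously.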
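The MustSync flag deserves a note. Raft's persistent state is the current term, the vote and the log, so, as far as I understand the library, a synchronous write is needed only when Entries is non-empty or Term or Vote has changed; a change to Commit alone may be written asynchronously. The sketch below encodes that rule with a hypothetical three-field HardState and a made-up mustSync helper; treat it as an illustration, not the library's source.

```go
package main

import "fmt"

// HardState is a hypothetical copy of the three fields in raftpb.HardState.
type HardState struct {
	Term, Vote, Commit uint64
}

// mustSync reports whether HardState/Entries must be fsynced before the
// Ready's Messages go out: new entries, a new term or a new vote require it;
// a Commit-only change does not.
func mustSync(st, prev HardState, numEntries int) bool {
	return numEntries != 0 || st.Vote != prev.Vote || st.Term != prev.Term
}

func main() {
	prev := HardState{Term: 2, Vote: 1, Commit: 5}
	fmt.Println(mustSync(HardState{Term: 2, Vote: 1, Commit: 7}, prev, 0)) // false
	fmt.Println(mustSync(HardState{Term: 3, Vote: 0, Commit: 5}, prev, 0)) // true
	fmt.Println(mustSync(prev, prev, 2))                                    // true
}
```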
Users of the library keep popping Ready values off a ready channel provided by the node struct and process them one at a time. The channel is obtained with:
```go
func (n *node) Ready() <-chan Ready {
	return n.readyc
}
```
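For each Ready, the application must:

1. Save HardState, Entries and Snapshot (if present) to stable storage.
2. Send Messages to the corresponding peers, only after step 1.
3. Apply CommittedEntries to the state machine; for membership-change entries, also call ApplyConfChange (described below).
4. Call Advance() to tell raft that this batch has been handled and the next Ready may be delivered.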
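The steps above can be sketched as a single handler. Everything here is hypothetical and simplified: Ready carries only a few fields, Snapshot handling is omitted, "persisting" and "sending" only append to a trace, and the advance callback stands in for Node.Advance(). The point is the order of operations.

```go
package main

import "fmt"

// Hypothetical stand-ins; raftpb.Entry and raft.Ready have many more fields.
type Entry struct {
	Index uint64
	Data  string
	Conf  bool // true for a membership-change entry
}

type Ready struct {
	HardStateTerm    uint64 // 0 means "no HardState update" in this toy
	Entries          []Entry
	Messages         []string
	CommittedEntries []Entry
}

// app records every side effect in trace so the ordering is visible.
type app struct {
	trace []string
	kv    []string // toy state machine: applied commands in order
}

func (a *app) handle(rd Ready, advance func()) {
	// 1. Persist HardState and Entries first (a real app writes its WAL here).
	if rd.HardStateTerm != 0 {
		a.trace = append(a.trace, "save hardstate")
	}
	for _, e := range rd.Entries {
		a.trace = append(a.trace, fmt.Sprintf("save %d", e.Index))
	}
	// 2. Only then hand Messages to the transport.
	for _, m := range rd.Messages {
		a.trace = append(a.trace, "send "+m)
	}
	// 3. Apply committed entries; a conf-change entry would be passed to
	//    ApplyConfChange in the real library instead.
	for _, e := range rd.CommittedEntries {
		if e.Conf {
			a.trace = append(a.trace, fmt.Sprintf("confchange %d", e.Index))
			continue
		}
		a.kv = append(a.kv, e.Data)
		a.trace = append(a.trace, fmt.Sprintf("apply %d", e.Index))
	}
	// 4. Signal that this Ready has been fully handled.
	advance()
	a.trace = append(a.trace, "advance")
}

func main() {
	a := &app{}
	advanced := 0
	a.handle(Ready{
		HardStateTerm:    2,
		Entries:          []Entry{{Index: 3, Data: "x=1"}},
		Messages:         []string{"MsgApp to 2"},
		CommittedEntries: []Entry{{Index: 1, Data: "y=2"}, {Index: 2, Conf: true}},
	}, func() { advanced++ })
	fmt.Println(a.trace, advanced)
}
```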
The application starts a raft replica with raft.StartNode(). Internally, that function launches a goroutine running

```go
func (n *node) run(r *raft)
```

which drives the node from then on.
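The StartNode/run split follows a common Go pattern: the constructor returns immediately, and a single background goroutine owns all mutable state and serializes access through channels. A toy version with hypothetical names (toyNode, startToyNode) and only tick and stop channels looks like this; it uses an unbuffered tick channel purely for simplicity and is not the library's code.

```go
package main

import "fmt"

// toyNode is a hypothetical miniature of the "start a goroutine, talk to it
// over channels" structure.
type toyNode struct {
	tickc chan struct{}
	stop  chan struct{}
	done  chan struct{}
	ticks int // written only by run(); read only after done is closed
}

func startToyNode() *toyNode {
	n := &toyNode{
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	go n.run() // returns immediately; run owns the state from now on
	return n
}

// run is the single goroutine that performs every state change.
func (n *toyNode) run() {
	defer close(n.done)
	for {
		select {
		case <-n.tickc:
			n.ticks++
		case <-n.stop:
			return
		}
	}
}

func (n *toyNode) Tick() { n.tickc <- struct{}{} }

// Stop asks run() to exit and waits until it has.
func (n *toyNode) Stop() {
	close(n.stop)
	<-n.done
}

func main() {
	n := startToyNode()
	for i := 0; i < 3; i++ {
		n.Tick()
	}
	n.Stop()
	fmt.Println(n.ticks) // 3
}
```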
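The application proposes a request to raft by calling

```go
func (n *node) Propose(ctx context.Context, data []byte) error
```

The call returns once raft has picked the proposal up for processing; it does not wait for the entry to be committed.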
Nodes are added or removed by proposing a membership change with

```go
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
```
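As I understand the library, ProposeConfChange does not take a separate path: it wraps the ConfChange into a log entry whose type is EntryConfChange and proposes it like ordinary data, which is why the application later finds it among CommittedEntries. A toy model of that round trip follows; the types and helpers are hypothetical, and JSON stands in for the protobuf encoding the library actually uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical toy types: a membership change travels through the log as an
// ordinary entry whose Type marks it as a conf change.
type EntryType int

const (
	EntryNormal EntryType = iota
	EntryConfChange
)

type ConfChange struct {
	Add    bool   `json:"add"`
	NodeID uint64 `json:"node_id"`
}

type Entry struct {
	Type EntryType
	Data []byte
}

// confChangeEntry wraps a ConfChange into a log entry.
func confChangeEntry(cc ConfChange) (Entry, error) {
	data, err := json.Marshal(cc)
	if err != nil {
		return Entry{}, err
	}
	return Entry{Type: EntryConfChange, Data: data}, nil
}

// decodeIfConfChange is the check an application makes on each committed entry.
func decodeIfConfChange(e Entry) (ConfChange, bool) {
	var cc ConfChange
	if e.Type != EntryConfChange || json.Unmarshal(e.Data, &cc) != nil {
		return ConfChange{}, false
	}
	return cc, true
}

func main() {
	e, _ := confChangeEntry(ConfChange{Add: true, NodeID: 4})
	cc, ok := decodeIfConfChange(e)
	fmt.Println(ok, cc.Add, cc.NodeID) // true true 4
}
```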
The node struct contains several important channels:
```go
// node is the canonical implementation of the Node interface
type node struct {
	propc      chan pb.Message
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}
```
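recvc: when the application's custom transport receives a Message, it must call

```go
func (n *node) Step(ctx context.Context, m pb.Message) error
```

to put the Message into recvc. After some processing, raft in turn places any Messages that need to be sent into the mailbox of the corresponding peers, and the custom transport later sends them out.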
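readyc/advancec: readyc and advancec are both unbuffered channels. node.run() packages a set of state updates into a Ready struct (one of them being the outgoing messages in Ready.Messages) and pushes it into readyc. The application pops the Ready from readyc, handles the corresponding state, and when it is done calls

rc.node.Advance()

which pushes an empty struct into advancec to tell raft that the state in this Ready has been handled. When node.run() receives this notification from advancec, it updates some internal state, for example removing entries that have now been persisted to storage from memory (the type unstable struct).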
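The effect of the unbuffered readyc/advancec pair is that the run loop never hands out a second Ready before the first has been acknowledged. The toy below models just that handshake with hypothetical types, not the library's node. A nil channel disables its select case, which is how the loop switches between offering a Ready and waiting for Advance. Call Advance exactly once per received Ready, otherwise it blocks.

```go
package main

import "fmt"

// Hypothetical toy of the readyc/advancec handshake.
type Ready struct{ Entries []int }

type toyNode struct {
	propc    chan int
	readyc   chan Ready
	advancec chan struct{}
	stop     chan struct{}
}

func newToyNode() *toyNode {
	n := &toyNode{
		propc:    make(chan int),
		readyc:   make(chan Ready), // unbuffered
		advancec: make(chan struct{}), // unbuffered
		stop:     make(chan struct{}),
	}
	go n.run()
	return n
}

func (n *toyNode) run() {
	var unstable []int // entries not yet acknowledged by Advance
	inflight := 0      // number of entries in the Ready awaiting Advance
	for {
		var readyc chan Ready         // nil: "offer Ready" case disabled
		var advancec chan struct{}    // nil: "wait for Advance" case disabled
		var rd Ready
		if inflight > 0 {
			advancec = n.advancec
		} else if len(unstable) > 0 {
			readyc = n.readyc
			rd = Ready{Entries: append([]int(nil), unstable...)}
		}
		select {
		case e := <-n.propc:
			unstable = append(unstable, e)
		case readyc <- rd:
			inflight = len(rd.Entries)
		case <-advancec:
			unstable = unstable[inflight:] // drop what the app has persisted
			inflight = 0
		case <-n.stop:
			return
		}
	}
}

func (n *toyNode) Propose(e int)       { n.propc <- e }
func (n *toyNode) Ready() <-chan Ready { return n.readyc }
func (n *toyNode) Advance()            { n.advancec <- struct{}{} }
func (n *toyNode) Stop()               { close(n.stop) }

func main() {
	n := newToyNode()
	defer n.Stop()
	n.Propose(1)
	n.Propose(2)
	rd := <-n.Ready()
	n.Propose(3) // accepted, but no new Ready until Advance
	select {
	case <-n.Ready():
		fmt.Println("unexpected second Ready")
	default:
		fmt.Println("no Ready before Advance")
	}
	n.Advance()
	fmt.Println(rd.Entries, (<-n.Ready()).Entries) // [1 2] [3]
}
```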
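confc/confstatec: the application takes CommittedEntries out of the Ready and checks whether any of them is a membership-change entry; if so, it must call

```go
func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState
```

This function pushes the ConfChange into confc, which is also an unbuffered channel. node.run() takes the ConfChange from confc, performs the actual addition or removal of peers, and then pushes the latest membership into confstatec; ApplyConfChange pops that membership from confstatec and returns it to the application.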
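ApplyConfChange thus works as a synchronous request/response over two unbuffered channels, so only the run goroutine ever mutates the membership. A toy model with hypothetical ConfChange/ConfState types (voters only, no learners) follows; it is a sketch of the channel pattern, not the library's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical toy types.
type ConfChange struct {
	Add    bool
	NodeID uint64
}

type ConfState struct{ Voters []uint64 }

type toyNode struct {
	confc      chan ConfChange
	confstatec chan ConfState
	stop       chan struct{}
}

func newToyNode(voters ...uint64) *toyNode {
	n := &toyNode{
		confc:      make(chan ConfChange),
		confstatec: make(chan ConfState),
		stop:       make(chan struct{}),
	}
	go n.run(voters)
	return n
}

// run owns the membership set; no other goroutine touches it.
func (n *toyNode) run(voters []uint64) {
	members := map[uint64]bool{}
	for _, id := range voters {
		members[id] = true
	}
	for {
		select {
		case cc := <-n.confc:
			if cc.Add {
				members[cc.NodeID] = true
			} else {
				delete(members, cc.NodeID)
			}
			var cs ConfState
			for id := range members {
				cs.Voters = append(cs.Voters, id)
			}
			sort.Slice(cs.Voters, func(i, j int) bool { return cs.Voters[i] < cs.Voters[j] })
			n.confstatec <- cs // reply with the new membership
		case <-n.stop:
			return
		}
	}
}

// ApplyConfChange blocks until run() has applied cc and replied.
func (n *toyNode) ApplyConfChange(cc ConfChange) ConfState {
	n.confc <- cc
	return <-n.confstatec
}

func main() {
	n := newToyNode(1, 2, 3)
	defer close(n.stop)
	fmt.Println(n.ApplyConfChange(ConfChange{Add: true, NodeID: 4}).Voters)  // [1 2 3 4]
	fmt.Println(n.ApplyConfChange(ConfChange{Add: false, NodeID: 2}).Voters) // [1 3 4]
}
```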
As you can see, putting etcd's raft library to use still requires understanding quite a few things.