當今互聯網行業中,對於分佈式一致性算法,我的以爲實用性最高而且應用最普遍的就是 Raft 算法了。Raft 很是適合用於全部的節點均爲可信節點時的必要數據同步場景中。Raft 的基本原理理解起來並不難,網上不少文字簡介,都不如一個很生動的動畫來得直觀。node
在 Kubenetes 中普遍使用的分佈式 KV 存儲系統 etcd
使用的就是 Raft 算法。算法的實現就直接做爲 etcd 的子 package(用 Go 編寫),路徑爲:github.com/etcd-io/etcd/raft。git
官方提供了一個 demo。這個 demo 其實已經很是完整了,它包含了網絡通訊、快照壓縮、數據同步等完整的功能。而對於 etcd/raft 的初見者而言,仍是稍微有點門檻了。本文的目的是儘可能抽絲剝繭,首先從 raft 最基本的功能——選舉來入手,構建一個小的集羣 demo,一步一步說明 etcd/raft 的用法。github
這個小 demo 只實現一個功能:已知數量的集羣節點,可以進行 leader 的選舉。更多的功能(好比數據的存儲)在之後的文章陸續解析。算法
爲此,咱們須要研究 etcd/raft 的相關函數的用法。shell
包 raft
使用接口 Node
來描述一個 Raft 節點。該接口的函數中,本文(或者說本階段)涉及的有四個:json
type Node interface { Tick() Step(ctx context.Context, msg raftpb.Message) error Ready() <-chan Ready Status() Status }
Raft 節點數量建議是一個素數。這裏我採用 3 個。在節點數量已知的狀況下,咱們首先要告知 Raft node 節點的列表。每一個節點應該有惟一的一個 uint64
類型的 ID:segmentfault
peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
應用程序須要本身實現節點與節點之間的網絡通訊。這裏我就在本地單進程運行三個協程,模擬三個節點,給三個節點分配三個 channel 用來通訊:網絡
var ( bcChans = []chan raftpb.Message{ make(chan raftpb.Message), make(chan raftpb.Message), make(chan raftpb.Message), } ) // ...... func main() { peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}} go startNode(0x01, peers) go startNode(0x02, peers) go startNode(0x03, peers) time.Sleep(2 * time.Second) return }
在 etcd/raft 中對 Raft 算法的邏輯實現是儘可能地輕量化,只實現算法的核心功能。但與此相對的,須要調用 Raft 的應用程序實現較多的額外邏輯來實現完整的節點功能。在本文中,咱們只關心節點的選舉,該場景下咱們須要實現的功能有如下兩個:數據結構
Raft 節點依賴按期的心跳來進行週期性的狀態機流轉,應用程序須要給 raft 節點提供。在 demo 中,我用了一個帶隨機抖動的 ticker 來實現——而這也是 Raft 算法中建議的方案,也就是帶有一點隨機因素。當每一次 tick 到來時,就能夠調用 raft node 的 Tick()
方法,推進內部狀態機的更新:分佈式
func startNode(id uint64, peers []raft.Peer) { // ...... for { select { case <-n.tick.Elapsed(): // 至關於 time 包 Ticker 的 tick.C n.node.Tick() // n.node 是 raft.Node 對象,下同 // ...... } // ...... }
前文說到,Raft 節點之間的網絡通訊須要應用程序來實現。應用程序經過 etcd/raft 節點的 Ready()
方法接收節點須要對其餘發出的的信息。Ready()
函數返回 raft.Ready
結構體,在這一階段中,咱們須要使用的是 Ready
結構體的 Messages
成員,這是一個 []raftpb.Message
類型。應用程序須要負責的,就是將這些 message 發送出去。
Message 的定義並不長,以下所示:
type Message struct { Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"` To uint64 `protobuf:"varint,2,opt,name=to" json:"to"` From uint64 `protobuf:"varint,3,opt,name=from" json:"from"` Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"` LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"` Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"` Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"` Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"` RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` XXX_unrecognized []byte `json:"-"` }
能夠看到,這個結構體分別按照 protobuf 和 json 進行了定義,這就很是方便應用程序根據不一樣的通訊模式對數據進行序列化和反序列化後在網絡中傳輸。而 To
則告訴了應用程序應該將這個消息發送給哪個節點。在 demo 則是根據 To
發到對應的 channel 裏。
在 demo 中,節點從 channel 中獲取到 Message
對象以後,調用本節點的 Step()
函數:
func startNode(id uint64, peers []raft.Peer) { // ...... for { select { // ...... case m := <-n.recv: n.node.Step(context.TODO(), m) } // ...... }
完整代碼九十來行,能夠直接運行以後觀察 shell 輸出,瞭解 raft 的選舉過程。
package main import ( "context" "log" "strings" "time" "github.com/coreos/etcd/raft/raftpb" "github.com/etcd-io/etcd/raft" "github.com/influxdata/telegraf/agent" ) func init() { log.SetFlags(log.Lshortfile | log.LstdFlags) } var ( infof = log.Printf errorf = log.Printf bcChans = []chan raftpb.Message{ make(chan raftpb.Message), make(chan raftpb.Message), make(chan raftpb.Message), } ) const ( tickInterval = 100 * time.Millisecond jitterMillisecond = 15 * time.Millisecond ) func main() { infof("hello, raft!") defer infof("end of raft") peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}} go startNode(0x01, peers) go startNode(0x02, peers) go startNode(0x03, peers) time.Sleep(2 * time.Second) return } func startNode(id uint64, peers []raft.Peer) { ctx := context.TODO() storage := raft.NewMemoryStorage() c := raft.Config{ ID: id, ElectionTick: 10, HeartbeatTick: 1, Storage: storage, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, } n := &node{ id: id, prefix: strings.Repeat("\t\t\t", int(id)) + "| ", node: raft.StartNode(&c, peers), tick: agent.NewRollingTicker(tickInterval-jitterMillisecond, tickInterval+jitterMillisecond), recv: bcChans[id-1], raftStorage: storage, } for { select { case <-n.tick.Elapsed(): n.node.Tick() case rd := <-n.node.Ready(): n.raftStorage.Append(rd.Entries) go n.sendMessage(rd.Messages) n.node.Advance() case m := <-n.recv: infof("%d -%s got message from %v to %v, type %v", id, n.prefix, m.From, m.To, m.Type) n.node.Step(ctx, m) infof("%d -%s status: %v", id, n.prefix, n.node.Status().RaftState) } } return } type node struct { id uint64 prefix string node raft.Node tick *agent.RollingTicker recv chan raftpb.Message raftStorage *raft.MemoryStorage } func (n *node) sendMessage(msg []raftpb.Message) { for _, m := range msg { to := m.To ch := bcChans[to-1] infof("%d -%s send to %v, type %v", n.id, n.prefix, m.To, m.Type) ch <- m } return }
本文章採用 知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議 進行許可。
原做者: amc,歡迎轉載,但請註明出處。
原文標題:用 etcd/raft 組建可以選舉的最簡集羣 demo
發佈日期:2020/06/12
本文連接:http://www.javashuo.com/article/p-fwaydwej-nt.html
本文最先發佈於雲+社區,也是本人的博客