用 etcd/raft 組建可以選舉的最簡集羣 demo

當今互聯網行業中,對於分佈式一致性算法,我的以爲實用性最高而且應用最普遍的就是 Raft 算法了。Raft 很是適合用於全部的節點均爲可信節點時的必要數據同步場景中。Raft 的基本原理理解起來並不難,網上不少文字簡介,都不如一個很生動的動畫來得直觀。node

etcd/raft

在 Kubenetes 中普遍使用的分佈式 KV 存儲系統 etcd 使用的就是 Raft 算法。算法的實現就直接做爲 etcd 的子 package(用 Go 編寫),路徑爲:github.com/etcd-io/etcd/raftgit

官方提供了一個 demo。這個 demo 其實已經很是完整了,它包含了網絡通訊、快照壓縮、數據同步等完整的功能。而對於 etcd/raft 的初見者而言,仍是稍微有點門檻了。本文的目的是儘可能抽絲剝繭,首先從 raft 最基本的功能——選舉來入手,構建一個小的集羣 demo,一步一步說明 etcd/raft 的用法。github

Demo 功能

這個小 demo 只實現一個功能:已知數量的集羣節點,可以進行 leader 的選舉。更多的功能(好比數據的存儲)在之後的文章陸續解析。算法

爲此,咱們須要研究 etcd/raft 的相關函數的用法。shell

Raft 節點數據結構

raft 使用接口 Node 來描述一個 Raft 節點。該接口的函數中,本文(或者說本階段)涉及的有四個:json

type Node interface {
    Tick()
    Step(ctx context.Context, msg raftpb.Message) error
    Ready() <-chan Ready
    Status() Status
}

啓動節點

Raft 節點數量建議是一個素數。這裏我採用 3 個。在節點數量已知的狀況下,咱們首先要告知 Raft node 節點的列表。每一個節點應該有惟一的一個 uint64 類型的 ID:segmentfault

peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}

應用程序須要本身實現節點與節點之間的網絡通訊。這裏我就在本地單進程運行三個協程,模擬三個節點,給三個節點分配三個 channel 用來通訊:網絡

var (
    bcChans = []chan raftpb.Message{
        make(chan raftpb.Message),
        make(chan raftpb.Message),
        make(chan raftpb.Message),
    }
)

// ......

func main() {
    peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
    go startNode(0x01, peers)
    go startNode(0x02, peers)
    go startNode(0x03, peers)

    time.Sleep(2 * time.Second)
    return
}

節點程序中要完成的功能

在 etcd/raft 中對 Raft 算法的邏輯實現是儘可能地輕量化,只實現算法的核心功能。但與此相對的,須要調用 Raft 的應用程序實現較多的額外邏輯來實現完整的節點功能。在本文中,咱們只關心節點的選舉,該場景下咱們須要實現的功能有如下兩個:數據結構

節點內部心跳機制

Raft 節點依賴按期的心跳來進行週期性的狀態機流轉,應用程序須要給 raft 節點提供。在 demo 中,我用了一個帶隨機抖動的 ticker 來實現——而這也是 Raft 算法中建議的方案,也就是帶有一點隨機因素。當每一次 tick 到來時,就能夠調用 raft node 的 Tick() 方法,推進內部狀態機的更新:分佈式

func startNode(id uint64, peers []raft.Peer) {
    // ......

    for {
        select {
        case <-n.tick.Elapsed():    // 至關於 time 包 Ticker 的 tick.C
            n.node.Tick()           // n.node 是 raft.Node 對象,下同

        // ......
    }

    // ......
}

轉發節點之間的 raft 通訊

前文說到,Raft 節點之間的網絡通訊須要應用程序來實現。應用程序經過 etcd/raft 節點的 Ready() 方法接收節點須要對其餘發出的的信息。Ready() 函數返回 raft.Ready 結構體,在這一階段中,咱們須要使用的是 Ready 結構體的 Messages 成員,這是一個 []raftpb.Message 類型。應用程序須要負責的,就是將這些 message 發送出去。

Message 的定義並不長,以下所示:

type Message struct {
    Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
    To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
    From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
    Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
    LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
    Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"`
    Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"`
    Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"`
    Snapshot         Snapshot    `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
    Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"`
    RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
    Context          []byte      `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
    XXX_unrecognized []byte      `json:"-"`
}

能夠看到,這個結構體分別按照 protobuf 和 json 進行了定義,這就很是方便應用程序根據不一樣的通訊模式對數據進行序列化和反序列化後在網絡中傳輸。而 To 則告訴了應用程序應該將這個消息發送給哪個節點。在 demo 則是根據 To 發到對應的 channel 裏。

接收其餘節點發來的 raft 通訊

在 demo 中,節點從 channel 中獲取到 Message 對象以後,調用本節點的 Step() 函數:

func startNode(id uint64, peers []raft.Peer) {
    // ......

    for {
        select {
        // ......

        case m := <-n.recv:
            n.node.Step(context.TODO(), m)
    }

    // ......
}

完整 demo 代碼

完整代碼九十來行,能夠直接運行以後觀察 shell 輸出,瞭解 raft 的選舉過程。

package main

import (
    "context"
    "log"
    "strings"
    "time"

    "github.com/coreos/etcd/raft/raftpb"
    "github.com/etcd-io/etcd/raft"
    "github.com/influxdata/telegraf/agent"
)

func init() {
    log.SetFlags(log.Lshortfile | log.LstdFlags)
}

var (
    infof  = log.Printf
    errorf = log.Printf

    bcChans = []chan raftpb.Message{
        make(chan raftpb.Message),
        make(chan raftpb.Message),
        make(chan raftpb.Message),
    }
)

const (
    tickInterval      = 100 * time.Millisecond
    jitterMillisecond = 15 * time.Millisecond
)

func main() {
    infof("hello, raft!")
    defer infof("end of raft")

    peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
    go startNode(0x01, peers)
    go startNode(0x02, peers)
    go startNode(0x03, peers)

    time.Sleep(2 * time.Second)
    return
}

func startNode(id uint64, peers []raft.Peer) {
    ctx := context.TODO()
    storage := raft.NewMemoryStorage()
    c := raft.Config{
        ID:              id,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         storage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }

    n := &node{
        id:          id,
        prefix:      strings.Repeat("\t\t\t", int(id)) + "| ",
        node:        raft.StartNode(&c, peers),
        tick:        agent.NewRollingTicker(tickInterval-jitterMillisecond, tickInterval+jitterMillisecond),
        recv:        bcChans[id-1],
        raftStorage: storage,
    }

    for {
        select {
        case <-n.tick.Elapsed():
            n.node.Tick()

        case rd := <-n.node.Ready():
            n.raftStorage.Append(rd.Entries)
            go n.sendMessage(rd.Messages)
            n.node.Advance()

        case m := <-n.recv:
            infof("%d -%s got message from %v to %v, type %v", id, n.prefix, m.From, m.To, m.Type)
            n.node.Step(ctx, m)
            infof("%d -%s status: %v", id, n.prefix, n.node.Status().RaftState)
        }
    }

    return
}

type node struct {
    id          uint64
    prefix      string
    node        raft.Node
    tick        *agent.RollingTicker
    recv        chan raftpb.Message
    raftStorage *raft.MemoryStorage
}

func (n *node) sendMessage(msg []raftpb.Message) {
    for _, m := range msg {
        to := m.To
        ch := bcChans[to-1]
        infof("%d -%s send to %v, type %v", n.id, n.prefix, m.To, m.Type)
        ch <- m
    }
    return
}

本文章採用 知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議 進行許可。

原做者: amc,歡迎轉載,但請註明出處。

原文標題:用 etcd/raft 組建可以選舉的最簡集羣 demo

發佈日期:2020/06/12

本文連接:http://www.javashuo.com/article/p-fwaydwej-nt.html

本文最先發佈於雲+社區,也是本人的博客

相關文章
相關標籤/搜索