raft協議在 etcd中的應用

1、簡介

etcd 是基於 raft 協議實現的分佈式一致性鍵值存儲。本篇文章不介紹 etcd 的使用,而是講解 etcd 源碼中提供的 example,通過這個 example 來學習 etcd 是如何使用 raft 協議的。


2、實現

這個 example 位於 etcd 源碼目錄下的 contrib 目錄中:

tree -L 1
.
├── Makefile
├── NOTICE
├── OWNERS
├── Procfile
├── Procfile.v2
├── README.md
├── ROADMAP.md
├── auth
├── bill-of-materials.json
├── bill-of-materials.override.json
├── build
├── build.bat
├── build.ps1
├── client
├── clientv3
├── code-of-conduct.md
├── contrib    # 今天的主角
├── docs
├── embed
├── etcd.conf.yml.sample
├── etcdctl
├── etcdmain
├── etcdserver
├── functional
├── functional.yaml
├── go.mod
├── go.sum
├── hack
├── integration
├── lease
├── logos
├── main.go
├── main_test.go
├── mvcc
├── pkg
├── proxy
├── raft
├── scripts
├── test
├── tests
├── tools
├── vendor
├── version
└── wal
tree -L 1 contrib/raftexample/
contrib/raftexample/
├── Procfile
├── README.md
├── doc.go
├── httpapi.go
├── kvstore.go
├── kvstore_test.go
├── listener.go
├── main.go
├── raft.go
└── raftexample_test.go

先看一下入口文件 main.go:

func main() {
    // Command-line configuration for this node.
    clusterFlag := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
    idFlag := flag.Int("id", 1, "node ID")
    portFlag := flag.Int("port", 9121, "key-value server port")
    joinFlag := flag.Bool("join", false, "join an existing cluster")
    flag.Parse()

    // Channels carrying writes and membership changes from the HTTP API into
    // raft. Deferred closes run in reverse order: confChangeC, then proposeC.
    proposeC := make(chan string)
    defer close(proposeC)
    confChangeC := make(chan raftpb.ConfChange)
    defer close(confChangeC)

    // kvs is only assigned after raft has started, so the snapshot callback
    // must read the variable lazily instead of capturing its current (nil) value.
    var kvs *kvstore
    snapshotFn := func() ([]byte, error) { return kvs.getSnapshot() }

    peerURLs := strings.Split(*clusterFlag, ",")
    commitC, errorC, snapshotterReady := newRaftNode(*idFlag, peerURLs, *joinFlag, snapshotFn, proposeC, confChangeC)

    // Wait for raft to publish its snapshotter, then build the store on top of
    // raft's commit stream.
    snapshotter := <-snapshotterReady
    kvs = newKVStore(snapshotter, proposeC, commitC, errorC)

    // The HTTP handler turns client requests into raft proposals; this call
    // blocks until raft reports an error or shuts down.
    serveHttpKVAPI(kvs, *portFlag, confChangeC, errorC)
}

main 函數進行了一些初始化動作。接下來看一下 newRaftNode,它在 raft.go 文件中:

// newRaftNode builds the application-level raft node and starts it in the
// background (startRaft). It returns three channels:
//   - committed log entries: all entries are replayed first, followed by a nil
//     message marking the channel as current, then new entries as they commit;
//   - errors reported by the node;
//   - the snapshotter, delivered once it has been created.
//
// Proposals are read from proposeC and membership changes from confChangeC.
// To shut down, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
    confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

    var (
        commits = make(chan *string)
        errs    = make(chan error)
        // Capacity 1 lets startRaft hand over the snapshotter without waiting
        // for a reader.
        ready = make(chan *snap.Snapshotter, 1)
    )

    // This application-level node holds, one-to-one, the protocol-level
    // raft.Node that startRaft creates (rc.node). Fields not set here are
    // populated after WAL replay.
    rn := &raftNode{
        id:    id,
        peers: peers,
        join:  join,

        waldir:  fmt.Sprintf("raftexample-%d", id),
        snapdir: fmt.Sprintf("raftexample-%d-snap", id),

        proposeC:    proposeC,
        confChangeC: confChangeC,
        commitC:     commits,
        errorC:      errs,

        getSnapshot: getSnapshot,
        snapCount:   defaultSnapshotCount,

        stopc:     make(chan struct{}),
        httpstopc: make(chan struct{}),
        httpdonec: make(chan struct{}),

        snapshotterReady: ready,
    }
    go rn.startRaft()
    return commits, errs, ready
}

看一下 startRaft 做了什麼,它還是在當前文件中:

// startRaft prepares on-disk state, creates the protocol-level raft node and
// the peer transport, then launches the raft HTTP server and the event loop
// (serveChannels) in background goroutines. Setup failures are fatal
// (log.Fatalf).
func (rc *raftNode) startRaft() {
    if !fileutil.Exist(rc.snapdir) {
        if err := os.Mkdir(rc.snapdir, 0750); err != nil {
            log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
        }
    }
    // Create the snapshotter and publish it right away. newRaftNode creates
    // snapshotterReady with capacity 1, so this send does not wait for main.
    rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
    rc.snapshotterReady <- rc.snapshotter

    // Record whether a WAL already existed *before* replaying it. This decides
    // below between restarting from persisted state and bootstrapping a fresh
    // node.
    // NOTE(review): replayWAL is assumed to load the persisted log into
    // rc.raftStorage, which is handed to raft below. Confirm in replayWAL.
    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()

    // Raft peer IDs are the 1-based positions of the URLs in rc.peers.
    rpeers := make([]raft.Peer, len(rc.peers))
    for i := range rpeers {
        rpeers[i] = raft.Peer{ID: uint64(i + 1)}
    }
    // Protocol-level configuration. Election/heartbeat intervals are counted in
    // ticks of rc.node.Tick(), which serveChannels drives from a ticker.
    c := &raft.Config{
        ID:                        uint64(rc.id),
        ElectionTick:              10,
        HeartbeatTick:             1,
        Storage:                   rc.raftStorage,
        MaxSizePerMsg:             1024 * 1024,
        MaxInflightMsgs:           256,
        MaxUncommittedEntriesSize: 1 << 30,
    }
    // Create the protocol-level node: restart from the existing WAL, or
    // bootstrap a new node. A node joining an existing cluster starts with no
    // initial peers.
    if oldwal {
        rc.node = raft.RestartNode(c)
    } else {
        startPeers := rpeers
        if rc.join {
            startPeers = nil
        }
        rc.node = raft.StartNode(c, startPeers)
    }

    // The raft library only produces messages; delivering them between nodes
    // is the application's job. Here it is done over HTTP with rafthttp.
    rc.transport = &rafthttp.Transport{
        Logger:      zap.NewExample(),
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: stats.NewServerStats("", ""),
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }
    // Start reports setup failures through its error result. Treat them as
    // fatal like the other setup errors above, instead of silently continuing
    // with a transport that failed to start.
    if err := rc.transport.Start(); err != nil {
        log.Fatalf("raftexample: cannot start transport (%v)", err)
    }
    // Register every other cluster member (IDs are 1-based list positions).
    for i := range rc.peers {
        if i+1 != rc.id {
            rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
        }
    }
    // serveRaft presumably serves the transport's HTTP endpoint for incoming
    // raft messages (not covered here).
    go rc.serveRaft()
    go rc.serveChannels()
}

看一下 serveChannels,這裏是重點,下面我們來分析一下整個過程:

// serveChannels runs the node's event loop.
//
// A background goroutine forwards proposals (proposeC) and configuration
// changes (confChangeC) into raft. When either channel is closed it closes
// rc.stopc to request shutdown.
//
// The main loop:
//   - ticks raft's logical clock every 100ms;
//   - processes each raft.Ready: persist, apply any snapshot, append entries to
//     raft's in-memory storage, send messages, publish committed entries, then
//     Advance;
//   - exits on a transport error, a persistence error, a publish failure or a
//     stop request.
func (rc *raftNode) serveChannels() {
    snap, err := rc.raftStorage.Snapshot()
    if err != nil {
        panic(err)
    }
    rc.confState = snap.Metadata.ConfState
    rc.snapshotIndex = snap.Metadata.Index
    rc.appliedIndex = snap.Metadata.Index

    defer rc.wal.Close()

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    // Every client write and membership change arrives as a proposal. Raft
    // replicates it so all nodes agree on the same order.
    // send proposals over raft
    go func() {
        confChangeCount := uint64(0)

        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
            // ServeHTTP feeds this channel via kvstore.Propose on PUT.
            case prop, ok := <-rc.proposeC:
                if !ok {
                    rc.proposeC = nil
                } else {
                    // blocks until accepted by raft state machine
                    rc.node.Propose(context.TODO(), []byte(prop))
                }

            case cc, ok := <-rc.confChangeC:
                if !ok {
                    rc.confChangeC = nil
                } else {
                    confChangeCount++
                    cc.ID = confChangeCount
                    rc.node.ProposeConfChange(context.TODO(), cc)
                }
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()

    // event loop on raft state machine updates
    for {
        select {
        case <-ticker.C:
            rc.node.Tick()

        // store raft entries to wal, then publish over commit channel
        case rd := <-rc.node.Ready():
            // Persist the hard state and new entries first. A failed write must
            // not be ignored: the messages sent below may acknowledge these
            // entries to other nodes. Shut down the same way as on a transport
            // error.
            if err := rc.wal.Save(rd.HardState, rd.Entries); err != nil {
                rc.writeError(err)
                return
            }
            if !raft.IsEmptySnap(rd.Snapshot) {
                if err := rc.saveSnap(rd.Snapshot); err != nil {
                    rc.writeError(err)
                    return
                }
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            // Append the entries to raft's in-memory log storage (the Storage
            // given to raft.Config). This is not the kv store.
            // NOTE(review): errors from ApplySnapshot/Append are still ignored.
            rc.raftStorage.Append(rd.Entries)
            // Deliver outgoing raft messages to the other nodes.
            rc.transport.Send(rd.Messages)
            // Hand newly committed entries to the application; stop if that fails.
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot()
            // Tell raft this Ready has been fully processed so it can produce
            // the next one.
            rc.node.Advance()

        case err := <-rc.transport.ErrorC:
            rc.writeError(err)
            return

        case <-rc.stopc:
            rc.stop()
            return
        }
    }
}

讓我們回到 main.go 文件中。前面沿着 newRaftNode 一路深入分析,現在回到這裏:這個函數最後返回了幾個值:

// newRaftNode returns the channel of committed entries, the error channel, and a
// channel that delivers the snapshotter once startRaft has created it.
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

接着這行代碼繼續向下分析 newKVStore:

// Receiving from snapshotterReady waits until startRaft has sent the snapshotter.
// The kv store is then built on raft's commit and error channels.
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

這是一個簡單的內存 kv 存儲,模擬了 etcd 中的 kv 存儲。etcd v3 使用的是 bolt(一個用 Go 語言開發的 kv 存儲),而這個 example 爲了說明 raft 協議在 etcd 中的應用,所以簡單地用內存結構構造了 kv 存儲。代碼裏做的事情就是讀取 commitC 這個 channel 中的信息,然後將信息存儲到 map 中,這裏就不具體分析了。

// Starts the HTTP API in the background, then blocks until raft reports an error
// on errorC (fatal) or closes it.
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)

緊接着啓動了 http 服務,具體實現在 httpapi.go 文件中:

// ServeHTTP implements the key-value API, using the request URI as the key:
//
//	PUT    proposes storing the request body under the key;
//	GET    returns the locally stored value, or 404 if absent;
//	POST   proposes adding the node whose ID is the URI without its leading
//	       slash; the request body becomes the conf-change context;
//	DELETE proposes removing that node.
//
// PUT, POST and DELETE reply 204 immediately, without waiting for raft to commit.
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    key := r.RequestURI

    // nodeIDFromKey parses the node ID encoded in the URI. On failure it logs,
    // writes a 400 response naming op, and reports false.
    nodeIDFromKey := func(op string) (uint64, bool) {
        id, err := strconv.ParseUint(key[1:], 0, 64)
        if err != nil {
            log.Printf("Failed to convert ID for conf change (%v)\n", err)
            http.Error(w, "Failed on "+op, http.StatusBadRequest)
            return 0, false
        }
        return id, true
    }

    switch r.Method {
    case "PUT":
        body, err := ioutil.ReadAll(r.Body)
        if err != nil {
            log.Printf("Failed to read on PUT (%v)\n", err)
            http.Error(w, "Failed on PUT", http.StatusBadRequest)
            return
        }
        h.store.Propose(key, string(body))
        // Optimistic: the value is not committed yet, so an immediate GET on
        // this key may still return the old value.
        w.WriteHeader(http.StatusNoContent)

    case "GET":
        value, found := h.store.Lookup(key)
        if !found {
            http.Error(w, "Failed to GET", http.StatusNotFound)
            return
        }
        w.Write([]byte(value))

    case "POST":
        peerURL, err := ioutil.ReadAll(r.Body)
        if err != nil {
            log.Printf("Failed to read on POST (%v)\n", err)
            http.Error(w, "Failed on POST", http.StatusBadRequest)
            return
        }
        nodeID, ok := nodeIDFromKey("POST")
        if !ok {
            return
        }
        h.confChangeC <- raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  nodeID,
            Context: peerURL,
        }
        // Optimistic, as for PUT: raft has not applied the change yet.
        w.WriteHeader(http.StatusNoContent)

    case "DELETE":
        nodeID, ok := nodeIDFromKey("DELETE")
        if !ok {
            return
        }
        h.confChangeC <- raftpb.ConfChange{
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: nodeID,
        }
        // Optimistic, as for PUT: raft has not applied the change yet.
        w.WriteHeader(http.StatusNoContent)

    default:
        hdr := w.Header()
        hdr.Set("Allow", "PUT")
        for _, method := range []string{"GET", "POST", "DELETE"} {
            hdr.Add("Allow", method)
        }
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
    }
}

// serveHttpKVAPI serves the key-value HTTP API (PUT/GET for keys, POST/DELETE
// for membership changes) on the given port in a background goroutine. It then
// blocks until raft reports an error on errorC (fatal) or closes errorC (normal
// return).
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
    handler := &httpKVAPI{
        store:       kv,
        confChangeC: confChangeC,
    }
    srv := http.Server{
        Addr:    ":" + strconv.Itoa(port),
        Handler: handler,
    }

    // Any listener failure (e.g. the port is unavailable) ends the process.
    go func() {
        err := srv.ListenAndServe()
        if err != nil {
            log.Fatal(err)
        }
    }()

    // exit when raft goes down
    err, ok := <-errorC
    if ok {
        log.Fatal(err)
    }
}
// Propose gob-encodes the key/value pair and sends it on s.proposeC, from which
// the raft node forwards it into raft (main passes the same proposeC to
// newRaftNode). An encoding failure is fatal.
func (s *kvstore) Propose(k string, v string) {
    pair := kv{k, v}
    var encoded bytes.Buffer
    encoder := gob.NewEncoder(&encoded)
    if err := encoder.Encode(pair); err != nil {
        log.Fatal(err)
    }
    s.proposeC <- encoded.String()
}

到此,整個 example 中的 raft 流程就結束了,看上去還是蠻簡單的。接下來會專門分析一下 raft 協議內部的原理。

相關文章
相關標籤/搜索