etcd is a distributed, consistent key-value store built on the Raft protocol. This article does not cover how to use etcd itself; instead it walks through the example that ships with the etcd source code, and uses that example to learn how etcd drives the raft protocol.
The example lives in the contrib directory of the etcd source tree:
tree -L 1 .
├── Makefile
├── NOTICE
├── OWNERS
├── Procfile
├── Procfile.v2
├── README.md
├── ROADMAP.md
├── auth
├── bill-of-materials.json
├── bill-of-materials.override.json
├── build
├── build.bat
├── build.ps1
├── client
├── clientv3
├── code-of-conduct.md
├── contrib          # the star of this article
├── docs
├── embed
├── etcd.conf.yml.sample
├── etcdctl
├── etcdmain
├── etcdserver
├── functional
├── functional.yaml
├── go.mod
├── go.sum
├── hack
├── integration
├── lease
├── logos
├── main.go
├── main_test.go
├── mvcc
├── pkg
├── proxy
├── raft
├── scripts
├── test
├── tests
├── tools
├── vendor
├── version
└── wal
tree -L 1 contrib/raftexample/
contrib/raftexample/
├── Procfile
├── README.md
├── doc.go
├── httpapi.go
├── kvstore.go
├── kvstore_test.go
├── listener.go
├── main.go
├── raft.go
└── raftexample_test.go
Let's start with the entry point, main.go:
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	proposeC := make(chan string)
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
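Incidentally, given the default flag values above, a single node can be built and started roughly like this (the binary name raftexample is my own choice, not something the example prescribes):

cd contrib/raftexample
go build -o raftexample
./raftexample --id 1 --cluster http://127.0.0.1:9021 --port 9121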
It performs some initialization. Let's look at newRaftNode, which lives in raft.go:
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *string)
	errorC := make(chan error)

	// Initialize the raftNode. This is the application-level raft node; the raft
	// protocol layer has its own node as well. The struct definition shows that the
	// application-level struct holds a protocol-level node, and the two are in a
	// one-to-one relationship.
	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}
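For reference, here is an abridged sketch of the raftNode struct from raft.go (fields trimmed to the ones this walkthrough touches, comments mine); it bundles the application-side channels together with the protocol-level raft.Node:

// raftNode ties the application channels to a single raft node (abridged sketch).
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v) coming from the kvstore
	confChangeC <-chan raftpb.ConfChange // proposed cluster configuration changes
	commitC     chan<- *string           // entries committed to the log, handed to the kvstore
	errorC      chan<- error             // errors from the raft session

	id      int      // client ID for the raft session
	peers   []string // raft peer URLs
	join    bool     // node is joining an existing cluster
	waldir  string   // path to the WAL directory
	snapdir string   // path to the snapshot directory
	getSnapshot func() ([]byte, error)

	// raft backing for the commit/error channels
	node        raft.Node            // the protocol-level node
	raftStorage *raft.MemoryStorage  // in-memory log/snapshot storage used by raft
	wal         *wal.WAL

	snapshotter      *snap.Snapshotter
	snapshotterReady chan *snap.Snapshotter // signals when the snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport // carries raft messages between peers
	stopc     chan struct{}       // signals that the proposal channel was closed
	httpstopc chan struct{}       // signals the http server to shut down
	httpdonec chan struct{}       // signals that the http server shut down
}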
Now let's see what startRaft does, still in the same file:
func (rc *raftNode) startRaft() {
	if !fileutil.Exist(rc.snapdir) {
		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
		}
	}
	// get the snapshotter instance
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
	rc.snapshotterReady <- rc.snapshotter

	// replay the WAL into memory; etcd maintains an in-memory index, and lookups go
	// through that index to fetch a key's value and revision
	oldwal := wal.Exist(rc.waldir)
	rc.wal = rc.replayWAL()

	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	// configuration for the raft consensus protocol
	c := &raft.Config{
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}
	// initialize the protocol-level raft node
	if oldwal {
		rc.node = raft.RestartNode(c)
	} else {
		startPeers := rpeers
		if rc.join {
			startPeers = nil
		}
		rc.node = raft.StartNode(c, startPeers)
	}

	// Initialize the transport: the bridge for talking to the other nodes in the
	// cluster. The raft library only implements message and state consistency; it
	// does not ship messages anywhere, so the application layer has to do that.
	rc.transport = &rafthttp.Transport{
		Logger:      zap.NewExample(),
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}
	// record the peers in the cluster so we can communicate with them
	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}

	// serveRaft is the transport's HTTP server handling peer communication; skip it for now
	go rc.serveRaft()
	go rc.serveChannels()
}
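One thing worth calling out: startRaft calls replayWAL to rebuild the in-memory raft storage before the node is (re)started. A simplified sketch of what that replay does (error handling and the replay bookkeeping on commitC are trimmed; loadSnapshot and openWAL are small helpers in the same file):

// replayWAL replays WAL entries into the in-memory raft storage (simplified sketch).
func (rc *raftNode) replayWAL() *wal.WAL {
	snapshot := rc.loadSnapshot()   // latest snapshot on disk, if any
	w := rc.openWAL(snapshot)       // open the WAL at the snapshot position
	_, st, ents, err := w.ReadAll() // HardState and entries recorded after the snapshot
	if err != nil {
		log.Fatalf("raftexample: failed to read WAL (%v)", err)
	}

	rc.raftStorage = raft.NewMemoryStorage() // storage handed to raft.Config above
	if snapshot != nil {
		rc.raftStorage.ApplySnapshot(*snapshot)
	}
	rc.raftStorage.SetHardState(st)
	rc.raftStorage.Append(ents) // append WAL entries so raft resumes at the right log position
	return w
}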
Now for serveChannels. This is the heart of the example, so let's walk through the whole loop:
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// Every update we issue against the store is a proposal; proposals are handed to
	// the raft protocol so it can keep the state of every node in the cluster consistent.
	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			// When a proposal arrives, send it into raft. Later we'll see that an
			// incoming HTTP request is what pushes data onto this proposeC.
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					// hand the proposal to raft
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		// Once raft has processed a batch, it hands a Ready back to the application
		// layer to act on. What the raft layer itself does is not analyzed here; a
		// follow-up article will walk through the raft protocol flow.
		case rd := <-rc.node.Ready():
			// persist to the WAL
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				// save the snapshot
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			// append the entries to the in-memory raft storage
			rc.raftStorage.Append(rd.Entries)
			// send messages to the other nodes in the cluster
			rc.transport.Send(rd.Messages)
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot()
			// tell raft this Ready has been handled so it can hand over the next one
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}
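The last leg of the apply path is entriesToApply/publishEntries: the committed entries handed back in the Ready are filtered down to the ones not yet applied, normal entries are pushed onto commitC for the kvstore, and conf-change entries are fed back into the raft node and the transport. A condensed sketch of publishEntries (details such as the replay bookkeeping are omitted):

// publishEntries writes committed log entries to the commit channel (condensed sketch).
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			if len(ents[i].Data) == 0 {
				break // ignore empty messages (e.g. the entry raft appends on leader election)
			}
			s := string(ents[i].Data)
			select {
			case rc.commitC <- &s: // hand the committed proposal to the kvstore
			case <-rc.stopc:
				return false
			}

		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			cc.Unmarshal(ents[i].Data)
			rc.confState = *rc.node.ApplyConfChange(cc)
			switch cc.Type {
			case raftpb.ConfChangeAddNode:
				if len(cc.Context) > 0 {
					rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
				}
			case raftpb.ConfChangeRemoveNode:
				if cc.NodeID == uint64(rc.id) {
					return false // this node was removed from the cluster
				}
				rc.transport.RemovePeer(types.ID(cc.NodeID))
			}
		}
		rc.appliedIndex = ents[i].Index // record progress after applying the entry
	}
	return true
}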
Let's go back to main.go. We have wandered quite far from newRaftNode; in the end it returns a few values:
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
Continuing past that line, the next call is newKVStore:
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
This is a simple in-memory key-value store that stands in for etcd's real KV store (in etcd, the v3 backend is bolt, a key-value store written in Go). Since the example only needs to illustrate how raft is used inside etcd, a plain in-memory structure is enough. All the code does is read messages from the commitC channel and write them into a map, so we won't go through it line by line.
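For completeness, here is a trimmed sketch of the struct in kvstore.go and the goroutine that drains commitC (snapshot error handling omitted); the gob decoding mirrors the Propose method we'll see below:

// kvstore is a key-value store backed by raft (trimmed sketch).
type kvstore struct {
	proposeC    chan<- string // channel for proposing updates to raft
	mu          sync.RWMutex
	kvStore     map[string]string // current committed key-value pairs
	snapshotter *snap.Snapshotter
}

// kv is the gob-encoded unit that travels through proposeC and commitC.
type kv struct {
	Key string
	Val string
}

// readCommits applies every committed entry to the in-memory map.
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	for data := range commitC {
		if data == nil {
			// a nil commit signals "replay is current" / "reload from snapshot"
			snapshot, err := s.snapshotter.Load()
			if err == nil {
				s.recoverFromSnapshot(snapshot.Data)
			}
			continue
		}

		var dataKv kv
		if err := gob.NewDecoder(bytes.NewBufferString(*data)).Decode(&dataKv); err != nil {
			log.Fatalf("raftexample: could not decode message (%v)", err)
		}
		s.mu.Lock()
		s.kvStore[dataKv.Key] = dataKv.Val
		s.mu.Unlock()
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}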
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
Right after that the HTTP service is started; the implementation is in httpapi.go:
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	switch {
	// let's look at the PUT operation
	case r.Method == "PUT":
		v, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}
		// store is the in-memory kvstore
		h.store.Propose(key, string(v))

		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			w.Write([]byte(v))
		} else {
			http.Error(w, "Failed to GET", http.StatusNotFound)
		}
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on POST (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "DELETE":
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on DELETE", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv,
			confChangeC: confChangeC,
		},
	}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()

	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}
The PUT handler calls kvstore.Propose, which is where a request actually enters proposeC:

func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	// send the request data onto proposeC; as we analyzed above, serveChannels is
	// listening on this channel and forwards it to raft via node.Propose
	s.proposeC <- buf.String()
}
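Putting it all together, the HTTP API can be exercised directly with curl. Assuming a node started with the default flags from main.go (key-value port 9121; the node ID and peer URL in the POST example are made up for illustration), something like the following should work:

# write a key; the value is proposed through raft and applied asynchronously
curl -L http://127.0.0.1:9121/my-key -XPUT -d "hello"
# read it back from the in-memory kvstore
curl -L http://127.0.0.1:9121/my-key
# propose adding node 2; the request body is the new peer's raft URL
curl -L http://127.0.0.1:9121/2 -XPOST -d http://127.0.0.1:9022
# propose removing node 2
curl -L http://127.0.0.1:9121/2 -XDELETE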
That completes the raft flow in the example; it is fairly straightforward end to end. A follow-up article will dig into how the raft protocol itself works internally.