手擼golang etcd raft協議之5
最近閱讀 [雲原生分佈式存儲基石:etcd深入解析] (杜軍 , 2019.1)
本系列筆記擬採用golang練習之
gitee: https://gitee.com/ioly/learning.gooop
分佈式存儲系統一般會通過維護多個副本來進行容錯, 以提升系統的可用性。 這就引出了分佈式存儲系統的核心問題——如何保證多個副本的一致性? Raft算法把問題分解成了領袖選舉(leader election)、 日誌複製(log replication)、安全性(safety) 和成員關係變化(membership changes)這幾個子問題。 Raft算法的基本操作只需2種RPC即可完成。 RequestVote RPC是在選舉過程中由候選人(Candidate)觸發的, AppendEntries RPC是領導人觸發的,目的是向其他節點複製日誌條目和發送心跳(heartbeat)。
將原有濃縮的兩個接口分解爲更易於理解和實現的四個接口(Heartbeat、AppendLog、CommitLog、RequestVote)。盡信書則不如無書-_-||
package rpc import "learning/gooop/etcd/raft/model" type IRaftRPC interface { // leader to follower Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error // leader to follower AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error // leader to follower CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error // candidate to follower RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error } type HeartbeatCmd struct { LeaderID string Term int64 } type HeartbeatRet struct { Code HBCode Term int64 } type HBCode int const ( HBOk HBCode = iota HBTermMismatch HBCode = iota ) type RequestVoteCmd struct { CandidateID string Term int64 LastLogIndex int64 LastLogTerm int64 } type RequestVoteRet struct { Code RVCode Term int64 } type RVCode int const ( RVOk RVCode = iota RVLogMismatch RVCode = iota RVTermMismatch RVCode = iota RVVotedAnother RVCode = iota ) type AppendLogCmd struct { LeaderID string Term int64 Entry *model.LogEntry } type AppendLogRet struct { Code ALCode Term int64 PrevLogIndex int64 PrevLogTerm int64 } type ALCode int const ( ALOk ALCode = iota ALTermMismatch ALCode = iota ALIndexMismatch ALCode = iota ALInternalError ALCode = iota ) type CommitLogCmd struct { LeaderID string Term int64 Index int64 } type CommitLogRet struct { Code CLCode } type CLCode int const ( CLOk CLCode = iota CLLogNotFound CLCode = iota CLInternalError CLCode = iota )
添加部分包內支持接口
package lsm import ( "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/store" ) // IRaftLSM raft有限狀態自動機 type IRaftLSM interface { rpc.IRaftRPC State() IRaftState config() config.IRaftConfig store() store.ILogStore handleStateChanged(state IRaftState) }
抽取並實現事件驅動型的邏輯編排算法
package lsm type tEventHandleFunc func(e string, args... interface{}) type iEventDrivenModel interface { hook(e string, handleFunc tEventHandleFunc) raise(e string, args... interface{}) } type tEventDrivenModel struct { items map[string][]tEventHandleFunc } func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) { arr, ok := me.items[e] if ok { me.items[e] = append(arr, handler) } else { me.items[e] = []tEventHandleFunc{handler } } } func (me *tEventDrivenModel) raise(e string, args... interface{}) { if handlers, ok := me.items[e];ok { for _,it := range handlers { it(e, args...) } } }
改造適配新分解的RPC接口
package store import "learning/gooop/etcd/raft/model" type ILogStore interface { LastAppendedTerm() int64 LastAppendedIndex() int64 LastCommittedTerm() int64 LastCommittedIndex() int64 Append(entry *model.LogEntry) error Commit(index int64) error GetLog(index int64) (error, *model.LogEntry) }
基於boltdb實現日誌暫存,提交和應用
package store

import (
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"
	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is an ILogStore backed by a boltdb file.
//
// Buckets:
//   - meta:      committed.index / committed.term of the last committed entry
//   - unstable:  appended but not yet committed entries, keyed by index
//   - committed: committed entries, keyed by index
//   - data:      created here; presumably written by cmd.Apply (confirm)
type tBoltDBStore struct {
	file string

	lastAppendedTerm   int64
	lastAppendedIndex  int64
	lastCommittedTerm  int64
	lastCommittedIndex int64

	// db is the handle returned by bolt.Open. bolt.DB holds locks and must
	// not be copied, hence the pointer.
	db *bolt.DB
}

// NewBoltStore opens (or creates) the boltdb file and prepares its buckets.
// Entries left in the unstable bucket by a previous run are discarded, so a
// fresh store reports last appended == last committed. On failure the
// database is closed again and a nil store is returned.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}
		if store.lastCommittedTerm, e = loadOrInitInt64(meta, gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.lastCommittedIndex, e = loadOrInitInt64(meta, gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Drop uncommitted leftovers. On a brand-new file the bucket does not
		// exist yet and DeleteBucket reports ErrBucketNotFound, which is fine.
		if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
			return e
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		_ = db.Close() // best effort; the Update error is the one to report
		return err, nil
	}

	// The unstable bucket is empty now: nothing is appended beyond the last
	// committed entry.
	store.lastAppendedTerm = store.lastCommittedTerm
	store.lastAppendedIndex = store.lastCommittedIndex
	return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. When the key is
// absent it stores def and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	if v := b.Get(key); v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes, so that
// non-negative index keys sort numerically in bolt.
//
// NOTE: the earlier version produced 16 bytes (8 zero bytes followed by the
// value). Keys written that way are not found by the 8-byte keys used now.
func int64ToBytes(i int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(i))
	return buf
}

// bytesToInt64 decodes a big-endian int64 from the LAST 8 bytes of data.
// Reading the tail also decodes the zero-padded 16-byte values written by the
// earlier encoder correctly. Inputs shorter than 8 bytes decode as 0.
func bytesToInt64(data []byte) int64 {
	if len(data) < 8 {
		return 0
	}
	return int64(binary.BigEndian.Uint64(data[len(data)-8:]))
}

func (me *tBoltDBStore) LastCommittedTerm() int64 {
	return me.lastCommittedTerm
}

func (me *tBoltDBStore) LastCommittedIndex() int64 {
	return me.lastCommittedIndex
}

func (me *tBoltDBStore) LastAppendedTerm() int64 {
	return me.lastAppendedTerm
}

func (me *tBoltDBStore) LastAppendedIndex() int64 {
	return me.lastAppendedIndex
}

// Append stores entry in the unstable bucket under its index. Once the bolt
// transaction has committed, the entry becomes the last appended entry.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// NOTE(review): the decoded command is discarded and any result of
	// Unmarshal is ignored; presumably this is meant to validate
	// entry.Command. Confirm, and check its result if it reports errors.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	me.lastAppendedIndex = entry.Index
	me.lastAppendedTerm = entry.Term
	return nil
}

// Commit performs the following in one bolt transaction:
//   - moves the unstable entry at index into the committed bucket,
//   - applies its command,
//   - updates committed.index / committed.term in the meta bucket.
//
// It returns gErrorCommitLogNotFound when no unstable entry exists at index.
// In-memory counters change only after the transaction has committed, so a
// failed commit leaves them consistent with the file.
func (me *tBoltDBStore) Commit(index int64) error {
	var committed *model.LogEntry
	e := me.db.Update(func(tx *bolt.Tx) error {
		// read unstable log
		ub := tx.Bucket(gUnstableBucket)
		k := int64ToBytes(index)
		data := ub.Get(k)
		if data == nil {
			return gErrorCommitLogNotFound
		}
		entry := new(model.LogEntry)
		if e := entry.Unmarshal(data); e != nil {
			return e
		}

		// apply cmd within this transaction
		cmd := gCmdFactory.OfTag(entry.Tag)
		cmd.Unmarshal(entry.Command)
		if e := cmd.Apply(tx); e != nil {
			return e
		}

		// save to committed log
		if e := tx.Bucket(gCommittedBucket).Put(k, data); e != nil {
			return e
		}

		// update committed.index, committed.term
		mb := tx.Bucket(gMetaBucket)
		if e := mb.Put(gKeyCommittedIndex, int64ToBytes(index)); e != nil {
			return e
		}
		if e := mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); e != nil {
			return e
		}

		// del unstable.index
		if e := ub.Delete(k); e != nil {
			return e
		}

		committed = entry
		return nil
	})
	if e != nil {
		return e
	}

	me.lastCommittedIndex = committed.Index
	me.lastCommittedTerm = committed.Term
	return nil
}

// GetLog returns the entry at index from the committed bucket, or (nil, nil)
// if none is committed there. Entries still in the unstable bucket are not
// visible.
func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
	var found *model.LogEntry
	e := me.db.View(func(tx *bolt.Tx) error {
		v := tx.Bucket(gCommittedBucket).Get(int64ToBytes(index))
		if v == nil {
			return nil
		}
		// bolt values are only valid for the life of the transaction, and
		// the entry outlives it, so decode from a private copy.
		data := append([]byte(nil), v...)
		entry := new(model.LogEntry)
		if e := entry.Unmarshal(data); e != nil {
			return e
		}
		found = entry
		return nil
	})
	return e, found
}

var (
	gMetaBucket      = []byte("meta")
	gUnstableBucket  = []byte("unstable")
	gCommittedBucket = []byte("committed")
	gDataBucket      = []byte("data")

	gKeyCommittedIndex = []byte("committed.index")
	gKeyCommittedTerm  = []byte("committed.term")

	gDefaultTerm  int64 = 0
	gDefaultIndex int64 = 0

	gErrorCommitLogNotFound = errors.New("committing log not found")
)
根據新分解的RPC接口,重寫Follower狀態的實現(未完成)
package lsm

import (
	"sync"
	"time"

	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tFollowerState is the follower role of a raft node. It answers the leader's
// Heartbeat, AppendLog and CommitLog RPCs. After Start it raises
// feLeaderHeartbeatTimeout once no heartbeat has arrived for
// timeout.HeartbeatTimeout.
type tFollowerState struct {
	tEventDrivenModel

	context IRaftLSM

	mInitOnce    sync.Once
	mStartOnce   sync.Once
	mDisposeOnce sync.Once

	// mMutex guards mTerm and mLeaderHeartbeatClock. These are written by
	// the RPC handlers and read by the timeout-watching goroutine. It also
	// serialises the store operations of AppendLog and CommitLog.
	mMutex sync.Mutex

	// mTerm is initialised to store.LastCommittedTerm() and raised whenever
	// a leader RPC carries a higher term.
	mTerm int64

	// mLeaderHeartbeatClock is the UnixNano time of the last accepted leader
	// heartbeat, or of Start if no heartbeat has been accepted yet.
	mLeaderHeartbeatClock int64

	// NOTE(review): not used by any code in this file yet; presumably
	// reserved for RequestVote.
	mVotedLeaderID  string
	mVotedTimestamp int64
}

const (
	feStart                  string = "follower.Start"
	feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"
)

func newFollowerState(ctx IRaftLSM) IRaftState {
	it := new(tFollowerState)
	it.init(ctx)
	return it
}

// init binds the state to its machine and hooks the event handlers; it runs
// only once.
func (me *tFollowerState) init(ctx IRaftLSM) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = ctx.store().LastCommittedTerm()
		me.mLeaderHeartbeatClock = 0
		me.initEventHandlers()
	})
}

func (me *tFollowerState) initEventHandlers() {
	me.hook(feStart, me.whenStartThenBeginWatchLeaderTimeout)
	me.hook(feLeaderHeartbeatTimeout, me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}

// Start raises feStart; subsequent calls are no-ops.
func (me *tFollowerState) Start() {
	me.mStartOnce.Do(func() {
		me.raise(feStart)
	})
}

// whenStartThenBeginWatchLeaderTimeout starts a goroutine that checks every
// HeartbeatTimeout/3 whether the leader has been silent for at least
// HeartbeatTimeout. When it has, the goroutine raises
// feLeaderHeartbeatTimeout once and exits.
//
// The clock is reset first so the leader gets a full timeout window after
// Start. With the initial value 0, the very first check would already time out.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(_ string, _ ...interface{}) {
	me.mMutex.Lock()
	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	me.mMutex.Unlock()

	go func() {
		// A Ticker (rather than time.Tick) can be stopped when we return.
		ticker := time.NewTicker(timeout.HeartbeatTimeout / 3)
		defer ticker.Stop()

		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
		for range ticker.C {
			if time.Now().UnixNano()-me.lastLeaderHeartbeat() >= timeoutNanos {
				// raise outside the lock: the handler may switch states.
				me.raise(feLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}

// lastLeaderHeartbeat reads mLeaderHeartbeatClock under mMutex.
func (me *tFollowerState) lastLeaderHeartbeat() int64 {
	me.mMutex.Lock()
	defer me.mMutex.Unlock()
	return me.mLeaderHeartbeatClock
}

// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState is not implemented yet.
// NOTE(review): it is invoked from the watcher goroutine, so this panic
// terminates the process once a leader timeout occurs.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
	panic("implements me")
}

func (me *tFollowerState) Role() roles.RaftRole {
	return roles.Follower
}

// Heartbeat leader to follower.
//
// A heartbeat with a term older than ours is rejected with HBTermMismatch.
// Otherwise the follower adopts a higher term, refreshes the heartbeat clock
// and replies HBOk. ret.Term always reports the follower's current term.
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	me.mMutex.Lock()
	defer me.mMutex.Unlock()

	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Code = rpc.HBTermMismatch
		ret.Term = me.mTerm
		return nil
	}
	if cmd.Term > me.mTerm {
		// new leader
		me.mTerm = cmd.Term
	}

	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	ret.Code = rpc.HBOk
	ret.Term = me.mTerm
	return nil
}

// AppendLog leader to follower.
//
// It rejects a stale leader with ALTermMismatch and adopts a higher term, as
// Heartbeat does. The entry is appended when its predecessor is the last
// committed entry. An entry already present in the store (per GetLog) with
// the same predecessor is acknowledged as an old duplicate. Anything else is
// answered with ALIndexMismatch plus the follower's last committed
// index/term.
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	me.mMutex.Lock()
	defer me.mMutex.Unlock()

	if cmd.Term < me.mTerm {
		// invalid leader
		ret.Term = me.mTerm
		ret.Code = rpc.ALTermMismatch
		return nil
	}
	if cmd.Term > me.mTerm {
		me.mTerm = cmd.Term
	}
	ret.Term = me.mTerm

	entry := cmd.Entry
	if entry == nil {
		// malformed request; there is no dedicated code for it
		ret.Code = rpc.ALInternalError
		return nil
	}

	store := me.context.store()

	// expecting appending action follows previous committing action
	if entry.PrevIndex == store.LastCommittedIndex() && entry.PrevTerm == store.LastCommittedTerm() {
		if e := store.Append(entry); e != nil {
			ret.Code = rpc.ALInternalError
			return nil
		}
		ret.Code = rpc.ALOk
		return nil
	}

	// not the next entry: acceptable only as a duplicate of a stored one
	e, existing := store.GetLog(entry.Index)
	if e != nil {
		ret.Code = rpc.ALInternalError
		return nil
	}
	if existing == nil || existing.PrevIndex != entry.PrevIndex || existing.PrevTerm != entry.PrevTerm {
		// bad log: tell the leader where our log ends
		ret.Code = rpc.ALIndexMismatch
		ret.PrevLogIndex = store.LastCommittedIndex()
		ret.PrevLogTerm = store.LastCommittedTerm()
		return nil
	}

	// good log, but old, just ignore it
	ret.Code = rpc.ALOk
	return nil
}

// CommitLog leader to follower.
//
// Only the most recently appended entry can be committed: cmd.Index and
// cmd.Term must match LastAppendedIndex/LastAppendedTerm, otherwise the reply
// is CLLogNotFound.
// NOTE(review): cmd.Term is compared with the appended entry's term, not with
// mTerm, so a stale leader is not rejected here; confirm intent.
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	me.mMutex.Lock()
	defer me.mMutex.Unlock()

	store := me.context.store()
	if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
		// bad index
		ret.Code = rpc.CLLogNotFound
		return nil
	}

	if e := store.Commit(cmd.Index); e != nil {
		ret.Code = rpc.CLInternalError
		return nil
	}
	ret.Code = rpc.CLOk
	return nil
}

// RequestVote candidate to follower. Not implemented yet.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	panic("implements me")
}
(未完待續)