手擼golang etcd raft協議之8
最近閱讀《雲原生分佈式存儲基石: etcd深入解析》(杜軍, 2019.1)
本系列筆記擬採用golang練習之
gitee: https://gitee.com/ioly/learning.gooop
分佈式存儲系統通常會通過維護多個副本來進行容錯, 以提高系統的可用性。 這就引出了分佈式存儲系統的核心問題——如何保證多個副本的一致性? Raft算法把這個問題分解成了四個子問題: 1. 領袖選舉(leader election)、 2. 日誌複製(log replication)、 3. 安全性(safety)、 4. 成員關係變化(membership changes)。
raft有限狀態機接口
package lsm import ( "learning/gooop/etcd/raft/rpc" ) // IRaftLSM raft有限狀態自動機 type IRaftLSM interface { rpc.IRaftRPC iRaftStateContext State() IRaftState }
提供狀態模式下的上下文支持
package lsm import ( "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/rpc/client" "learning/gooop/etcd/raft/store" ) type iRaftStateContext interface { Config() config.IRaftConfig Store() store.ILogStore HandleStateChanged(state IRaftState) RaftClientService() client.IRaftClientService }
Candidate(候選人)狀態的實現。基於事件驅動的邏輯編排, 基於讀寫分離的字段管理。
package lsm

import (
	"sync"
	"time"

	"learning/gooop/etcd/raft/roles"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tCandidateState presents a candidate node.
//
// The state is organised as an event-driven model: every field is written
// only inside the hooks registered by its hookEventsForXxx method, and the
// whenXxxThenYyy handlers only read fields and raise further events.
//
// NOTE(review): the correctness of several handlers depends on
// tEventDrivenModel.raise running the hooks synchronously and in
// registration order - confirm against tEventDrivenModel.
type tCandidateState struct {
	tEventDrivenModel

	context    iRaftStateContext
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: init() / ceAskingForVote
	mTerm int64

	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTerm int64

	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedCandidateID string

	// update: ceInit / ceAskingForVote / ceVoteToCandidate
	mVotedTimestamp int64

	// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
	// mTicketCount is used as a set of peer ids that granted their vote.
	mTicketCount map[string]bool
	mTicketMutex *sync.Mutex

	// update: ceInit / ceDisposing
	mDisposedFlag bool
}

// Events of the candidate state.
const (
	// trigger: init()
	// args: empty
	ceInit = "candidate.init"

	// trigger: Start()
	// args: empty
	ceStart = "candidate.Start"

	// trigger: whenAskingForVoteThenWatchElectionTimeout()
	// args: empty
	ceElectionTimeout = "candidate.ElectionTimeout"

	// trigger: Heartbeat() / AppendLog()
	// args: empty
	ceLeaderAnnounced = "candidate.LeaderAnnounced"

	// trigger: RequestVote()
	// args: *rpc.RequestVoteCmd
	ceVoteToCandidate = "candidate.VoteToCandidate"

	// trigger: whenLeaderAnnouncedThenSwitchToFollower() / whenWinningTheVoteThenSwitchToLeader()
	// args: empty
	ceDisposing = "candidate.Disposing"

	// trigger: beginAskForVote()
	// args: empty
	ceAskingForVote = "candidate.AskingForVote"

	// trigger: handleRequestVoteOK()
	// args: peerID string
	ceReceiveTicket = "candidate.ReceiveTicket"

	// trigger: whenReceiveTicketThenCheckTicketCount()
	// args: empty
	ceWinningTheVote = "candidate.ceWinningTheVote"
)

// newCandidateState creates a candidate state bound to ctx, starting from the
// given term. The election itself begins when Start is called.
func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
	it := new(tCandidateState)
	it.init(ctx, term)
	return it
}

// init wires the event hooks and raises ceInit; it runs at most once.
func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.mTerm = term
		me.initEventHandlers()
		me.raise(ceInit)
	})
}

// initEventHandlers registers all hooks. The field-writing hooks are
// registered before the read-only handlers so that, assuming hooks fire in
// registration order, the handlers observe the updated fields.
func (me *tCandidateState) initEventHandlers() {
	// write only logic
	me.hookEventsForTerm()
	me.hookEventsForVotedTerm()
	me.hookEventsForVotedCandidateID()
	me.hookEventsForVotedTimestamp()
	me.hookEventsForTicketCount()
	me.hookEventsForDisposedFlag()

	// read only logic
	me.hook(ceStart, me.whenStartThenAskForVote)
	me.hook(ceAskingForVote, me.whenAskingForVoteThenWatchElectionTimeout)
	me.hook(ceReceiveTicket, me.whenReceiveTicketThenCheckTicketCount)
	me.hook(ceElectionTimeout, me.whenElectionTimeoutThenAskForVoteAgain)
	me.hook(ceWinningTheVote, me.whenWinningTheVoteThenSwitchToLeader)
	me.hook(ceLeaderAnnounced, me.whenLeaderAnnouncedThenSwitchToFollower)
}

// hookEventsForTerm maintains field: mTerm
// update: ceAskingForVote
func (me *tCandidateState) hookEventsForTerm() {
	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// every round of asking for votes uses a new term
		me.mTerm++
	})
}

// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote to itself
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// a new round starts, reset to itself
		me.mVotedTerm = me.mTerm
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after vote to candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm = cmd.Term
	})
}

// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		// initially, vote to itself
		me.mVotedCandidateID = me.context.Config().ID()
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		// a new round starts, vote to itself again
		me.mVotedCandidateID = me.context.Config().ID()
	})

	me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
		// after vote to candidate
		cmd := args[0].(*rpc.RequestVoteCmd)
		me.mVotedCandidateID = cmd.CandidateID
	})
}

// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTimestamp() {
	touch := func(e string, args ...interface{}) {
		me.mVotedTimestamp = time.Now().UnixNano()
	}

	me.hook(ceInit, touch)
	me.hook(ceAskingForVote, touch)
	me.hook(ceVoteToCandidate, touch)
}

// hookEventsForTicketCount maintains fields: mTicketCount / mTicketMutex
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
func (me *tCandidateState) hookEventsForTicketCount() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mTicketMutex = new(sync.Mutex)
		me.mTicketCount = make(map[string]bool)
		// the candidate's own vote
		me.mTicketCount[me.context.Config().ID()] = true
	})

	me.hook(ceAskingForVote, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()

		// drop the tickets of the previous round, keep only the own vote
		me.mTicketCount = make(map[string]bool)
		me.mTicketCount[me.context.Config().ID()] = true
	})

	me.hook(ceReceiveTicket, func(e string, args ...interface{}) {
		peerID := args[0].(string)

		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount[peerID] = true
	})

	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mTicketMutex.Lock()
		defer me.mTicketMutex.Unlock()
		me.mTicketCount = make(map[string]bool)
	})
}

// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: ceInit / ceDisposing
func (me *tCandidateState) hookEventsForDisposedFlag() {
	me.hook(ceInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})

	me.hook(ceDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

// Heartbeat handles a leader heartbeat. A heartbeat whose term is not
// greater than mTerm is answered with rpc.HBTermMismatch; otherwise
// ceLeaderAnnounced is raised and rpc.HBOk is returned.
//
// NOTE(review): the Raft paper lets a candidate accept a leader whose term
// is equal to its own; this code rejects the equal case (<=). Confirm
// whether that is intended before changing it.
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// check term
	if cmd.Term <= me.mTerm {
		// bad leader
		ret.Code = rpc.HBTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// return ok
	ret.Code = rpc.HBOk
	return nil
}

// AppendLog handles a leader append request. A candidate never appends: a
// request whose term is not greater than mTerm gets rpc.ALTermMismatch;
// otherwise ceLeaderAnnounced is raised and rpc.ALInternalError is returned
// for this request.
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	// check term
	if cmd.Term <= me.mTerm {
		// bad leader
		ret.Code = rpc.ALTermMismatch
		return nil
	}

	// new leader
	me.raise(ceLeaderAnnounced)

	// ignore and return
	ret.Code = rpc.ALInternalError
	return nil
}

// CommitLog is not served by a candidate; it always answers
// rpc.CLInternalError.
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	// ignore and return
	ret.Code = rpc.CLInternalError
	return nil
}

// RequestVote answers a vote request from another candidate.
//
//   - cmd.Term <  mVotedTerm: rpc.RVTermMismatch
//   - cmd.Term == mVotedTerm: rpc.RVVotedAnother if the vote of that term
//     went to a different candidate, rpc.RVOk otherwise
//   - cmd.Term >  mVotedTerm: the vote is granted (ceVoteToCandidate,
//     rpc.RVOk) when cmd.LastLogIndex is not behind the local last committed
//     index, else rpc.RVLogMismatch
//
// The returned error is always nil; the outcome is reported in ret.Code.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	switch {
	case cmd.Term < me.mVotedTerm:
		// stale term
		ret.Code = rpc.RVTermMismatch

	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
			// already vote another
			ret.Code = rpc.RVVotedAnother
		} else {
			// already voted
			ret.Code = rpc.RVOk
		}

	case cmd.LastLogIndex >= me.context.Store().LastCommittedIndex():
		// new term, good log
		me.raise(ceVoteToCandidate, cmd)
		ret.Code = rpc.RVOk

	default:
		// new term, bad log
		ret.Code = rpc.RVLogMismatch
	}

	return nil
}

// Role reports roles.Candidate.
func (me *tCandidateState) Role() roles.RaftRole {
	return roles.Candidate
}

// Start raises ceStart, which begins the first round of asking for votes.
// Only the first call has an effect.
func (me *tCandidateState) Start() {
	me.mStartOnce.Do(func() {
		// the handler is hooked on ceStart; raising any other event here
		// would leave the candidate idle forever
		me.raise(ceStart)
	})
}

// whenLeaderAnnouncedThenSwitchToFollower disposes this state and hands over
// to a new follower state.
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newFollowerState(me.context))
}

// whenElectionTimeoutThenAskForVoteAgain starts another round of voting.
func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

// whenStartThenAskForVote starts the first round of voting.
func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
	me.beginAskForVote()
}

// beginAskForVote raises ceAskingForVote and then sends RequestVote to every
// other configured node, each in its own goroutine. A reply with rpc.RVOk is
// passed to handleRequestVoteOK; errors and other reply codes are ignored.
func (me *tCandidateState) beginAskForVote() {
	// raise ceAskingForVote; must come first because its hooks bump mTerm,
	// which is read below
	me.raise(ceAskingForVote)

	selfID := me.context.Config().ID()
	term := me.mTerm
	store := me.context.Store()

	// one command is shared by all requests
	cmd := new(rpc.RequestVoteCmd)
	cmd.CandidateID = selfID
	cmd.Term = term
	cmd.LastLogIndex = store.LastCommittedIndex()
	cmd.LastLogTerm = store.LastCommittedTerm()

	// for each node, call node.RequestVote
	for _, node := range me.context.Config().Nodes() {
		if node.ID() == selfID {
			continue
		}

		go func(peerID string) {
			ret := new(rpc.RequestVoteRet)
			err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {
				return client.RequestVote(cmd, ret)
			})

			if err == nil && ret.Code == rpc.RVOk {
				me.handleRequestVoteOK(peerID, term)
			}
		}(node.ID())
	}
}

// whenAskingForVoteThenWatchElectionTimeout starts a goroutine that sleeps
// for a random election timeout and raises ceElectionTimeout if, by then,
// this state is still alive, still in the same term, and has not collected
// a majority of tickets.
//
// NOTE(review): mDisposedFlag and mTerm are read here (and in
// handleRequestVoteOK) from a goroutine without synchronisation - verify
// whether raise() serialises access.
func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
	term := me.mTerm
	go func() {
		time.Sleep(timeout.RandElectionTimeout())
		if me.mDisposedFlag || me.mTerm != term {
			return
		}

		if me.getTicketCount() < me.majority() {
			me.raise(ceElectionTimeout)
		}
	}()
}

// handleRequestVoteOK records a granted vote from peerID, unless the state
// was disposed or has moved on to a different term in the meantime.
func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
	if me.mDisposedFlag || me.mTerm != term {
		return
	}

	me.raise(ceReceiveTicket, peerID)
}

// whenReceiveTicketThenCheckTicketCount raises ceWinningTheVote once the
// tickets reach a majority.
func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
	if me.getTicketCount() >= me.majority() {
		// win the vote
		me.raise(ceWinningTheVote)
	}
}

// majority returns the number of tickets needed to win: more than half of
// the configured nodes.
func (me *tCandidateState) majority() int {
	return len(me.context.Config().Nodes())/2 + 1
}

// getTicketCount returns the number of collected tickets.
func (me *tCandidateState) getTicketCount() int {
	me.mTicketMutex.Lock()
	defer me.mTicketMutex.Unlock()
	return len(me.mTicketCount)
}

// whenWinningTheVoteThenSwitchToLeader disposes this state and hands over to
// a new leader state for the current term.
func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
	me.raise(ceDisposing)
	me.context.HandleStateChanged(newLeaderState(me.context, me.mTerm))
}
管理到指定raft節點的rpc連接
package client

import (
	"net/rpc"

	"learning/gooop/etcd/raft/config"
	rrpc "learning/gooop/etcd/raft/rpc"
)

// tRaftClient manages the rpc connection to one raft node. Every rpc call is
// forwarded to the current iClientState, which is chosen from whether a
// connection is available.
type tRaftClient struct {
	cfg   config.IRaftNodeConfig
	conn  *rpc.Client
	state iClientState
}

// newRaftClient creates the client for the node described by cfg. conn may
// be nil, in which case the client starts in the broken state.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	client := &tRaftClient{}
	client.init(cfg, conn)
	return client
}

// init stores cfg and conn, selects the initial state and starts it.
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg, me.conn = cfg, conn

	// the state constructors receive the client itself, so the fields above
	// must already be set
	var initial iClientState
	if conn != nil {
		initial = newConnectedState(me)
	} else {
		initial = newBrokenState(me)
	}

	me.state = initial
	me.state.Start()
}

// Config returns the configuration of the remote node.
func (me *tRaftClient) Config() config.IRaftNodeConfig {
	return me.cfg
}

// GetConn returns the current rpc connection, which may be nil.
func (me *tRaftClient) GetConn() *rpc.Client {
	return me.conn
}

// SetConn replaces the current rpc connection.
func (me *tRaftClient) SetConn(conn *rpc.Client) {
	me.conn = conn
}

// HandleStateChanged makes state the current state and starts it.
func (me *tRaftClient) HandleStateChanged(state iClientState) {
	me.state = state
	state.Start()
}

// Heartbeat forwards the call to the current state.
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return me.state.Heartbeat(cmd, ret)
}

// AppendLog forwards the call to the current state.
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return me.state.AppendLog(cmd, ret)
}

// CommitLog forwards the call to the current state.
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return me.state.CommitLog(cmd, ret)
}

// RequestVote forwards the call to the current state.
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return me.state.RequestVote(cmd, ret)
}

// Ping forwards the call to the current state.
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.state.Ping(cmd, ret)
}
管理當前節點到其他raft節點的rpc連接
package client

import (
	"errors"
	netrpc "net/rpc"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
)

// gErrorUnknownRaftPeer is returned by Using when the peer id is not
// configured.
var gErrorUnknownRaftPeer = errors.New("unknown raft peer")

// tRaftClientService manages the rpc clients from the current node to the
// configured raft nodes, keyed by node id.
type tRaftClientService struct {
	cfg     config.IRaftConfig
	clients map[string]IRaftClient
}

// NewRaftClientService creates the service and one client per node listed
// in cfg.
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	svc := &tRaftClientService{}
	svc.init(cfg)
	return svc
}

// init stores cfg and builds the client table.
func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	me.cfg = cfg
	me.clients = make(map[string]IRaftClient)

	for _, nodeCfg := range me.cfg.Nodes() {
		id := nodeCfg.ID()
		me.clients[id] = me.createRaftClient(nodeCfg)
	}
}

// createRaftClient dials the node's endpoint over tcp and wraps the result
// in a raft client. A failed dial is not an error here: the client is
// created without a connection instead.
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
	// dial to peer
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		// deliberately dropped: the client is handed a nil connection
		conn = nil
	}
	return newRaftClient(nodeCfg, conn)
}

// Using runs action against the client of the given peer and returns its
// result, or gErrorUnknownRaftPeer when no such peer is configured.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	peer, found := me.clients[peerID]
	if !found {
		return gErrorUnknownRaftPeer
	}
	return action(peer)
}
(未完待續)