Hand-rolling the etcd raft protocol in golang, part 7
I have recently been reading [雲原生分佈式存儲基石:etcd深入解析] (杜軍, 2019.1), and this series of notes works through the ideas with practice code in golang.
gitee: https://gitee.com/ioly/learning.gooop
A distributed storage system usually tolerates failures by maintaining multiple replicas, which raises the availability of the system. This leads directly to the core problem of distributed storage: how do we keep the replicas consistent? The Raft algorithm decomposes that problem into four sub-problems: 1. leader election, 2. log replication, 3. safety, and 4. membership changes.
The goal of this installment is to implement RPC communication between the raft nodes:
- Define the IRaftClient interface, encapsulating inter-node RPC calls
- tConnectedState: manages an RPC connection in the connected state
- tBrokenState: manages an RPC connection in the broken state
TEventDrivenModel: the base class for event-driven logic orchestration
```go
package model

// TEventHandleFunc is the signature of an event handler.
type TEventHandleFunc func(e string, args ...interface{})

// IEventDrivenModel hooks handlers to named events and raises events to them.
type IEventDrivenModel interface {
	Hook(e string, handleFunc TEventHandleFunc)
	Raise(e string, args ...interface{})
}

// TEventDrivenModel is the base class embedded by event-driven types.
type TEventDrivenModel struct {
	items map[string][]TEventHandleFunc
}

// Hook registers a handler for event e.
func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
	// lazy-init the map so the zero value of an embedding struct is usable
	if me.items == nil {
		me.items = make(map[string][]TEventHandleFunc)
	}
	me.items[e] = append(me.items[e], handler)
}

// Raise calls every handler hooked to event e, in registration order.
func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
	if handlers, ok := me.items[e]; ok {
		for _, it := range handlers {
			it(e, args...)
		}
	}
}
```
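A quick usage sketch of the base class (tCounter and evIncr are invented here for illustration and are not part of the repository): a type embeds TEventDrivenModel, hooks handlers to named events, and mutates its fields only inside those handlers, which is exactly how the connection states below use it.

```go
package main

import (
	"fmt"

	"learning/gooop/etcd/raft/model"
)

// tCounter embeds TEventDrivenModel; its field is written only inside a hooked handler.
type tCounter struct {
	model.TEventDrivenModel
	total int
}

const evIncr = "counter.Incr"

func newCounter() *tCounter {
	it := new(tCounter)
	it.Hook(evIncr, func(e string, args ...interface{}) {
		it.total += args[0].(int)
	})
	return it
}

func main() {
	c := newCounter()
	c.Raise(evIncr, 2)
	c.Raise(evIncr, 3)
	fmt.Println(c.total) // prints 5
}
```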
tRaftClientService: manages all of the inter-node RPC connections
```go
package client

import (
	"fmt"
	netrpc "net/rpc"
	"sync"

	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/rpc"
)

// tRaftClientService caches one IRaftClient per peer and lends it to callers.
type tRaftClientService struct {
	cfg     config.IRaftConfig
	rwmutex *sync.RWMutex
	clients map[string]IRaftClient
}

func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
	it := new(tRaftClientService)
	it.init(cfg)
	return it
}

func (me *tRaftClientService) init(cfg config.IRaftConfig) {
	me.cfg = cfg
	me.rwmutex = new(sync.RWMutex)
	me.clients = make(map[string]IRaftClient)
}

// Using runs action against the client for peerID, dialing and caching a new
// client when none exists yet.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
	// fast path: client already exists?
	me.rwmutex.RLock()
	it, ok := me.clients[peerID]
	if ok {
		me.rwmutex.RUnlock()
		return action(it)
	}

	// look up the peer's config
	var nodeCfg config.IRaftNodeConfig
	for _, n := range me.cfg.Nodes() {
		if n.ID() == peerID {
			nodeCfg = n
			break
		}
	}
	me.rwmutex.RUnlock()
	if nodeCfg == nil {
		return fmt.Errorf("unknown peer id: %s", peerID)
	}

	// dial to peer
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		return err
	}

	// create and cache a new client
	me.rwmutex.Lock()
	// recheck: another goroutine may have created the client meanwhile
	if it, ok = me.clients[peerID]; ok {
		me.rwmutex.Unlock()
		_ = conn.Close()
		return action(it)
	}
	it = newRaftClient(nodeCfg, conn)
	me.clients[peerID] = it
	me.rwmutex.Unlock()
	return action(it)
}
```
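For context, a call site might look like the sketch below. It is not part of the repository: the peer id "node2" and the empty HeartbeatCmd are placeholders, and the real command fields come from the rpc package of the earlier posts. The point is that callers never hold a connection themselves; they only borrow an rpc.IRaftRPC inside Using.

```go
package client

import "learning/gooop/etcd/raft/rpc"

// sendHeartbeat is a purely illustrative call site: "node2" is a made-up
// peer id, and the HeartbeatCmd fields are left to the rpc package.
func sendHeartbeat(svc IRaftClientService) error {
	return svc.Using("node2", func(c rpc.IRaftRPC) error {
		cmd := &rpc.HeartbeatCmd{}
		ret := &rpc.HeartbeatRet{}
		return c.Heartbeat(cmd, ret)
	})
}
```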
IRaftClient: manages the RPC connection between the current node and a single peer
```go
package client

import "learning/gooop/etcd/raft/rpc"

// IRaftClient is the RPC connection between the current node and one peer.
type IRaftClient interface {
	rpc.IRaftRPC
	iStateContext

	Ping(cmd *PingCmd, ret *PingRet) error
}

// PingCmd is the keep-alive request sent by the connected state's ping loop.
type PingCmd struct {
	SenderID  string
	Timestamp int64
}

// PingRet is the peer's reply to a Ping.
type PingRet struct {
	SenderID  string
	Timestamp int64
}
```
iClientState: the connection-state interface, designed around the state pattern
```go
package client

import "learning/gooop/etcd/raft/rpc"

// iClientState is the per-state behaviour behind IRaftClient:
// tConnectedState forwards calls over the wire, tBrokenState fails fast.
type iClientState interface {
	rpc.IRaftRPC

	Start()
	Ping(cmd *PingCmd, ret *PingRet) error
}
```
iStateContext: the connection-state context interface under the state pattern
```go
package client

import (
	"net/rpc"

	"learning/gooop/etcd/raft/config"
)

// iStateContext is what a state may ask of its owner: read the peer config,
// get/set the underlying connection, and install the next state.
type iStateContext interface {
	Config() config.IRaftNodeConfig
	GetConn() *rpc.Client
	SetConn(client *rpc.Client)
	HandleStateChanged(state iClientState)
}
```
tRaftClient: the concrete implementation of the IRaftClient interface, which also implements the iStateContext interface.
```go
package client

import (
	"net/rpc"

	"learning/gooop/etcd/raft/config"
	rrpc "learning/gooop/etcd/raft/rpc"
)

// tRaftClient implements IRaftClient and also acts as the iStateContext
// shared by tConnectedState and tBrokenState; every RPC call is delegated
// to the current state.
type tRaftClient struct {
	cfg   config.IRaftNodeConfig
	conn  *rpc.Client
	state iClientState
}

func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
	it := new(tRaftClient)
	it.init(cfg, conn)
	return it
}

func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
	me.cfg = cfg
	me.conn = conn
	// the connection handed in by the client service is live, so start in
	// the connected state; a failed ping later flips us to tBrokenState
	me.HandleStateChanged(newConnectedState(me))
}

func (me *tRaftClient) Config() config.IRaftNodeConfig {
	return me.cfg
}

func (me *tRaftClient) GetConn() *rpc.Client {
	return me.conn
}

func (me *tRaftClient) SetConn(conn *rpc.Client) {
	me.conn = conn
}

func (me *tRaftClient) HandleStateChanged(state iClientState) {
	me.state = state
	state.Start()
}

func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return me.state.Heartbeat(cmd, ret)
}

func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return me.state.AppendLog(cmd, ret)
}

func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return me.state.CommitLog(cmd, ret)
}

func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return me.state.RequestVote(cmd, ret)
}

func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.state.Ping(cmd, ret)
}
```
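To make the lifecycle concrete, here is a hypothetical helper that is not in the repository: it dials a peer and wraps the connection in an IRaftClient, mirroring what tRaftClientService.Using does internally. Given the init above, the new client begins in the connected state and falls back to tBrokenState once its ping loop fails.

```go
package client

import (
	netrpc "net/rpc"

	"learning/gooop/etcd/raft/config"
)

// dialPeer is an illustrative helper only; nodeCfg is assumed to describe a
// peer listening on its Endpoint().
func dialPeer(nodeCfg config.IRaftNodeConfig) (IRaftClient, error) {
	conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
	if err != nil {
		return nil, err
	}
	// the client starts in the connected state; if its ping loop fails, it
	// switches to tBrokenState, which keeps re-dialing in the background
	return newRaftClient(nodeCfg, conn), nil
}
```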
tConnectedState: manages the RPC connection in the connected state
- Field management based on read/write separation (fields are written only inside hooked event handlers)
```go
package client

import (
	"sync"
	"time"

	"learning/gooop/etcd/raft/model"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tConnectedState keeps the link alive with periodic Pings and switches to
// tBrokenState once a Ping fails.
type tConnectedState struct {
	model.TEventDrivenModel

	context iStateContext

	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: ceInit, ceDisposing
	mDisposedFlag bool
}

// trigger: init()
// args: empty
const ceInit = "connected.init"

// trigger: Start()
// args: empty
const ceStart = "connected.Start"

// trigger: whenPingFailedThenSwitchToBrokenState()
// args: empty
const ceDisposing = "connected.Disposing"

// trigger: whenStartThenBeginPing()
// args: empty
const cePingFailed = "connected.PingFailed"

func newConnectedState(ctx iStateContext) iClientState {
	it := new(tConnectedState)
	it.init(ctx)
	return it
}

func (me *tConnectedState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.initEventHandlers()
		me.Raise(ceInit)
	})
}

func (me *tConnectedState) initEventHandlers() {
	// write-only logic
	me.hookEventsForDisposedFlag()

	// read-only logic
	me.Hook(ceStart, me.whenStartThenBeginPing)
	me.Hook(cePingFailed, me.whenPingFailedThenSwitchToBrokenState)
	me.Hook(ceDisposing, me.whenDisposingThenCloseConn)
}

func (me *tConnectedState) Start() {
	me.mStartOnce.Do(func() {
		me.Raise(ceStart)
	})
}

func (me *tConnectedState) hookEventsForDisposedFlag() {
	me.Hook(ceInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})
	me.Hook(ceDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
	go func() {
		cmd := &PingCmd{
			SenderID:  me.context.Config().ID(),
			Timestamp: time.Now().UnixNano(),
		}
		ret := &PingRet{}
		for range time.Tick(timeout.ClientPingInterval) {
			if me.mDisposedFlag {
				return
			}

			cmd.Timestamp = time.Now().UnixNano()
			err := me.Ping(cmd, ret)
			if err != nil {
				me.Raise(cePingFailed)
			}
		}
	}()
}

func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
	me.Raise(ceDisposing)
	me.context.HandleStateChanged(newBrokenState(me.context))
}

func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
	it := me.context.GetConn()
	if it != nil {
		_ = it.Close()
	}
	me.context.SetConn(nil)
}

func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	return me.context.GetConn().Call("TRaftRPCServer.Heartbeat", cmd, ret)
}

func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	return me.context.GetConn().Call("TRaftRPCServer.AppendLog", cmd, ret)
}

func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	return me.context.GetConn().Call("TRaftRPCServer.CommitLog", cmd, ret)
}

func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	return me.context.GetConn().Call("TRaftRPCServer.RequestVote", cmd, ret)
}

func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.context.GetConn().Call("TRaftRPCServer.Ping", cmd, ret)
}
```
# tBrokenState.go
Manages the RPC connection in the broken state:
- Periodically Dial to try to re-establish the connection
- Event-driven logic orchestration
- Field management based on read/write separation
```go
package client

import (
	"errors"
	"net/rpc"
	"sync"
	"time"

	"learning/gooop/etcd/raft/model"
	rrpc "learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
)

// tBrokenState fails every RPC fast and keeps re-dialing in the background
// until the peer is reachable again, then switches back to tConnectedState.
type tBrokenState struct {
	model.TEventDrivenModel

	context iStateContext

	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: beInit, beDisposing
	mDisposedFlag bool
}

// trigger: init()
// args: empty
const beInit = "broken.init"

// trigger: Start()
// args: empty
const beStart = "broken.Start"

// trigger: whenStartThenBeginDial()
// args: *rpc.Client
const beDialOK = "broken.DialOK"

// trigger: whenDialOKThenSwitchToConnectedState()
// args: empty
const beDisposing = "broken.Disposing"

func newBrokenState(ctx iStateContext) iClientState {
	it := new(tBrokenState)
	it.init(ctx)
	return it
}

func (me *tBrokenState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.initEventHandlers()
		me.Raise(beInit)
	})
}

func (me *tBrokenState) initEventHandlers() {
	// write-only logic
	me.hookEventsForDisposedFlag()

	// read-only logic
	me.Hook(beStart, me.whenStartThenBeginDial)
	me.Hook(beDialOK, me.whenDialOKThenSetConn)
	me.Hook(beDialOK, me.whenDialOKThenSwitchToConnectedState)
}

func (me *tBrokenState) hookEventsForDisposedFlag() {
	me.Hook(beInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})
	me.Hook(beDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

func (me *tBrokenState) Start() {
	me.mStartOnce.Do(func() {
		me.Raise(beStart)
	})
}

func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
	go func() {
		for !me.mDisposedFlag {
			conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
			if err == nil {
				me.Raise(beDialOK, conn)
				break
			}
			time.Sleep(timeout.ClientRedialInterval)
		}
	}()
}

func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
	conn := args[0].(*rpc.Client)
	me.context.SetConn(conn)
}

func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
	me.Raise(beDisposing)
	me.context.HandleStateChanged(newConnectedState(me.context))
}

func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {
	return gErrorConnectionBroken
}

var gErrorConnectionBroken = errors.New("peer connection broken")
```
(To be continued)