手擼golang etcd raft協議之4
最近閱讀 [雲原生分佈式存儲基石:etcd深入解析] (杜軍 , 2019.1)
本系列筆記擬採用golang練習之
gitee: https://gitee.com/ioly/learning.gooop
分佈式存儲系統一般會通過維護多個副本來進行容錯, 以提升系統的可用性。 這就引出了分佈式存儲系統的核心問題——如何保證多個副本的一致性? Raft算法把問題分解成了領袖選舉(leader election)、 日誌複製(log replication)、安全性(safety) 和成員關係變化(membership changes)這幾個子問題。 Raft算法的基本操作只需2種RPC即可完成。 RequestVote RPC是在選舉過程中由候選人(Candidate)觸發的, AppendEntries RPC是領導人觸發的,目的是向其他節點複製日誌條目和發送心跳(heartbeat)。
使用boltdb存儲操作日誌和kv鍵值數據
日誌條目
package model

import "encoding/json"

// LogEntry is a single record of the replicated log. It is serialized as
// JSON using the default (exported) field names.
type LogEntry struct {
	Tag       int
	Term      int64
	Index     int64
	PrevTerm  int64
	PrevIndex int64
	Command   []byte
}

// Marshal encodes the entry as JSON.
// Note the result order: the error comes first, then the encoded bytes.
func (me *LogEntry) Marshal() (error, []byte) {
	data, err := json.Marshal(me)
	if err == nil {
		return nil, data
	}
	return err, nil
}

// Unmarshal decodes JSON data into the receiver and returns the decoding
// error, if any.
func (me *LogEntry) Unmarshal(data []byte) error {
	err := json.Unmarshal(data, me)
	return err
}
操作指令接口
package store

import "github.com/boltdb/bolt"

// ICmd is the contract every store command implements: it can be converted
// to and from bytes and applied inside a bolt transaction.
type ICmd interface {
	// Marshal returns the byte encoding of the command. The signature has no
	// error result, so an encoding failure cannot be reported to the caller.
	Marshal() []byte

	// Unmarshal loads the command from data. The signature has no error
	// result, so a decoding failure cannot be reported to the caller.
	Unmarshal(data []byte)

	// Apply executes the command using tx and returns any resulting error.
	Apply(tx *bolt.Tx) error
}
操作指令工廠
package store

import "fmt"

// ICmdFactory converts between numeric command tags and command values.
type ICmdFactory interface {
	// OfTag returns a new, empty command for the given tag.
	OfTag(tag int) ICmd
	// Tag returns the numeric tag that identifies cmd's concrete type.
	Tag(cmd ICmd) int
}

// tDefaultCmdFactory is the built-in factory that knows PutCmd and DelCmd.
type tDefaultCmdFactory int

// Tags identifying the supported command types.
const (
	gPutCmdTag = 1
	gDelCmdTag = 2
)

// OfTag creates an empty PutCmd or DelCmd for the matching tag.
// It panics when the tag is not one of the known values.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
	if tag == gPutCmdTag {
		return &PutCmd{}
	}
	if tag == gDelCmdTag {
		return &DelCmd{}
	}
	panic(fmt.Sprintf("unknown tag: %d", tag))
}

// Tag reports the tag for cmd based on its dynamic type.
// It panics when cmd is neither *PutCmd nor *DelCmd.
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
	switch cmd.(type) {
	case *PutCmd:
		return gPutCmdTag
	case *DelCmd:
		return gDelCmdTag
	}
	panic(fmt.Sprintf("unknown cmd: %v", cmd))
}

// gCmdFactory is the package-wide factory instance.
var gCmdFactory = new(tDefaultCmdFactory)
日誌存儲接口
package store

import "learning/gooop/etcd/raft/model"

// ILogStore is the log storage contract: entries are first appended and
// later committed by index.
type ILogStore interface {
	// Term returns the store's current term value.
	Term() int64

	// Index returns the store's current index value.
	Index() int64

	// Append stores entry and returns any storage error.
	Append(entry *model.LogEntry) error

	// Commit commits the previously appended entry identified by index and
	// returns any resulting error.
	Commit(index int64) error
}
指令基類
package store

import "encoding/json"

// tCmdBase is a field-less helper type meant to be embedded by commands.
type tCmdBase struct {
}

// Marshal returns the JSON encoding of the receiver, or nil if encoding
// fails.
//
// NOTE: the value handed to json.Marshal is the *tCmdBase receiver itself,
// so when this method is reached through embedding, the fields of the outer
// struct are not part of the output.
func (me *tCmdBase) Marshal() []byte {
	if data, err := json.Marshal(me); err == nil {
		return data
	}
	return nil
}

// Unmarshal decodes JSON data into the receiver; a decoding error is
// discarded because the method has no error result.
//
// NOTE: the decode target is the *tCmdBase receiver itself, so fields of an
// embedding struct are not populated.
func (me *tCmdBase) Unmarshal(data []byte) {
	// Best effort by design: the signature cannot report the error.
	_ = json.Unmarshal(data, me)
}
put指令
package store

import (
	"encoding/json"
	"fmt"

	"github.com/boltdb/bolt"
)

// PutCmd stores Value under Key in the data bucket.
type PutCmd struct {
	tCmdBase
	Key   string
	Value []byte
}

// Marshal returns the JSON encoding of the command including Key and Value,
// or nil if encoding fails.
//
// It is defined on PutCmd on purpose: the Marshal promoted from tCmdBase
// receives only the field-less base, so it would never encode Key and Value.
func (me *PutCmd) Marshal() []byte {
	j, e := json.Marshal(me)
	if e != nil {
		return nil
	}
	return j
}

// Unmarshal decodes JSON data into the command's Key and Value. A decoding
// error is discarded because ICmd.Unmarshal has no error result.
//
// It is defined on PutCmd for the same reason as Marshal: the promoted
// tCmdBase.Unmarshal would decode into the base and leave the fields empty.
func (me *PutCmd) Unmarshal(data []byte) {
	// Best effort by design: the interface cannot report the error.
	_ = json.Unmarshal(data, me)
}

// Apply writes Key/Value into the data bucket using tx.
// It returns an error when the data bucket does not exist or the write fails.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		// tx.Bucket returns nil for a missing bucket; fail instead of
		// dereferencing it.
		return fmt.Errorf("bucket %q not found", gDataBucket)
	}
	return b.Put([]byte(me.Key), me.Value)
}
del指令
package store

import (
	"encoding/json"
	"fmt"

	"github.com/boltdb/bolt"
)

// DelCmd removes Key from the data bucket.
type DelCmd struct {
	tCmdBase
	Key string
}

// Marshal returns the JSON encoding of the command including Key, or nil if
// encoding fails.
//
// It is defined on DelCmd on purpose: the Marshal promoted from tCmdBase
// receives only the field-less base, so it would never encode Key.
func (me *DelCmd) Marshal() []byte {
	j, e := json.Marshal(me)
	if e != nil {
		return nil
	}
	return j
}

// Unmarshal decodes JSON data into the command's Key. A decoding error is
// discarded because ICmd.Unmarshal has no error result.
//
// It is defined on DelCmd for the same reason as Marshal: the promoted
// tCmdBase.Unmarshal would decode into the base and leave Key empty.
func (me *DelCmd) Unmarshal(data []byte) {
	// Best effort by design: the interface cannot report the error.
	_ = json.Unmarshal(data, me)
}

// Apply deletes Key from the data bucket using tx.
// It returns an error when the data bucket does not exist or the delete fails.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
	b := tx.Bucket(gDataBucket)
	if b == nil {
		// tx.Bucket returns nil for a missing bucket; fail instead of
		// dereferencing it.
		return fmt.Errorf("bucket %q not found", gDataBucket)
	}
	return b.Delete([]byte(me.Key))
}
基於boltdb實現日誌暫存,提交和應用
package store

import (
	"bytes"
	"encoding/binary"
	"errors"

	"github.com/boltdb/bolt"

	"learning/gooop/etcd/raft/model"
)

// tBoltDBStore is the boltdb-backed ILogStore implementation.
type tBoltDBStore struct {
	file  string
	term  int64
	index int64
	// db is kept as the pointer returned by bolt.Open; the handle must not
	// be copied by value.
	db *bolt.DB
}

// NewBoltStore opens (or creates) the bolt database at file and prepares the
// buckets the store uses:
//   - meta: committed term/index, initialised with the defaults if absent
//   - data: created if absent
//   - unstable: dropped and recreated empty on every open
//   - committed: created if absent
//
// Note the result order: the error comes first, then the store. On failure
// the database handle is closed and the returned store is nil.
func NewBoltStore(file string) (error, ILogStore) {
	db, err := bolt.Open(file, 0600, nil)
	if err != nil {
		return err, nil
	}

	store := &tBoltDBStore{file: file, db: db}
	err = db.Update(func(tx *bolt.Tx) error {
		meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
		if e != nil {
			return e
		}

		if store.term, e = loadOrInitInt64(meta, gKeyCommittedTerm, gDefaultTerm); e != nil {
			return e
		}
		if store.index, e = loadOrInitInt64(meta, gKeyCommittedIndex, gDefaultIndex); e != nil {
			return e
		}

		if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
			return e
		}

		// Only delete the unstable bucket when it exists: deleting a missing
		// bucket is an error, which would make the very first open of a new
		// database fail.
		if tx.Bucket(gUnstableBucket) != nil {
			if e = tx.DeleteBucket(gUnstableBucket); e != nil {
				return e
			}
		}
		if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
			return e
		}

		_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
		return e
	})
	if err != nil {
		// Do not leak the open file handle/lock when initialisation fails.
		_ = db.Close()
		return err, nil
	}

	return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. When the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
	v := b.Get(key)
	if v != nil {
		return bytesToInt64(v), nil
	}
	if e := b.Put(key, int64ToBytes(def)); e != nil {
		return 0, e
	}
	return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
func int64ToBytes(i int64) []byte {
	// Length 0, capacity 8: a buffer created from make([]byte, 8) would
	// already contain 8 zero bytes and the value would be appended after them.
	buf := bytes.NewBuffer(make([]byte, 0, 8))
	// Writing a fixed-size integer into an in-memory buffer cannot fail.
	_ = binary.Write(buf, binary.BigEndian, i)
	return buf.Bytes()
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// It returns 0 when data holds fewer than 8 bytes.
func bytesToInt64(data []byte) int64 {
	var i int64
	// On a short read i keeps its zero value, which is the documented result.
	_ = binary.Read(bytes.NewReader(data), binary.BigEndian, &i)
	return i
}

// Term returns the term held in memory: the committed term loaded at open,
// or the term of the most recently appended entry.
func (me *tBoltDBStore) Term() int64 {
	return me.term
}

// Index returns the index held in memory: the committed index loaded at
// open, or the index of the most recently appended entry.
func (me *tBoltDBStore) Index() int64 {
	return me.index
}

// Append stores entry in the unstable bucket keyed by its index and, once the
// write has been committed, records the entry's index and term in memory.
// It returns the marshalling or storage error, if any.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
	// Resolve the command for the entry's tag up front; the factory panics on
	// an unknown tag.
	cmd := gCmdFactory.OfTag(entry.Tag)
	cmd.Unmarshal(entry.Command)

	e, entryData := entry.Marshal()
	if e != nil {
		return e
	}

	e = me.db.Update(func(tx *bolt.Tx) error {
		// save log to unstable
		return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
	})
	if e != nil {
		return e
	}

	// Update the in-memory position only after the transaction committed, so
	// a failed write leaves term/index untouched.
	me.index = entry.Index
	me.term = entry.Term
	return nil
}
func (me *tBoltDBStore) Commit(index int64) error { return me.db.Update(func(tx *bolt.Tx) error { // read unstable log ub := tx.Bucket(gUnstableBucket) k := int64ToBytes(index) data := ub.Get(k) if data == nil { return gErrorCommitLogNotFound } entry := new(model.LogEntry) e := entry.Unmarshal(data) if e != nil { return e } // apply cmd cmd := gCmdFactory.OfTag(entry.Tag) cmd.Unmarshal(entry.Command) e = cmd.Apply(tx) if e != nil { return e } // save to committed log cb := tx.Bucket(gCommittedBucket) e = cb.Put(k, data) if e != nil { return e } // update committed.index, committed.term mb := tx.Bucket(gMetaBucket) e = mb.Put(gKeyCommittedIndex, int64ToBytes(index)) if e != nil { return e } e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)) if e != nil { return e } // del unstable.index e = ub.Delete(k) if e != nil { return e } return nil }) } var gMetaBucket = []byte("meta") var gUnstableBucket = []byte("unstable") var gCommittedBucket = []byte("committed") var gDataBucket = []byte("data") var gKeyCommittedIndex = []byte("committed.index") var gKeyCommittedTerm = []byte("committed.term") var gDefaultTerm int64 = 0 var gDefaultIndex int64 = 0 var gErrorCommitLogNotFound = errors.New("committing log not found")
(未完待續)