手擼golang etcd raft協議之4

手擼golang etcd raft協議之4

緣起

最近閱讀 [雲原生分佈式存儲基石:etcd深入解析] (杜軍 , 2019.1)
本系列筆記擬採用golang練習之
gitee: https://gitee.com/ioly/learning.gooop

raft分佈式一致性算法

分佈式存儲系統通常會通過維護多個副本來進行容錯,
以提升系統的可用性。
這就引出了分佈式存儲系統的核心問題——如何保證多個副本的一致性?

Raft算法把問題分解成了領袖選舉(leader election)、
日誌複製(log replication)、安全性(safety)
和成員關係變化(membership changes)這幾個子問題。

Raft算法的基本操作只需2種RPC即可完成。
RequestVote RPC是在選舉過程中由候選人(Candidate)發起的,
AppendEntries RPC是領導人觸發的,目的是向其他節點複製日誌條目和發送心跳(heartbeat)。

目標

  • 根據raft協議,實現高可用分佈式強一致的kv存儲

子目標(Day 4)

  • 使用boltdb存儲操作日誌和kv鍵值數據

    • unstable存儲桶:已收到未提交的日誌,重啓後清空
    • committed存儲桶:已提交的日誌
    • data存儲桶:kv鍵值數據
    • meta存儲桶:記錄末次提交的index和term

設計

  • model/LogEntry: 日誌條目
  • ICmd:操作指令接口
  • ICmdFactory:操作指令工廠
  • ILogStore:日誌存儲接口
  • tCmdBase:指令基類
  • PutCmd:put指令
  • DelCmd:del指令
  • tBoltDBStore:基於boltdb實現日誌暫存,提交和應用

LogEntry.go

日誌條目

package model

import "encoding/json"

// LogEntry is one record of the replicated log. It is persisted as JSON.
type LogEntry struct {
    // Tag identifies the concrete command type that Command decodes into.
    Tag int
    // Term and Index are copied into the store as its current term/index
    // when the entry is appended.
    Term  int64
    Index int64
    // PrevTerm and PrevIndex presumably describe the entry that precedes this
    // one in the log - TODO confirm against the replication code.
    PrevTerm  int64
    PrevIndex int64
    // Command is the serialized command payload.
    Command []byte
}

// Marshal encodes the entry as JSON.
//
// Note the result order: the error comes first and the bytes second. On
// failure the byte slice is nil.
func (me *LogEntry) Marshal() (error, []byte) {
    encoded, err := json.Marshal(me)
    if err == nil {
        return nil, encoded
    }
    return err, nil
}

// Unmarshal fills the entry from JSON previously produced by Marshal and
// returns the decoder's error, if any.
func (me *LogEntry) Unmarshal(data []byte) error {
    err := json.Unmarshal(data, me)
    return err
}

ICmd.go

操作指令接口

package store

import "github.com/boltdb/bolt"

// ICmd is a state-machine command that can be serialized into a log entry
// and later applied to the key-value data inside a bolt transaction.
type ICmd interface {
    // Marshal returns the serialized form of the command. There is no error
    // return, so implementations have to signal failure through the result.
    Marshal() []byte
    // Unmarshal restores the command from serialized data. There is no error
    // return, so decoding failures cannot be reported to the caller.
    Unmarshal(data []byte)
    // Apply executes the command within the given bolt transaction.
    Apply(tx *bolt.Tx) error
}

ICmdFactory.go

操作指令工廠

package store

import "fmt"

// ICmdFactory maps between integer command tags and command instances.
type ICmdFactory interface {
    // OfTag returns a new, empty command for the given tag.
    OfTag(tag int) ICmd
    // Tag returns the integer tag that identifies the type of cmd.
    Tag(cmd ICmd) int
}

// tDefaultCmdFactory is the built-in command factory. It carries no state;
// the underlying int is never read.
type tDefaultCmdFactory int

// Tags identifying the built-in command types.
const (
    gPutCmdTag = 1
    gDelCmdTag = 2
)

// OfTag returns a fresh, zero-valued command for the given tag: *PutCmd for
// gPutCmdTag and *DelCmd for gDelCmdTag. Any other tag panics.
func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
    if tag == gPutCmdTag {
        return &PutCmd{}
    }
    if tag == gDelCmdTag {
        return &DelCmd{}
    }

    // The signature has no error return, so an unknown tag is fatal.
    panic(fmt.Sprintf("unknown tag: %d", tag))
}

// Tag returns the tag constant matching the dynamic type of cmd:
// gPutCmdTag for *PutCmd and gDelCmdTag for *DelCmd. Any other value,
// including a nil interface, panics.
func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
    switch cmd.(type) {
    case *PutCmd:
        return gPutCmdTag
    case *DelCmd:
        return gDelCmdTag
    default:
        panic(fmt.Sprintf("unknown cmd: %v", cmd))
    }
}

// gCmdFactory is the package-wide command factory instance.
var gCmdFactory = new(tDefaultCmdFactory)

ILogStore.go

日誌存儲接口

package store

import "learning/gooop/etcd/raft/model"

// ILogStore is the persistence interface for raft log entries.
type ILogStore interface {
    // Term returns the term currently tracked by the store.
    // NOTE(review): whether this means "last appended" or "last committed"
    // is implementation-defined - confirm with the raft node code.
    Term() int64
    // Index returns the log index currently tracked by the store
    // (same caveat as Term).
    Index() int64
    // Append stores a received entry that has not been committed yet.
    Append(entry *model.LogEntry) error
    // Commit commits the previously appended entry with the given index.
    Commit(index int64) error
}

tCmdBase.go

指令基類

package store

import "encoding/json"

// tCmdBase is embedded by the concrete commands to provide default
// Marshal/Unmarshal methods.
//
// NOTE(review): both methods pass the *tCmdBase receiver to encoding/json,
// so only this empty struct is encoded or decoded. Fields of an embedding
// type (for example a Key or Value) are not visible here: Marshal always
// yields "{}" and Unmarshal never populates the outer command. Commands that
// need their fields persisted must define their own Marshal/Unmarshal.
type tCmdBase struct {
}

// Marshal returns the JSON encoding of the base struct, or nil if encoding
// fails.
func (me *tCmdBase) Marshal() []byte {
    if encoded, err := json.Marshal(me); err == nil {
        return encoded
    }
    return nil
}

// Unmarshal decodes data into the base struct. Decoding errors are dropped
// because the ICmd signature offers no way to report them.
func (me *tCmdBase) Unmarshal(data []byte) {
    if err := json.Unmarshal(data, me); err != nil {
        // Intentionally ignored, see the method comment.
        return
    }
}

PutCmd.go

put指令

package store

import "github.com/boltdb/bolt"

// PutCmd stores Value under Key in the data bucket.
//
// NOTE(review): Marshal and Unmarshal are promoted from tCmdBase, whose
// receiver is *tCmdBase, so Key and Value are not part of the encoded form.
// Confirm and give PutCmd its own Marshal/Unmarshal before relying on
// persisted commands.
type PutCmd struct {
    tCmdBase

    // Key is the key to write.
    Key string
    // Value is the raw value stored under Key.
    Value []byte
}

// Apply writes Key/Value into the data bucket within tx.
//
// It returns bolt.ErrBucketNotFound when the data bucket does not exist,
// rather than panicking on the nil bucket that tx.Bucket returns. Any error
// from the bucket's Put is returned unchanged.
func (me *PutCmd) Apply(tx *bolt.Tx) error {
    b := tx.Bucket(gDataBucket)
    if b == nil {
        return bolt.ErrBucketNotFound
    }
    return b.Put([]byte(me.Key), me.Value)
}

DelCmd.go

del指令

package store

import "github.com/boltdb/bolt"

// DelCmd removes Key from the data bucket.
//
// NOTE(review): Marshal and Unmarshal are promoted from tCmdBase, whose
// receiver is *tCmdBase, so Key is not part of the encoded form. Confirm and
// give DelCmd its own Marshal/Unmarshal before relying on persisted commands.
type DelCmd struct {
    tCmdBase

    // Key is the key to delete.
    Key string
}

// Apply deletes Key from the data bucket within tx.
//
// It returns bolt.ErrBucketNotFound when the data bucket does not exist,
// rather than panicking on the nil bucket that tx.Bucket returns. Any error
// from the bucket's Delete is returned unchanged.
func (me *DelCmd) Apply(tx *bolt.Tx) error {
    b := tx.Bucket(gDataBucket)
    if b == nil {
        return bolt.ErrBucketNotFound
    }
    return b.Delete([]byte(me.Key))
}

tBoltDBStore.go

基於boltdb實現日誌暫存,提交和應用

package store

import (
    "bytes"
    "encoding/binary"
    "errors"
    "github.com/boltdb/bolt"
    "learning/gooop/etcd/raft/model"
)

// tBoltDBStore is the boltdb-backed ILogStore. It keeps received log entries
// in the unstable bucket, committed entries in the committed bucket, applied
// key-value data in the data bucket, and the last committed term/index in
// the meta bucket.
type tBoltDBStore struct {
    // file is the path of the bolt database file.
    file string
    // term and index are loaded from the meta bucket on open and overwritten
    // by Append.
    term  int64
    index int64

    // db is the open database handle. It is held by pointer because
    // bolt.Open returns *bolt.DB.
    db *bolt.DB
}

// NewBoltStore opens (or creates) the bolt database at file and prepares all
// buckets used by the store.
//
// Inside one transaction it:
//   - creates the meta bucket and loads the committed term/index, writing the
//     defaults if they are absent;
//   - creates the data and committed buckets if needed;
//   - drops and recreates the unstable bucket, so uncommitted entries do not
//     survive a restart.
//
// The result order is (error, store). On failure the database handle is
// closed and the store is nil.
func NewBoltStore(file string) (error, ILogStore) {
    db, err := bolt.Open(file, 0600, nil)
    if err != nil {
        return err, nil
    }

    // Keep the handle on the store; Append and Commit run their
    // transactions through it.
    store := &tBoltDBStore{file: file, db: db}

    err = db.Update(func(tx *bolt.Tx) error {
        meta, e := tx.CreateBucketIfNotExists(gMetaBucket)
        if e != nil {
            return e
        }

        if store.term, e = loadOrInitInt64(meta, gKeyCommittedTerm, gDefaultTerm); e != nil {
            return e
        }
        if store.index, e = loadOrInitInt64(meta, gKeyCommittedIndex, gDefaultIndex); e != nil {
            return e
        }

        if _, e = tx.CreateBucketIfNotExists(gDataBucket); e != nil {
            return e
        }

        // On a brand-new database the unstable bucket does not exist yet;
        // that is not an error, there is simply nothing to clear.
        if e = tx.DeleteBucket(gUnstableBucket); e != nil && !errors.Is(e, bolt.ErrBucketNotFound) {
            return e
        }
        if _, e = tx.CreateBucket(gUnstableBucket); e != nil {
            return e
        }

        if _, e = tx.CreateBucketIfNotExists(gCommittedBucket); e != nil {
            return e
        }

        return nil
    })

    if err != nil {
        // Do not leak the file handle/lock when initialisation fails. The
        // close error is dropped so the initialisation error is reported.
        _ = db.Close()
        return err, nil
    }

    return nil, store
}

// loadOrInitInt64 returns the int64 stored under key in b. If the key is
// absent it stores def under key and returns def.
func loadOrInitInt64(b *bolt.Bucket, key []byte, def int64) (int64, error) {
    if v := b.Get(key); v != nil {
        return bytesToInt64(v), nil
    }
    if e := b.Put(key, int64ToBytes(def)); e != nil {
        return 0, e
    }
    return def, nil
}

// int64ToBytes encodes i as exactly 8 big-endian bytes.
//
// The buffer must start empty (length 0, capacity 8). bytes.NewBuffer treats
// the slice it is given as existing content, so a buffer created from
// make([]byte, 8) would emit 8 zero bytes before the encoded value, and
// bytesToInt64 would then decode every result as 0.
func int64ToBytes(i int64) []byte {
    buf := bytes.NewBuffer(make([]byte, 0, 8))
    // Writing a fixed-size integer into an in-memory buffer cannot fail.
    _ = binary.Write(buf, binary.BigEndian, i)
    return buf.Bytes()
}

// bytesToInt64 decodes the first 8 bytes of data as a big-endian int64.
// If data holds fewer than 8 bytes the read fails and 0 is returned; the
// error itself is discarded. Bytes beyond the first 8 are ignored.
func bytesToInt64(data []byte) int64 {
    var value int64
    reader := bytes.NewReader(data)
    if err := binary.Read(reader, binary.BigEndian, &value); err != nil {
        // Short or empty input: value is still the zero value.
        return value
    }
    return value
}

// Term returns the in-memory term: the committed term loaded from the meta
// bucket when the store was opened, or the term of the entry most recently
// passed to Append.
func (me *tBoltDBStore) Term() int64 {
    return me.term
}

// Index returns the in-memory index: the committed index loaded from the meta
// bucket when the store was opened, or the index of the entry most recently
// passed to Append.
func (me *tBoltDBStore) Index() int64 {
    return me.index
}

// Append stores entry in the unstable bucket, keyed by its index, and then
// records the entry's index and term as the store's current index/term.
//
// The in-memory index/term are updated only after the bolt transaction has
// been committed. db.Update can still return a commit error after the
// closure itself returned nil, and in that case the store must keep
// reporting its previous position.
//
// It panics (via the command factory) if entry.Tag is not a known command
// tag. It returns any error from encoding the entry or from the transaction.
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
    // Resolving the tag rejects unknown command types before anything is
    // written. The decoded command itself is not used here.
    cmd := gCmdFactory.OfTag(entry.Tag)
    cmd.Unmarshal(entry.Command)

    e, entryData := entry.Marshal()
    if e != nil {
        return e
    }

    e = me.db.Update(func(tx *bolt.Tx) error {
        // save log to unstable
        return tx.Bucket(gUnstableBucket).Put(int64ToBytes(entry.Index), entryData)
    })
    if e != nil {
        return e
    }

    me.index = entry.Index
    me.term = entry.Term
    return nil
}

// Commit promotes the unstable entry stored under index to committed state.
//
// Everything happens in a single bolt transaction: the entry is read from
// the unstable bucket, its command is applied, the raw entry is copied into
// the committed bucket, the committed index/term in the meta bucket are
// updated, and the entry is finally removed from the unstable bucket.
//
// It returns gErrorCommitLogNotFound when no unstable entry exists for index,
// and otherwise the first error met along the way. Returning an error from
// the closure makes bolt roll the whole transaction back.
func (me *tBoltDBStore) Commit(index int64) error {
    return me.db.Update(func(tx *bolt.Tx) error {
        key := int64ToBytes(index)

        // Look up the pending entry.
        unstable := tx.Bucket(gUnstableBucket)
        raw := unstable.Get(key)
        if raw == nil {
            return gErrorCommitLogNotFound
        }

        var entry model.LogEntry
        if err := entry.Unmarshal(raw); err != nil {
            return err
        }

        // Rebuild the command and run it against the data in this transaction.
        cmd := gCmdFactory.OfTag(entry.Tag)
        cmd.Unmarshal(entry.Command)
        if err := cmd.Apply(tx); err != nil {
            return err
        }

        // Keep the raw entry in the committed log.
        if err := tx.Bucket(gCommittedBucket).Put(key, raw); err != nil {
            return err
        }

        // Record the new committed position: index first, then term.
        meta := tx.Bucket(gMetaBucket)
        if err := meta.Put(gKeyCommittedIndex, int64ToBytes(index)); err != nil {
            return err
        }
        if err := meta.Put(gKeyCommittedTerm, int64ToBytes(entry.Term)); err != nil {
            return err
        }

        // The entry is no longer pending.
        return unstable.Delete(key)
    })
}

// Bucket names used by the bolt store.
var (
    gMetaBucket      = []byte("meta")
    gUnstableBucket  = []byte("unstable")
    gCommittedBucket = []byte("committed")
    gDataBucket      = []byte("data")
)

// Keys inside the meta bucket.
var (
    gKeyCommittedIndex = []byte("committed.index")
    gKeyCommittedTerm  = []byte("committed.term")
)

// Term and index written to the meta bucket when no committed value exists.
var (
    gDefaultTerm  int64 = 0
    gDefaultIndex int64 = 0
)

// gErrorCommitLogNotFound is returned by Commit when the unstable bucket has
// no entry for the requested index.
var gErrorCommitLogNotFound = errors.New("committing log not found")

(未完待續)

相關文章
相關標籤/搜索