etcd raft如何實現成員變動

成員變動在一致性協議裏稍複雜一些,因爲不一樣的成員不可能在同一時刻從舊成員組切換至新成員組,因此可能出現兩個不相交的majority,從而致使同一個term出現兩個leader,進而致使同一個index的日誌不一致,違反一致性協議。下圖是個例子:
node

raft做者提出了一種比較簡單的方法,一次只增長或減小一個成員,這樣可以保證任什麼時候刻,都不可能出現兩個不相交的majority,因此,能夠從舊成員組直接切到新成員組。以下圖:
算法

切換的時機是把成員變動日誌寫盤的時候,無論是否commit。這個切換時機帶來的問題是若是這條成員變動日誌最終沒有commit,在發生leader切換的時候,成員組就須要回滾到舊的成員組。app

etcd raft爲了實現簡單,將切換成員組的實機選在apply成員變動日誌的時候。ui

下面看看etcd raft library如何實現的:日誌

應用調用code

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
    data, err := cc.Marshal()
    if err != nil {
        return err
    }
    return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}

能夠看出,ConfChange是和普通的log entry同樣封裝在MsgProp消息中,進入propc,orm

跑raft算法的goroutine從propc中拿到消息後,會作以下判斷:blog

for i, e := range m.Entries {
            if e.Type == pb.EntryConfChange {
                if r.pendingConf {
                    r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                }
                r.pendingConf = true
            }
}

檢查已經有成員變動正在作,就忽略新的成員變動。而後將pendingConf置爲true,意味着目前有成員變動正在作了,從這裏能夠看出,多個成員變動不能同時進行。follower接收端的處理和普通log entry同樣。接口

若是成員變動日誌達成了一致,則會被封裝在Ready中,應用拿到後,作以下處理:rem

if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
}

ApplyConfChange:

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
    var cs pb.ConfState
    select {
    case n.confc <- cc:
    case <-n.done:
    }
    select {
    case cs = <-n.confstatec:
    case <-n.done:
    }
    return &cs
}

講ConfChange放入confc,而後阻塞在confstatec上,跑raft協議的goroutine從confc中拿出ConfChange,作相應的增長/刪除節點操做,而後將成員組放入confstatec。

switch cc.Type {
            case pb.ConfChangeAddNode:
                r.addNode(cc.NodeID)
            case pb.ConfChangeRemoveNode:
                // block incoming proposal when local node is
                // removed
                if cc.NodeID == r.id {
                    propc = nil
                }
                r.removeNode(cc.NodeID)
            case pb.ConfChangeUpdateNode:
                r.resetPendingConf()
            default:
                panic("unexpected conf type")
            }
            select {
            case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
            case <-n.done:
}

增長/刪除節點操做都只是更新prs,map的每一個元素保存一個peer的狀態,其中最重要的狀態莫過於

Match, Next uint64

看過raft小論文的人一看變量名就很明確意義,Match表明最大的已經落盤的log index,Next表明下一條須要發給這個peer的log index。而後將pendingConf置爲false,表明成員變動結束。

重啓如何恢復成員組:

hs, cs, err := c.Storage.InitialState()

Storage接口中:

// InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)

Storage是個接口,其中InitialState()用於恢復成員組,須要應用本身實現,一般將ConfState記在最後一次Snapshot的Metadata中:

message SnapshotMetadata {
    optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
    optional uint64    index      = 2 [(gogoproto.nullable) = false];
    optional uint64    term       = 3 [(gogoproto.nullable) = false];
}

ConfState:

message ConfState {
    repeated uint64 nodes = 1;
}

拿到ConfState後就能夠初始化上面提到的prs,snapshot後續的已經commit的log entry同樣,經過Ready封裝,應用進行apply,若是其中有ConfChange,則調用

s.Node.ApplyConfChange(cc)
相關文章
相關標籤/搜索