解析高可用分佈式鍵值存儲 etcd 的原理

這篇文章將會介紹 etcd 的實現原理,其中包括 Raft 協議、存儲兩大模塊,在最後咱們也會簡單介紹 etcd 一些具體應用場景。node

etcd 的官方將它定位成一個可信賴的分佈式鍵值存儲服務,它可以爲整個分佈式集羣存儲一些關鍵數據,協助分佈式集羣的正常運轉。git

咱們能夠簡單看一下 etcd 和 Zookeeper 在定義上有什麼不一樣:github

  • etcd is a distributed reliable key-value store for the most critical data of a distributed system…
  • ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

其中前者是一個用於存儲關鍵數據的鍵值存儲,後者是一個用於管理配置等信息的中心化服務。算法

etcd 的使用其實很是簡單,它對外提供了 gRPC 接口,咱們能夠經過 Protobuf 和 gRPC 直接對 etcd 中存儲的數據進行管理,也可使用官方提供的 etcdctl 操做存儲的數據。數據庫

service KV {
  rpc Range(RangeRequest) returns (RangeResponse) {
      option (google.api.http) = {
        post: "/v3beta/kv/range"
        body: "*"
    };
  }

  rpc Put(PutRequest) returns (PutResponse) {
      option (google.api.http) = {
        post: "/v3beta/kv/put"
        body: "*"
    };
  }
}

文章並不會展開介紹 etcd 的使用方法,這一小節將逐步介紹幾大核心模塊的實現原理,包括 etcd 使用 Raft 協議同步各個節點數據的過程以及 etcd 底層存儲數據使用的結構。編程

在每個分佈式系統中,etcd 每每都扮演了很是重要的地位,因爲不少服務配置發現以及配置的信息都存儲在 etcd 中,因此整個集羣可用性的上限每每就是 etcd 的可用性,而使用 3 ~ 5 個 etcd 節點構成高可用的集羣每每都是常規操做。後端

正是由於 etcd 在使用的過程當中會啓動多個節點,如何處理幾個節點之間的分佈式一致性就是一個比較有挑戰的問題了。api

解決多個節點數據一致性的方案其實就是共識算法,在以前的文章中咱們簡單介紹過 Zookeeper 使用的Zab 協議 以及常見的共識算法 Paxos 和 Raft,etcd 使用的共識算法就是 Raft,這一節咱們將詳細介紹 Raft 以及 etcd 中 Raft 的一些實現細節。數組

Raft 從一開始就被設計成一個易於理解和實現的共識算法,它在容錯和性能上與 Paxos 協議比較相似,區別在於它將分佈式一致性的問題分解成了幾個子問題,而後一一進行解決。緩存

每個 Raft 集羣中都包含多個服務器,在任意時刻,每一臺服務器只可能處於 Leader、Follower 以及 Candidate 三種狀態;在處於正常的狀態時,集羣中只會存在一個 Leader,其他的服務器都是 Follower。

上述圖片修改自 In Search of an Understandable Consensus Algorithm 一文 5.1 小結中圖四。

全部的 Follower 節點都是被動的,它們不會主動發出任何的請求,只會響應 Leader 和 Candidate 發出的請求,對於每個用戶的可變操做,都會被路由給 Leader 節點進行處理,除了 Leader 和 Follower 節點以外,Candidate 節點其實只是集羣運行過程當中的一個臨時狀態。

Raft 集羣中的時間也被切分紅了不一樣的幾個任期(Term),每個任期都會由 Leader 的選舉開始,選舉結束後就會進入正常操做的階段,直到 Leader 節點出現問題纔會開始新一輪的選擇。

每個服務器都會存儲當前集羣的最新任期,它就像是一個單調遞增的邏輯時鐘,可以同步各個節點之間的狀態,當前節點持有的任期會隨着 每個 請求被傳遞到其餘的節點上。

Raft 協議在每個任期的開始時都會從一個集羣中選出一個節點做爲集羣的 Leader 節點,這個節點會負責集羣中的日誌的複製以及管理工做。

咱們將 Raft 協議分紅三個子問題:節點選舉、日誌複製以及安全性,文章會以 etcd 爲例介紹 Raft 協議是如何解決這三個子問題的。

節點選舉

在此我向你們推薦一個架構學習交流羣。交流學習羣號:821169538  裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。
使用 Raft 協議的 etcd 集羣在啓動節點時,會遵循 Raft 協議的規則,全部節點一開始都被初始化爲 Follower 狀態,新加入的節點會在 NewNode 中作一些配置的初始化,包括用於接收各類信息的 Channel:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L190-225
func StartNode(c *Config, peers []Peer) Node {
	r := newRaft(c)
	r.becomeFollower(1, None)
	r.raftLog.committed = r.raftLog.lastIndex()
	for _, peer := range peers {
		r.addNode(peer.ID)
	}

	n := newNode()
	go n.run(r)
	return &n
}

在作完這些初始化的節點和 Raft 配置的事情以後,就會進入一個由 for 和 select 組成的超大型循環,這個循環會從 Channel 中獲取待處理的事件:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L291-423
func (n *node) run(r *raft) {
	lead := None

	for {
		if lead != r.lead {
			lead = r.lead
		}

		select {
		case m := <-n.recvc:
			r.Step(m)
		case <-n.tickc:
			r.tick()
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

做者對整個循環內的代碼進行了簡化,由於當前只須要關心三個 Channel 中的消息,也就是用於接受其餘節點消息的 recvc 、用於觸發定時任務的 tickc 以及用於暫停當前節點的 stop

除了 stop Channel 中介紹到的消息以外, recvc 和 tickc 兩個 Channel 中介紹到事件時都會交給當前節點持有 Raft 結構體處理。

定時器與心跳

當節點從任意狀態(包括啓動)調用 becomeFollower 時,都會將節點的定時器設置爲 tickElection :

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

若是當前節點能夠成爲 Leader 而且上一次收到 Leader 節點的消息或者心跳已經超過了等待的時間,當前節點就會發送 MsgHup 消息嘗試開始新的選舉。

可是若是 Leader 節點正常運行,就可以一樣經過它的定時器 tickHeartbeat 向全部的 Follower 節點廣播心跳請求,也就是 MsgBeat 類型的 RPC 消息:

func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

上述代碼段 Leader 節點中調用的 Step 函數,最終會調用 stepLeader 方法,該方法會根據消息的類型進行不一樣的處理:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L931-1142
func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	// ...
	}

	//...
}

bcastHeartbeat 方法最終會向全部的 Follower 節點發送 MsgHeartbeat 類型的消息,通知它們目前 Leader 的存活狀態,重置全部 Follower 持有的超時計時器。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L518-534
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	commit := min(r.getProgress(to).Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}

做爲集羣中的 Follower,它們會在 stepFollower 方法中處理接收到的所有消息,包括 Leader 節點發送的心跳 RPC 消息:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1191-1247
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	// ...
	}
	return nil
}

當 Follower 接受到了來自 Leader 的 RPC 消息 MsgHeartbeat 時,會將當前節點的選舉超時時間重置並經過 handleHeartbeat 向 Leader 節點發出響應 —— 通知 Leader 當前節點可以正常運行。

而 Candidate 節點對於 MsgHeartBeat 消息的處理會稍有不一樣,它會先執行 becomeFollower 設置當前節點和 Raft 協議的配置:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
func stepCandidate(r *raft, m pb.Message) error {
  // ...
	switch m.Type {
	case pb.MsgHeartbeat:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleHeartbeat(m)
	}
  // ...
	return nil
}

Follower 與 Candidate 會根據節點類型的不一樣作出不一樣的響應,二者收到心跳請求時都會重置節點的選舉超時時間,不事後者會將節點的狀態直接轉變成 Follower:

當 Leader 節點收到心跳的響應時就會將對應節點的狀態設置爲 Active ,若是 Follower 節點在一段時間內沒有收到來自 Leader 節點的消息就會嘗試發起競選。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

到了這裏,心跳機制就起到了做用開始發送 MsgHup 嘗試重置整個集羣中的 Leader 節點,接下來咱們就會開始分析 Raft 協議中的競選流程了。

競選流程

若是集羣中的某一個 Follower 節點長時間內沒有收到來自 Leader 的心跳請求,當前節點就會經過 MsgHup 消息進入預選舉或者選舉的流程。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
func (r *raft) Step(m pb.Message) error {
  // ...

	switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		}
	}
  // ...
  return nil
}

若是收到 MsgHup 消息的節點不是 Leader 狀態,就會根據當前集羣的配置選擇進入 PreElection 或者 Election 階段, PreElection 階段並不會真正增長當前節點的 Term ,它的主要做用是獲得當前集羣可否成功選舉出一個 Leader 的答案,若是當前集羣中只有兩個節點並且沒有預選舉階段,那麼這兩個節點的 Term 會無休止的增長,預選舉階段就是爲了解決這一問題而出現的。

在這裏不會討論預選舉的過程,而是將目光主要放在選舉階段,具體瞭解一下使用 Raft 協議的 etcd 集羣是如何從衆多節點中選出 Leader 節點的。

咱們能夠繼續來分析 campaign 方法的具體實現,下面就是刪去預選舉相關邏輯後的代碼:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L730-766
func (r *raft) campaign(t CampaignType) {
	r.becomeCandidate()
	
	if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
		r.becomeLeader()
		return
	}
	for id := range r.prs {
		if id == r.id {
			continue
		}

		r.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

當前節點會馬上調用 becomeCandidate 將當前節點的 Raft 狀態變成候選人;在這以後,它會將票投給本身,若是當前集羣只有一個節點,該節點就會直接成爲集羣中的 Leader 節點。

若是集羣中存在了多個節點,就會向集羣中的其餘節點發出 MsgVote 消息,請求其餘節點投票,在 Step 函數中包含不一樣狀態的節點接收到消息時的響應:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
func (r *raft) Step(m pb.Message) error {
  // ...

	switch m.Type {
	case pb.MsgVote, pb.MsgPreVote:
		canVote := r.Vote == m.From || (r.Vote == None && r.lead == None)
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp})
			r.electionElapsed = 0
			r.Vote = m.From
		} else {
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true})
		}

	}
  // ...
  return nil
}

若是當前節點投的票就是消息的來源或者當前節點沒有投票也沒有 Leader,那麼就會向來源的節點投票,不然就會通知該節點當前節點拒絕投票。

在 stepCandidate 方法中,候選人節點會處理來自其餘節點的投票響應消息,也就是 MsgVoteResp :

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
func stepCandidate(r *raft, m pb.Message) error {
	switch m.Type {
	// ...
	case pb.MsgVoteResp:
		gr := r.poll(m.From, m.Type, !m.Reject)
		switch r.quorum() {
		case gr:
			r.becomeLeader()
			r.bcastAppend()
		// ...
		}
	}
	return nil
}

每當收到一個 MsgVoteResp 類型的消息時,就會設置當前節點持有的 votes 數組,更新其中存儲的節點投票狀態並返回投『贊成』票的人數,若是得到的票數大於法定人數 quorum ,當前節點就會成爲集羣的 Leader 並向其餘的節點發送當前節點當選的消息,通知其他節點更新 Raft 結構體中的 Term 等信息。

節點狀態

對於每個節點來講,它們根據不一樣的節點狀態會對網絡層發來的消息作出不一樣的響應,咱們會分別介紹下面的四種狀態在 Raft 中對於配置和消息到底是如何處理的。

對於每個 Raft 的節點狀態來講,它們分別有三個比較重要的區別,其中一個是在改變狀態時調用 becomeLeader 、 becomeCandidate 、 becomeFollower 和 becomePreCandidate方法改變 Raft 狀態有比較大的不一樣,第二是處理消息時調用 stepLeader 、 stepCandidate和 stepFollower 時有比較大的不一樣,最後是幾種不一樣狀態的節點具備功能不一樣的定時任務。

對於方法的詳細處理,咱們在這一節中不詳細介紹和分析,若是一個節點的狀態是 Follower,那麼當前節點切換到 Follower 必定會經過 becomeFollower 函數,在這個函數中會重置節點持有任期,而且設置處理消息的函數爲 stepFollower :

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L671-678
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

除此以外,它還會設置一個用於在 Leader 節點宕機時觸發選舉的定時器 tickElection 。

Candidate 狀態的節點與 Follower 的配置差不了太多,只是在消息處理函數 step 、任期以及狀態上的設置有一些比較小的區別:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L680-691
func (r *raft) becomeCandidate() {
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
}

最後的 Leader 就與這二者有其餘的區別了,它不只設置了處理消息的函數 step 並且設置了與其餘狀態徹底不一樣的 tick 函數:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L708-728
func (r *raft) becomeLeader() {
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader

	r.pendingConfIndex = r.raftLog.lastIndex()
	r.appendEntry(pb.Entry{Data: nil})
}

這裏的 tick 函數 tickHeartbeat 每隔一段時間會經過 Step 方法向集羣中的其餘節點發送 MsgBeat 消息:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L646-669
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
	} 

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

上述代碼中的 MsgBeat 消息會在 Step 中被轉換成 MsgHeartbeat 最終發送給其餘的節點,Leader 節點超時以後的選舉流程咱們在前兩節中也已經介紹過了,在這裏就再也不重複了。

etcd 目前支持 V2 和 V3 兩個大版本,這兩個版本在實現上有比較大的不一樣,一方面是對外提供接口的方式,另外一方面就是底層的存儲引擎,V2 版本的實例是一個純內存的實現,全部的數據都沒有存儲在磁盤上,而 V3 版本的實例就支持了數據的持久化。

在這一節中,咱們會介紹 V3 版本的 etcd 到底是經過什麼樣的方式存儲用戶數據的。

在 V3 版本的設計中,etcd 經過 backend 後端這一設計,很好地封裝了存儲引擎的實現細節,爲上層提供一個更一致的接口,對於 etcd 的其餘模塊來講,它們能夠將更多注意力放在接口中的約定上,不過在這裏,咱們更關注的是 etcd 對 Backend 接口的實現。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L51-69
type Backend interface {
	ReadTx() ReadTx
	BatchTx() BatchTx

	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	Size() int64
	SizeInUse() int64
	Defrag() error
	ForceCommit()
	Close() error
}

etcd 底層默認使用的是開源的嵌入式鍵值存儲數據庫 bolt ,可是這個項目目前的狀態已是歸檔再也不維護了,若是想要使用這個項目可使用 CoreOS 的 bbolt 版本。

這一小節中,咱們會簡單介紹 etcd 是如何使用 BoltDB 做爲底層存儲的,首先能夠先來看一下 pacakge 內部的 backend 結構體,這是一個實現了 Backend 接口的結構:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104
type backend struct {
	size int64
	sizeInUse int64
	commits int64

	mu sync.RWMutex
	db *bolt.DB

	batchInterval time.Duration
	batchLimit    int
	batchTx       *batchTxBuffered

	readTx *readTx

	stopc chan struct{}
	donec chan struct{}

	lg *zap.Logger
}

從結構體的成員 db 咱們就能夠看出,它使用了 BoltDB 做爲底層存儲,另外的兩個 readTx和 batchTx 分別實現了 ReadTx 和 BatchTx 接口:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36
type ReadTx interface {
	Lock()
	Unlock()

	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38
type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(name []byte)
	UnsafePut(bucketName []byte, key []byte, value []byte)
	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
	UnsafeDelete(bucketName []byte, key []byte)
	Commit()
	CommitAndStop()
}

從這兩個接口的定義,咱們不難發現它們可以對外提供數據庫的讀寫操做,而 Backend 就能對這二者提供的方法進行封裝,爲上層屏蔽存儲的具體實現:

每當咱們使用 newBackend 建立一個新的 backend 結構時,都會建立一個 readTx 和 batchTx 結構體,這二者一個負責處理只讀請求,一個負責處理讀寫請求:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176
func newBackend(bcfg BackendConfig) *backend {
	bopts := &bolt.Options{}
	bopts.InitialMmapSize = bcfg.mmapSize()
	db, _ := bolt.Open(bcfg.Path, 0600, bopts)

	b := &backend{
		db: db,
		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,
		readTx: &readTx{
			buf: txReadBuffer{
				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
			},
			buckets: make(map[string]*bolt.Bucket),
		},
		stopc: make(chan struct{}),
		donec: make(chan struct{}),
	}
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b
}

當咱們在 newBackend 中進行了初始化 BoltDB、事務等工做後,就會開一個 goroutine 異步的對全部批量讀寫事務進行定時提交:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305
func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		t.Reset(b.batchInterval)
	}
}

對於上層來講, backend 其實只是對底層存儲的一個抽象,不少時候並不會直接跟它打交道,每每都是使用它持有的 ReadTx 和 BatchTx 與數據庫進行交互。

只讀事務

目前大多數的數據庫對於只讀類型的事務並無那麼多的限制,尤爲是在使用了 MVCC 以後,全部的只讀請求幾乎不會被寫請求鎖住,這大大提高了讀的效率,因爲在 BoltDB 的同一個 goroutine 中開啓兩個相互依賴的只讀事務和讀寫事務會發生死鎖,爲了不這種狀況咱們仍是引入了 sync.RWLock 保證死鎖不會出現:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L38-47
type readTx struct {
	mu  sync.RWMutex
	buf txReadBuffer

	txmu    sync.RWMutex
	tx      *bolt.Tx
	buckets map[string]*bolt.Bucket
}

你能夠看到在整個結構體中,除了用於保護 tx 的 txmu 讀寫鎖以外,還存在另一個 mu 讀寫鎖,它的做用是保證 buf 中的數據不會出現問題, buf 和結構體中的 buckets 都是用於加速讀效率的緩存。

對於一個只讀事務來講,它對上層提供了兩個獲取存儲引擎中數據的接口,分別是 UnsafeRange 和 UnsafeForEach ,在這裏會重點介紹前面方法的實現細節:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L52-90
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		limit = 1
	}
	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
	if int64(len(keys)) == limit {
		return keys, vals
	}

	bn := string(bucketName)
	bucket, ok := rt.buckets[bn]
	if !ok {
		bucket = rt.tx.Bucket(bucketName)
		rt.buckets[bn] = bucket
	}

	if bucket == nil {
		return keys, vals
	}
	c := bucket.Cursor()

	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)
}

上述代碼中省略了加鎖保護讀緩存以及 Bucket 中存儲數據的合法性,也省去了一些參數的檢查,不過方法的總體接口仍是沒有太多變化, UnsafeRange 會先從本身持有的緩存 txReadBuffer 中讀取數據,若是數據不可以知足調用者的需求,就會從 buckets 緩存中查找對應的 BoltDB bucket 並從 BoltDB 數據庫中讀取。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L121-141
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}

這個包內部的函數 unsafeRange 實際上經過 BoltDB 中的遊標來遍歷知足查詢條件的鍵值對。

到這裏爲止,整個只讀事務提供的接口就基本介紹完了,在 etcd 中不管咱們想要後去單個 Key 仍是一個範圍內的 Key 最終都是經過 Range 來實現的,這其實也是隻讀事務的最主要功能。

讀寫事務

只讀事務只提供了讀數據的能力,包括 UnsafeRange 和 UnsafeForeach ,而讀寫事務 BatchTx 提供的就是讀和寫數據的能力了:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L40-46
type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

讀寫事務同時提供了不帶緩存的 batchTx 實現以及帶緩存的 batchTxBuffered 實現,後者其實『繼承了』前者的結構體,並額外加入了緩存 txWriteBuffer 加速讀請求:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L243-246
type batchTxBuffered struct {
	batchTx
	buf txWriteBuffer
}

後者在實現接口規定的方法時,會直接調用 batchTx 的同名方法,並將操做形成的反作用的寫入的緩存中,在這裏咱們並不會展開介紹這一版本的實現,仍是以分析 batchTx 的方法爲主。

當咱們向 etcd 中寫入數據時,最終都會調用 batchTx 的 unsafePut 方法將數據寫入到 BoltDB 中:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L65-67
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	t.unsafePut(bucketName, key, value, false)
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L74-103
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketName)
	if err := bucket.Put(key, value); err != nil {
		plog.Fatalf("cannot put key into bucket (%v)", err)
	}
	t.pending++
}

這兩個方法的實現很是清晰,做者以爲他們都並不值得展開詳細介紹,只是調用了 BoltDB 提供的 API 操做一下 bucket 中的數據,而另外一個刪除方法的實現與這個也差很少:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L144-169
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
	bucket := t.tx.Bucket(bucketName)
	err := bucket.Delete(key)
	if err != nil {
		plog.Fatalf("cannot delete key from bucket (%v)", err)
	}
	t.pending++
}

它們都是經過 Bolt.Tx 找到對應的 Bucket ,而後作出相應的增刪操做,可是這寫請求在這兩個方法執行後其實並無提交,咱們還須要手動或者等待 etcd 自動將請求提交:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L184-188
func (t *batchTx) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L210-241
func (t *batchTx) commit(stop bool) {
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		err := t.tx.Commit()

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}

在每次調用 Commit 對讀寫事務進行提交時,都會先檢查是否有等待中的事務,而後會將數據上報至 Prometheus 中,其餘的服務就能夠將 Prometheus 做爲數據源對 etcd 的執行情況進行監控了。

常用 etcd 的開發者可能會了解到,它自己對於每個鍵值對都有一個 revision 的概念,鍵值對的每一次變化都會被 BoltDB 單獨記錄下來,因此想要在存儲引擎中獲取某一個 Key 對應的值,要先獲取 revision ,再經過它才能找到對應的值,在裏咱們想要介紹的實際上是 etcd 如何管理和存儲一個 Key 的多個 revision 記錄。

在 etcd 服務中有一個用於存儲全部的鍵值對 revision 信息的 btree,咱們能夠經過 index的 Get 接口獲取一個 Key 對應 Revision 的值:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L68-76
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
	keyi := &keyIndex{key: key}
	if keyi = ti.keyIndex(keyi); keyi == nil {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}
	return keyi.get(ti.lg, atRev)
}

上述方法經過 keyIndex 方法查找 Key 對應的 keyIndex 結構體,這裏使用的內存結構體 btree 是 Google 實現的一個版本:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
	if item := ti.tree.Get(keyi); item != nil {
		return item.(*keyIndex)
	}
	return nil
}

能夠看到這裏的實現很是簡單,只是從 treeIndex 持有的成員 btree 中查找 keyIndex ,將結果強制轉換成 keyIndex 類型後返回;獲取 Key 對應 revision 的方式也很是簡單:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L149-171
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	n := g.walk(func(rev revision) bool { return rev.main > atRev })
	if n != -1 {
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}

	return revision{}, revision{}, 0, ErrRevisionNotFound
}

KeyIndex

在咱們具體介紹方法實現的細節以前,首先咱們須要理解 keyIndex 包含的字段以及管理同一個 Key 不一樣版本的方式:

每個 keyIndex 結構體中都包含當前鍵的值以及最後一次修改對應的 revision 信息,其中還保存了一個 Key 的多個 generation ,每個 generation 都會記錄當前 Key『從生到死』的所有過程,每當一個 Key 被刪除時都會調用 timestone 方法向當前的 generation 中追加一個新的墓碑版本:在此我向你們推薦一個架構學習交流羣。交流學習羣號:821169538  裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
	if ki.generations[len(ki.generations)-1].isEmpty() {
		return ErrRevisionNotFound
	}
	ki.put(lg, main, sub)
	ki.generations = append(ki.generations, generation{})
	return nil
}

這個 tombstone 版本標識這當前的 Key 已經被刪除了,可是在每次刪除一個 Key 以後,就會在當前的 keyIndex 中建立一個新的 generation 結構用於存儲新的版本信息,其中 ver 記錄當前 generation 包含的修改次數, created 記錄建立 generation 時的 revision 版本,最後的 revs 用於存儲全部的版本信息。

etcd 中全部的查詢請求,不管是查詢一個仍是多個、是數量仍是鍵值對,最終都會調用 rangeKeys 方法:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
	rev := ro.Rev

	revpairs := tr.s.kvindex.Revisions(key, end, rev)
	if len(revpairs) == 0 {
		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
	}

	kvs := make([]mvccpb.KeyValue, int(ro.Limit))
	revBytes := newRevBytes()
	for i, revpair := range revpairs[:len(kvs)] {
		revToBytes(revpair, revBytes)
		_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
		kvs[i].Unmarshal(vs[0])
	}
	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}

爲了獲取一個範圍內的全部鍵值對,咱們首先須要經過 Revisions 函數從 btree 中獲取範圍內全部的 keyIndex :

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
	if end == nil {
		rev, _, _, err := ti.Get(key, atRev)
		if err != nil {
			return nil
		}
		return []revision{rev}
	}
	ti.visit(key, end, func(ki *keyIndex) {
		if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
			revs = append(revs, rev)
		}
	})
	return revs
}

若是隻須要獲取一個 Key 對應的版本,就是直接使用 treeIndex 的方法,可是當上述方法會從 btree 索引中獲取一個連續多個 revision 值時,就會調用 keyIndex.get 來遍歷整顆樹並選取合適的版本:

func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	n := g.walk(func(rev revision) bool { return rev.main > atRev })
	if n != -1 {
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}

	return revision{}, revision{}, 0, ErrRevisionNotFound
}

由於每個 Key 的 keyIndex 中其實都存儲着多個 generation ,咱們須要根據傳入的參數返回合適的 generation 並從其中返回主版本大於 atRev 的 revision 結構。

對於上層的鍵值存儲來講,它會利用這裏返回的 revision 從真正存儲數據的 BoltDB 中查詢當前 Key 對應 revision 的結果。

當咱們向 etcd 中插入數據時,會使用傳入的 key 構建一個 keyIndex 結構體並從樹中獲取相關版本等信息:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66
func (ti *treeIndex) Put(key []byte, rev revision) {
	keyi := &keyIndex{key: key}

	item := ti.tree.Get(keyi)
	if item == nil {
		keyi.put(ti.lg, rev.main, rev.sub)
		ti.tree.ReplaceOrInsert(keyi)
		return
	}
	okeyi := item.(*keyIndex)
	okeyi.put(ti.lg, rev.main, rev.sub)
}

treeIndex.Put 在獲取內存中的 keyIndex 結構以後會經過 keyIndex.put 其中加入新的 revision :

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
	rev := revision{main: main, sub: sub}

	if len(ki.generations) == 0 {
		ki.generations = append(ki.generations, generation{})
	}
	g := &ki.generations[len(ki.generations)-1]
	if len(g.revs) == 0 {
		g.created = rev
	}
	g.revs = append(g.revs, rev)
	g.ver++
	ki.modified = rev
}

每個新 revision 結構體寫入 keyIndex 時,都會改變當前 generation 的 created 和 ver 等參數,從這個方法中咱們就能夠了解到 generation 中的各個成員都是如何被寫入的。

寫入的操做除了增長以外,刪除某一個 Key 的函數也會常常被調用:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309
func (tw *storeTxnWrite) delete(key []byte) {
	ibytes := newRevBytes()
	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
	revToBytes(idxRev, ibytes)

	ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)

	kv := mvccpb.KeyValue{Key: key}

	d, _ := kv.Marshal()

	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	tw.s.kvindex.Tombstone(key, idxRev)
	tw.changes = append(tw.changes, kv)
}

正如咱們在文章前面所介紹的,刪除操做會向結構體中的 generation 追加一個新的 tombstone 標記,用於標識當前的 Key 已經被刪除;除此以外,上述方法還會將每個更新操做的 revision 存到單獨的 keyBucketName 中。

索引的恢復

由於在 etcd 中,全部的 keyIndex 都是在內存的 btree 中存儲的,因此在啓動服務時須要從 BoltDB 中將全部的數據都加載到內存中,在這裏就會初始化一個新的 btree 索引,而後調用 restore 方法開始恢復索引:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433
func (s *store) restore() error {
	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	tx := s.b.BatchTx()

	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	return nil
}

在恢復索引的過程當中,有一個用於遍歷不一樣鍵值的『生產者』循環,其中由 UnsafeRange 和 restoreChunk 兩個方法構成,這兩個方法會從 BoltDB 中遍歷數據,而後將鍵值對傳到 rkvc中,交給 restoreIntoIndex 方法中建立的 goroutine 處理:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L486-506
func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := r evKeyValue{key: key}
		_ := rkv.kv.Unmarshal(vals[i])
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

先被調用的 restoreIntoIndex 方法會建立一個用於接受鍵值對的 Channel,在這以後會在一個 goroutine 中處理從 Channel 接收到的數據,並將這些數據恢復到內存裏的 btree 中:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L441-484
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		for rkv := range rkvc {
			ki = &keyIndex{key: rkv.kv.Key}
			ki := idx.KeyIndex(ki)

			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(lg, rev.main, rev.sub)
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
			}
		}
	}()
	return rkvc, revc
}

恢復內存索引的相關代碼在實現上很是值得學習,兩個不一樣的函數經過 Channel 進行通訊並使用 goroutine 處理任務,可以很好地將消息的『生產者』和『消費者』進行分離。

Channel 做爲整個恢復索引邏輯的一個消息中心,它將遍歷 BoltDB 中的數據和恢復索引兩部分代碼進行了分離。

etcd 的 mvcc 模塊對外直接提供了兩種不一樣的訪問方式,一種是鍵值存儲 kvstore ,另外一種是 watchableStore 它們都實現了包內公開的 KV 接口:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kv.go#L100-125
type KV interface {
	ReadView
	WriteView

	Read() TxnRead
	Write() TxnWrite

	Hash() (hash uint32, revision int64, err error)
	HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)

	Compact(rev int64) (<-chan struct{}, error)
	Commit()
	Restore(b backend.Backend) error
	Close() error
}

kvstore

對於 kvstore 來講,其實沒有太多值得展開介紹的地方,它利用底層的 BoltDB 等基礎設施爲上層提供最多見的增傷改查,它組合了下層的 readTx 、 batchTx 等結構體,將一些線程不安全的操做變成線程安全的。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L32-40
func (s *store) Read() TxnRead {
	s.mu.RLock()
	tx := s.b.ReadTx()
	s.revMu.RLock()
	tx.Lock()
	firstRev, rev := s.compactMainRev, s.currentRev
	s.revMu.RUnlock()
	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
}

它也負責對內存中 btree 索引的維護以及壓縮一些無用或者不經常使用的數據,幾個對外的接口 Read 、 Write 就是對 readTx 、 batchTx 等結構體的組合並將它們的接口暴露給其餘的模塊。

watchableStore

另一個比較有意思的存儲就是 watchableStore 了,它是 mvcc 模塊爲外界提供 Watch 功能的接口,它負責了註冊、管理以及觸發 Watcher 的功能,咱們先來看一下這個結構體的各個字段:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L45-65
type watchableStore struct {
	*store

	mu sync.RWMutex

	unsynced watcherGroup
	synced watcherGroup

	stopc chan struct{}
	wg    sync.WaitGroup
}

每個 watchableStore 其實都組合了來自 store 結構體的字段和方法,除此以外,還有兩個 watcherGroup 類型的字段,其中 unsynced 用於存儲未同步完成的實例, synced 用於存儲已經同步完成的實例。

在初始化一個新的 watchableStore 時,咱們會建立一個用於同步 watcherGroup 的 Goroutine,在 syncWatchersLoop 這個循環中會每隔 100ms 調用一次 syncWatchers 方法,將全部未通知的事件通知給全部的監聽者,這能夠說是整個模塊的核心:

func (s *watchableStore) syncWatchers() int {
	curRev := s.store.currentRev
	compactionRev := s.store.compactMainRev

	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, minBytes)
	revToBytes(revision{main: curRev + 1}, maxBytes)

	tx := s.store.b.ReadTx()
	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
	evs := kvsToEvents(nil, wg, revs, vs)

	wb := newWatcherBatch(wg, evs)
	for w := range wg.watchers {
		w.minRev = curRev + 1

		eb, ok := wb[w]
		if !ok {
			s.synced.add(w)
			s.unsynced.delete(w)
			continue
		}

		w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})

		s.synced.add(w)
		s.unsynced.delete(w)
	}

	return s.unsynced.size()
}

簡化後的 syncWatchers 方法中總共作了三件事情,首先是根據當前的版本從未同步的 watcherGroup 中選出一些待處理的任務,而後從 BoltDB 中後去當前版本範圍內的數據變動並將它們轉換成事件,事件和 watcherGroup 在打包以後會經過 send 方法發送到每個 watcher 對應的 Channel 中。

上述圖片中展現了 mvcc 模塊對於向外界提供的監聽某個 Key 和範圍的接口,外部的其餘模塊會經過 watchStream.watch 函數與模塊內部進行交互,每一次調用 watch 方法最終都會向 watchableStore 持有的 watcherGroup 中添加新的 watcher 結構。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watcher.go#L108-135
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
	if id == AutoWatchID {
		for ws.watchers[ws.nextID] != nil {
			ws.nextID++
		}
		id = ws.nextID
		ws.nextID++
	}

	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

	ws.cancels[id] = c
	ws.watchers[id] = w
	return id, nil
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}

	synced := startRev > s.store.currentRev || startRev == 0
	if synced {
		s.synced.add(wa)
	} else {
		s.unsynced.add(wa)
	}

	return wa, func() { s.cancelWatcher(wa) }
}

當 etcd 服務啓動時,會在服務端運行一個用於處理監聽事件的 watchServer gRPC 服務,客戶端的 Watch 請求最終都會被轉發到這個服務的 Watch 函數中:

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	sws := serverWatchStream{
		// ...
		gRPCStream:  stream,
		watchStream: ws.watchable.NewWatchStream(),
		ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
	}

	sws.wg.Add(1)
	go func() {
		sws.sendLoop()
		sws.wg.Done()
	}()

	go func() {
		sws.recvLoop()
	}()

	sws.wg.Wait()
	return err
}

當客戶端想要經過 Watch 結果監聽某一個 Key 或者一個範圍的變更,在每一次客戶端調用服務端上述方式都會建立兩個 Goroutine,這兩個協程一個會負責向監聽者發送數據變更的事件,另外一個協程會負責處理客戶端發來的事件。

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334 
func (sws *serverWatchStream) recvLoop() error {
	for {
		req, err := sws.gRPCStream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		switch uv := req.RequestUnion.(type) {
		case *pb.WatchRequest_CreateRequest:
			creq := uv.CreateRequest

			filters := FiltersFromRequest(creq)
			wsrev := sws.watchStream.Rev()
			rev := creq.StartRevision
			id, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
			wr := &pb.WatchResponse{
				Header:   sws.newResponseHeader(wsrev),
				WatchId:  int64(id),
				Created:  true,
				Canceled: err != nil,
			}
			select {
			case sws.ctrlStream <- wr:
			case <-sws.closec:
				return nil
			}

		case *pb.WatchRequest_CancelRequest: // ...
		case *pb.WatchRequest_ProgressRequest: // ...
		default:
			continue
		}
	}
}

在用於處理客戶端的 recvLoop 方法中調用了 mvcc 模塊暴露出的 watchStream.Watch 方法,該方法會返回一個能夠用於取消監聽事件的 watchID ;當 gRPC 流已經結束後者出現錯誤時,當前的循環就會返回,兩個 Goroutine 也都會結束。

若是出現了更新或者刪除事件,就會被髮送到 watchStream 持有的 Channel 中,而 sendLoop 會經過 select 來監聽多個 Channel 中的數據並將接收到的數據封裝成 pb.WatchResponse 結構並經過 gRPC 流發送給客戶端:在此我向你們推薦一個架構學習交流羣。交流學習羣號:821169538  裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。

func (sws *serverWatchStream) sendLoop() {
	for {
		select {
		case wresp, ok := <-sws.watchStream.Chan():
			evs := wresp.Events
			events := make([]*mvccpb.Event, len(evs))
			for i := range evs {
				events[i] = &evs[i]			}

			canceled := wresp.CompactRevision != 0
			wr := &pb.WatchResponse{
				Header:          sws.newResponseHeader(wresp.Revision),
				WatchId:         int64(wresp.WatchID),
				Events:          events,
				CompactRevision: wresp.CompactRevision,
				Canceled:        canceled,
			}

			sws.gRPCStream.Send(wr)

		case c, ok := <-sws.ctrlStream: // ...
		case <-progressTicker.C: // ...
		case <-sws.closec:
			return
		}
	}
}

對於每個 Watch 請求來講, watchServer 會根據請求建立兩個用於處理當前請求的 Goroutine,這兩個協程會與更底層的 mvcc 模塊協做提供監聽和回調功能:

到這裏,咱們對於 Watch 功能的介紹就差很少結束了,從對外提供的接口到底層的使用的數據結構以及具體實現,其餘與 Watch 功能相關的話題能夠直接閱讀 etcd 的源代碼瞭解更加細節的實現。

在上面已經介紹了核心的 Raft 共識算法以及使用的底層存儲以後,這一節更想談一談 etcd 的一些應用場景,與以前談到的 分佈式協調服務 Zookeeper 同樣,etcd 在大多數的集羣中仍是處於比較關鍵的位置,工程師每每都會使用 etcd 存儲集羣中的重要數據和元數據,多個節點之間的強一致性以及集羣部署的方式賦予了 etcd 集羣高可用性。

咱們依然可使用 etcd 實現微服務架構中的服務發現、發佈訂閱、分佈式鎖以及分佈式協調等功能,由於雖然它被定義成了一個可靠的分佈式鍵值存儲,可是它起到的依然是一個分佈式協調服務的做用,這也使咱們在須要不一樣的協調服務中進行權衡和選擇。

爲何要在分佈式協調服務中選擇 etcd 實際上是一個比較關鍵的問題,不少工程師選擇 etcd 主要是由於它使用 Go 語言開發、部署簡單、社區也比較活躍,可是缺點就在於它相比 Zookeeper 仍是一個比較年輕的項目,須要一些時間來成長和穩定。

etcd 的實現原理很是有趣,咱們可以在它的源代碼中學習不少 Go 編程的最佳實踐和設計,這也值得咱們去研究它的源代碼。

目前不少項目和公司都在生產環境中大規模使用 etcd,這對於社區來講是意見很是有利的事情,若是微服務的大部分技術棧是 Go,做者也更加推薦各位讀者在選擇分佈式協調服務時選擇 etcd 做爲系統的基礎設施。

出處 http://draveness.me/etcd-int

相關文章
相關標籤/搜索