ETCD探索-Watch

ETCD探索-Watch

梗概

watch是mvcc包中的一個功能,之因此拿出來講,是由於它確實有很重的邏輯。watch是監聽一個或一組key,key的任何變化都會發出消息。某種意義上講,這就是發佈訂閱模式。golang

對比

既然Watch機制就是發佈訂閱模式,咱們經過對比Kafka,來更深刻了解Watch。
首先說明結論:緩存

ETCD沒有消費者組的概念,因此不能代替Kafka

對比其餘方面呢:mvc

ETCD Kafka
消費方式 監聽一個Key 訂閱一個Topic
生產方式 Put(Key, Value) Produce(Topic, Message)
歷史消息是否保留 保留 保留
可否從指定位置消費 能夠從指定Revision消費 能夠從指定offset消費
可否保證消息不重放 不能 消費者會主動上報offset,kafka會保存每一個消費者的offset,消費者重啓會從當前進度消費

對比Kafka不是試圖用ETCD代替Kafka,是想經過對比了解Watch的特性和侷限性oop

猜測

在討論別人是怎麼實現的時候,本身總要先猜測下。想的過程當中就會發現難點在哪。
個人想法:spa

type watcher struct {
    key string // 要監聽的key
    
    ch  chan struct{} // 經過ch將消息發出來
}

func loop() {
    for _, w := range []watchers {
        ch <- message
    }
}

解釋下,個人想法中,每個監聽者都是一個watcher,監聽者會本身消費本身的ch,實現消費功能。在服務端須要維護一個loop,將消息不斷的發送到每個監聽者的ch中。設計

我感受大多數人的最直觀想法應該就是這樣。code

這樣作我實現了協程

  • 訂閱發佈功能

但我沒有作到對象

  • 同時監聽一個範圍的key(好比:我能夠監聽key=foo,但不能監聽key=foo~fox。這是ETCD一個重要的功能)
  • 消費者消費速率不一樣(好比:按個人設想,有一個消費者出現阻塞,會致使loop阻塞)

有了這些想法以後,咱們來看看ETCD中Watch是怎麼實現的。blog

實現

在MVCC文章中提到,KV接口的具體實現是store結構體。Watch的實現是在store上封裝了一層,叫作:watchableStore,重寫了store的Write方法。
經過MVCC中介紹,store的任何寫操做,都須要Write方法返回的TxnWrite。因此這裏重寫Write方法意味這任何寫操做都會通過watchableStore。

func (tw *watchableStoreTxnWrite) End() {  
   changes := tw.Changes()  
  
   evs := make([]mvccpb.Event, len(changes))  
   for i, change := range changes {  
      evs[i].Kv = &changes[i]
   }  

   tw.s.notify(rev, evs)  
   tw.TxnWrite.End()        
}

type watchableStoreTxnWrite struct {  
   TxnWrite  
   s *watchableStore  
}  
  
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {  
   return &watchableStoreTxnWrite{s.store.Write(trace), s}  
}

以上代碼只列出了核心的邏輯,不難看出,watchableStoreTxnWrite在事務提交時,先將本次變動changes打包成Event,而後調用notify來將變動通知出去。最後真正提交事務TxnWrite.End()

如今待推送的消息(Event)已經經過notify方法進入到了Watch機制中,咱們看看這個消息是如何流轉的。

首先須要介紹幾個對象:

  • Event

事件。變動的消息是以Event的形式發送出去的,Event包括KeyValue,同時包括操做類型(Put、Delete等)

  • watcher

watcher監聽一個或一組key,若是有變動,watcher將變動內容經過chan發送出去。

  • watcherGroup

顧名思義,一組watcher。watcherGroup管理多個watcher,可以根據key快速找到監聽該key的一個或多個watcher。

  • watchableStore

繼承自store,在store基礎上實現了watch功能。watchableStore管理着兩個watcherGroup:synced、unsynced,和一個用於緩存的victims。victims是緩存當前未發出去的Event。

  • watchStream

watchStream是對watchableStore的封裝。由於watchableStore繼承自store,因此他實現了不少方法,但這些方法並不都是用於Watch功能。因此watchStream對watchableStore再次封裝,暴露出與Watch有關的方法。

在知道這5個對象以後,咱們是如何使用Watch呢?

func testWatch() {
    s := newWatchableStore()
    
    w := s.NewWatchStream()
    
    w.Watch(start_key: foo, end_key: nil)
    
    w.Watch(start_key: bar, end_key: nil)
    
    for {
        consume := <- w.Chan()
    }
}

解釋下,咱們先建立了watchableStore,這是ETCD啓動後就建立了的。當咱們要使用Watch功能時,咱們建立了一個watchStream(s.NewWatchStream)。建立出來的w能夠監聽多個key:foo、bar。以後咱們就能夠消費w.Chan()返回的chan。foo或bar的任何變化,都會經過這個chan發送給消費端consume。

因而咱們便獲得下面這幅圖:
image.png

能夠看到watchStream實現了在一大堆kv的變化中,過濾出監聽的key,將key的變化輸出。

緊接着,咱們將這幅圖補充完整:
image.png

這幅圖是什麼意思呢?
watchableStore收到了全部key的變動後,將這些key交給synced(watchGroup),synced可以快速地從全部key中找到監聽的key。將這些key發送給對應的watcher,這些watcher再經過chan將變動信息發送出去。

synced是怎麼快速找到符合條件的key呢?
ETCD中使用了map和adt(紅黑樹)來實現。
不單獨使用map是由於watch能夠監聽一個範圍的key。若是隻監聽一個key

watch(start_key: foo, end_key: nil)

咱們能夠這樣存儲

map[key]*watcher

這樣能夠根據key快速找到對應的watcher,ETCD也是這樣作的。

但對於一組key呢?

watch(start_key: foo, end_key: fop)

這裏我監聽了從foo->fop之間的全部key,理論上這些key的數目是無限的,因此沒法再使用map。
好比:key=fooac也屬於監聽範圍。
ETCD用adt來存儲這種key。
image.png

adt的實現這裏不作介紹,只用知道adt可以根據key=fooac快速地找到所屬範圍foo->fop。

adt的原理推薦這篇文章: https://www.jianshu.com/p/e13...
adt的go實現:go.etcd.io/etcd/pkg/ad

在找到watcher後,調用watcher的send()方法,將變動的Event發送出去。

這就是上述圖的意思,也就是正常的Watch流程。

各類場景

上圖所述是正常流程,可是會有不少不正常的狀況發生。
上圖能夠看到,消息都是經過一個Chan發送出去,但若是消費者消費速度慢,Chan就容易堆積。Chan的空間不可能無限大,那就必然會有滿的時候,滿了後該怎麼辦呢?

接下來就要討論上圖unsynced、victims的做用了。

Chan何時會滿呢?
image.png

代碼中Chan的長度是1024。不過這也是一個隨機值,只是沒有如今更好的選擇。

一旦滿了,會發生如下操做:

func (s *watchableStore) notify() {
    var victim watcherBatch
    ...
    w.minRev = rev + 1           // w是當前watcher
    if victim == nil {  
       victim = make(watcherBatch)  
    }  
    w.victim = true              // w被標記爲受損的
    victim[w] = eb               // eb是當前的變動消息EventBatch
    s.synced.delete(w)
    ...
    s.addVictim(victim)          // 將victim添加到s的victims中
}

(victim:受害者、犧牲品、受損的)
watcher會記錄當前的Revision,並將自身標記爲受損的。這次的變動操做會被保存到watchableStore的victims中。同時該watcher會被從synced踢出。

假設此時有一個寫操做:foo=f1。而正好Chan此時剛滿,則監聽foo的watcher將從synced中踢出,同時foo=f1被保存到victims中
image.png

接下來對foo的任何變動,該watcher都不會記錄。那這些消息就都丟掉了嗎?固然不是,watcher變成受損狀態時記錄下了當時的Revision,這個很重要。

這時要說到兩個工做協程了:

// 咱們在建立watchableStore時,會同時啓動兩個工做協程
go s.syncWatchersLoop()  
go s.syncVictimsLoop()

顧名思義,第一個協程用於將unsynced的watcher同步爲synced。
第二個協程用於循環清除watchableStore中的victims。

在上面的場景中,咱們知道,隊列滿時,當時變動的Event被放入了victims中。這個協程就會試圖清除這個Event。怎麼清除呢?協程會不斷嘗試讓watcher發送這個Event,一旦隊列不滿,watcher將這個Event發出後。該watcher就被劃入了unsycned中,同時再也不是受損狀態。

image.png

此時syncWatchersLoop協程就開始起做用。因爲在受損狀態下,這個watcher已經錯過了不少消息。爲了追回進度,協程會根據watcher保存的Revision,找出受損以後全部的消息,將關於foo的消息所有給watcher,當watcher將這些消息都發送出去後。watcher就脫離了unsynced,成爲了synced。

至此就解決了Chan滿致使的問題。同時也闡明瞭Watch的設計實現。

相關文章
相關標籤/搜索