watch是mvcc包中的一個功能,之因此拿出來講,是由於它確實有很重的邏輯。watch是監聽一個或一組key,key的任何變化都會發出消息。某種意義上講,這就是發佈訂閱模式。golang
既然Watch機制就是發佈訂閱模式,咱們經過對比Kafka,來更深刻了解Watch。
首先說明結論:緩存
ETCD沒有消費者組的概念,因此不能代替Kafka
對比其餘方面呢:mvc
ETCD | Kafka | |
---|---|---|
消費方式 | 監聽一個Key | 訂閱一個Topic |
生產方式 | Put(Key, Value) | Produce(Topic, Message) |
歷史消息是否保留 | 保留 | 保留 |
可否從指定位置消費 | 能夠從指定Revision消費 | 能夠從指定offset消費 |
可否保證消息不重放 | 不能 | 消費者會主動上報offset,kafka會保存每一個消費者的offset,消費者重啓會從當前進度消費 |
對比Kafka不是試圖用ETCD代替Kafka,是想經過對比了解Watch的特性和侷限性oop
在討論別人是怎麼實現的時候,本身總要先猜測下。想的過程當中就會發現難點在哪。
個人想法:spa
type watcher struct { key string // 要監聽的key ch chan struct{} // 經過ch將消息發出來 } func loop() { for _, w := range []watchers { ch <- message } }
解釋下,個人想法中,每個監聽者都是一個watcher,監聽者會本身消費本身的ch,實現消費功能。在服務端須要維護一個loop,將消息不斷的發送到每個監聽者的ch中。設計
我感受大多數人的最直觀想法應該就是這樣。code
這樣作我實現了協程
但我沒有作到對象
有了這些想法以後,咱們來看看ETCD中Watch是怎麼實現的。blog
在MVCC文章中提到,KV接口的具體實現是store結構體。Watch的實現是在store上封裝了一層,叫作:watchableStore
,重寫了store的Write方法。
經過MVCC中介紹,store的任何寫操做,都須要Write方法返回的TxnWrite。因此這裏重寫Write方法意味這任何寫操做都會通過watchableStore。
func (tw *watchableStoreTxnWrite) End() { changes := tw.Changes() evs := make([]mvccpb.Event, len(changes)) for i, change := range changes { evs[i].Kv = &changes[i] } tw.s.notify(rev, evs) tw.TxnWrite.End() } type watchableStoreTxnWrite struct { TxnWrite s *watchableStore } func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite { return &watchableStoreTxnWrite{s.store.Write(trace), s} }
以上代碼只列出了核心的邏輯,不難看出,watchableStoreTxnWrite在事務提交時,先將本次變動changes
打包成Event,而後調用notify來將變動通知出去。最後真正提交事務TxnWrite.End()
如今待推送的消息(Event)已經經過notify方法進入到了Watch機制中,咱們看看這個消息是如何流轉的。
首先須要介紹幾個對象:
事件。變動的消息是以Event的形式發送出去的,Event包括KeyValue,同時包括操做類型(Put、Delete等)
watcher監聽一個或一組key,若是有變動,watcher將變動內容經過chan發送出去。
顧名思義,一組watcher。watcherGroup管理多個watcher,可以根據key快速找到監聽該key的一個或多個watcher。
繼承自store,在store基礎上實現了watch功能。watchableStore管理着兩個watcherGroup:synced、unsynced,和一個用於緩存的victims。victims是緩存當前未發出去的Event。
watchStream是對watchableStore的封裝。由於watchableStore繼承自store,因此他實現了不少方法,但這些方法並不都是用於Watch功能。因此watchStream對watchableStore再次封裝,暴露出與Watch有關的方法。
在知道這5個對象以後,咱們是如何使用Watch呢?
func testWatch() { s := newWatchableStore() w := s.NewWatchStream() w.Watch(start_key: foo, end_key: nil) w.Watch(start_key: bar, end_key: nil) for { consume := <- w.Chan() } }
解釋下,咱們先建立了watchableStore,這是ETCD啓動後就建立了的。當咱們要使用Watch功能時,咱們建立了一個watchStream
(s.NewWatchStream)。建立出來的w能夠監聽多個key:foo、bar。以後咱們就能夠消費w.Chan()返回的chan。foo或bar的任何變化,都會經過這個chan發送給消費端consume。
因而咱們便獲得下面這幅圖:
能夠看到watchStream實現了在一大堆kv的變化中,過濾出監聽的key,將key的變化輸出。
緊接着,咱們將這幅圖補充完整:
這幅圖是什麼意思呢?
watchableStore收到了全部key的變動後,將這些key交給synced(watchGroup),synced可以快速
地從全部key中找到監聽的key。將這些key發送給對應的watcher,這些watcher再經過chan將變動信息發送出去。
synced是怎麼快速找到符合條件的key呢?
ETCD中使用了map和adt(紅黑樹)來實現。
不單獨使用map是由於watch能夠監聽一個範圍的key。若是隻監聽一個key
watch(start_key: foo, end_key: nil)
咱們能夠這樣存儲
map[key]*watcher
這樣能夠根據key快速找到對應的watcher,ETCD也是這樣作的。
但對於一組key呢?
watch(start_key: foo, end_key: fop)
這裏我監聽了從foo->fop之間的全部key,理論上這些key的數目是無限的,因此沒法再使用map。
好比:key=fooac也屬於監聽範圍。
ETCD用adt來存儲這種key。
adt的實現這裏不作介紹,只用知道adt可以根據key=fooac快速地找到所屬範圍foo->fop。
adt的原理推薦這篇文章: https://www.jianshu.com/p/e13...
adt的go實現:go.etcd.io/etcd/pkg/ad
在找到watcher後,調用watcher的send()方法,將變動的Event發送出去。
這就是上述圖的意思,也就是正常的Watch流程。
上圖所述是正常流程,可是會有不少不正常的狀況發生。
上圖能夠看到,消息都是經過一個Chan發送出去,但若是消費者消費速度慢,Chan就容易堆積。Chan的空間不可能無限大,那就必然會有滿的時候,滿了後該怎麼辦呢?
接下來就要討論上圖unsynced、victims的做用了。
Chan何時會滿呢?
代碼中Chan的長度是1024。不過這也是一個隨機值,只是沒有如今更好的選擇。
一旦滿了,會發生如下操做:
func (s *watchableStore) notify() { var victim watcherBatch ... w.minRev = rev + 1 // w是當前watcher if victim == nil { victim = make(watcherBatch) } w.victim = true // w被標記爲受損的 victim[w] = eb // eb是當前的變動消息EventBatch s.synced.delete(w) ... s.addVictim(victim) // 將victim添加到s的victims中 }
(victim:受害者、犧牲品、受損的)
watcher會記錄當前的Revision,並將自身標記爲受損的
。這次的變動操做會被保存到watchableStore的victims中。同時該watcher會被從synced踢出。
假設此時有一個寫操做:foo=f1。而正好Chan此時剛滿,則監聽foo的watcher將從synced中踢出,同時foo=f1被保存到victims中
接下來對foo的任何變動,該watcher都不會記錄。那這些消息就都丟掉了嗎?固然不是,watcher變成受損狀態時記錄下了當時的Revision,這個很重要。
這時要說到兩個工做協程了:
// 咱們在建立watchableStore時,會同時啓動兩個工做協程 go s.syncWatchersLoop() go s.syncVictimsLoop()
顧名思義,第一個協程用於將unsynced的watcher同步爲synced。
第二個協程用於循環清除watchableStore中的victims。
在上面的場景中,咱們知道,隊列滿時,當時變動的Event被放入了victims中。這個協程就會試圖清除這個Event。怎麼清除呢?協程會不斷嘗試讓watcher發送這個Event,一旦隊列不滿,watcher將這個Event發出後。該watcher就被劃入了unsycned中,同時再也不是受損狀態。
此時syncWatchersLoop協程就開始起做用。因爲在受損狀態下,這個watcher已經錯過了不少消息。爲了追回進度,協程會根據watcher保存的Revision,找出受損以後全部的消息,將關於foo的消息所有給watcher,當watcher將這些消息都發送出去後。watcher就脫離了unsynced,成爲了synced。
至此就解決了Chan滿致使的問題。同時也闡明瞭Watch的設計實現。