做者:fredalxin\
地址:https://fredal.xin/zookeeper-...java
咱們可使用 zookeeper 做爲註冊中心來實現服務的註冊與發現,curator 框架提供了 curator-x-discovery 擴展實現了開箱即用的服務註冊發現,但更多時候咱們仍是選擇本身去實現,那這個時候咱們須要額外關注 zookeeper 的 1 個特性,即 wathcer。node
在微服務場景中,watcher 機制主要提供了服務通知功能,好比 Instance1 這個實例在 Service1 服務節點下注冊了一個 emphemeral 子節點後,它的某個服務消費者根據依賴配置在 Service1 節點上註冊了一個子節點 watcher,就如圖中的紅鑰匙。面試
子節點類型的 watcher 會觀測 Service1 的子節點,即 InstanceX 節點,但不會觀測孫子節點 config1。那麼當 Instance1 節點掛掉以後,watcher 能夠作到通知給註冊自身的那個服務消費者,通知完一次後 wacther 也就被銷燬了。spring
zookeeper 的 watcher 主要由 client、server 以及 watchManager 之間配合完成,包括 watcher 註冊以及觸發 2 個階段。session
在 client 端註冊表爲 ZkWatchManager,其中包括了 dataWatches、existWatches 以及 childWatches。在 server 端的註冊表在 DataTree 類中,封裝了 2 類 WatchManager,即 dataWatches 和 existWatches。dataWatches 表明當前節點的數據監聽,childWathes 表明子節點監聽,與 client 比少的 existWatches 也很容易理解,由於節點是否存在須要客戶端去判斷。數據結構
註冊階段客戶端的 getData 和 exists 請求能夠註冊 dataWatches,getChilden 能夠註冊 childWatches。而觸發階段,setData 請求會觸發當前節點 dataWatches,create 請求會觸發當前節點 dataWatches 以及父節點的 childWatches,delete 請求則會觸發當前節點、父節點、子節點的 dataWatches,以及父節點的 childWatches。intellij-idea
watchManager包含兩個很是重要的數據結構:watchTable和watch2Paths。前者表示path-set<watcher>,後者表示watcher-set<path>。注意這裏的watcher含義表示遠程鏈接,因此watchTable表示一個目錄下可能有多個消費者的監聽鏈接,而watch2Paths表示一個消費者可能會對多個目錄創建監聽,顯然多目錄的監聽會複用一個鏈接。框架
請求階段的傳輸數據(包括 watcher 信息)會封裝在 request 和 response 中,好比 getData 請求會封裝 getDataRequest/getDataResponse。而觸發階段的 watcher 通知則經過事件 event 進行通訊,server 端會發送一個 watcherEvent,而 client 端則會將其轉換成 watchedEvent 再進行處理。ide
每一個客戶端都會維護 2 個線程,SendThread 負責處理客戶端與服務端的請求通訊,好比發送 getDataRequest,而 EventThread 則負責處理服務端的事件通知,即 watcher 的事件。微服務
咱們來看看 watcher 註冊的部分源碼。首先是在客戶端,以 Zookeeper 中 getData 方法爲例,會入隊一個 watch 爲 true 的 packet。
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { ... GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); ... }
能夠看到這邊封裝了 GetDataRequest 以及 GetDataResponse,而 request 中設置了 watch 參數爲 true,最後將其進行 submitRequest,submitRequest 乾的事兒其實就是將這些放入事件隊列等待 sendThread 調度發送。
接着這個請求會被服務端所接收到,全部請求的服務端處理都在 FinalRequestProcessor#processRequest 方法中進行。
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ... byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); ... }
這邊會經過一些 case 來判斷請求類型,仍是以 getData 爲例,最終會調用到 DataTree 的 getData 方法,咱們以前講到 DataTree 裏包含了 2 種 watcher,那這邊除了獲取數據外,天然是註冊 dataWatchers 了。
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { DataNode n = (DataNode)this.nodes.get(path); if (n == null) { throw new NoNodeException(); } else { synchronized(n) { n.copyStat(stat); if (watcher != null) { this.dataWatches.addWatch(path, watcher); } return n.data; } } }
addWatch 方法主要是將數據節點的路徑以及 ServerCnxn(遠程通訊信息) 信息存儲到 WatchManager 的 watchTable 和 watch2Paths 中。至此服務端已經接受到了 watcher 並註冊到了 watchManager 中了。
咱們將客戶端本身也會保存一個 watchManager,這裏實際上是在接收到 getData 響應後進行的,在 ClientCnxn$SendThread 類的 readResponse->finishPacket 方法中。
private void finishPacket(ClientCnxn.Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized(p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; this.eventThread.queuePacket(p); } }
能夠看到這邊調用了 watchRegistration 的 register 方法,而它就是根據請求類型來裝入對應的 watchManager 中了(dataWatches、existWatches、childWatches)。
整個大體的時序圖能夠參考下面:
wathcer 觸發部分,咱們還以 服務端 DataTree 類處理 setData 請求 爲例。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { ... dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
能夠看到在處理完數據後調用了 triggerWatch,它乾的事兒是從以前的 watchManager 中得到 watchers,而後一個個調用 process 方法。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
獲取了須要本次觸發的監聽後,在 watchTable 和 watch2Paths 中還移除了自身,因此 watcher 是單次的。這裏封裝好了 watchedEvent 後塞入到了 Watcher的process 方法中,process 方法其實就是發送通知,以 Watcher的一個實現類NioServerCnxn 爲例就是調用了其 sendResponse 方法將通知事件發送到客戶端,發送前會將 watchedEvent 轉換成 watcherEvent 進行發送。
那麼客戶端首先接收到請求的仍然是 ClientCnxn$sendThread 的 readResponse 方法,這裏講 watcherEvent 轉換爲 watchedEvent 後入列 eventThread 的事件隊列 等待後續進行處理。
... WatchedEvent we = new WatchedEvent(event); if (ClientCnxn.LOG.isDebugEnabled()) { ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId)); } ClientCnxn.this.eventThread.queueEvent(we); ...
咱們直接看下 EventThread 的 run 方法吧,方法很簡單,就是不斷從 waitingEvents 事件隊列中取通知事件。而後調用 processEvent 方法處理事件。
private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { ...省略 }
這裏就是簡單地取出本次事件須要通知的 watcher 集合,而後循環調用每一個 watcher 的 process 方法了。那麼在本身實現服務註冊發現的場景裏,顯然 watcher 的 process 方法是咱們自定義的啦。
整個 watcher 觸發的時序圖能夠參考下面:
至此,zookeeper 的整個 watcher 交互邏輯就已經結束了。
近期熱文推薦:
1.600+ 道 Java面試題及答案整理(2021最新版)
2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,幹掉市面上全部 Mock 工具!
4.Spring Cloud 2020.0.0 正式發佈,全新顛覆性版本!
以爲不錯,別忘了隨手點贊+轉發哦!