深刻了解 zookeeper 的 watcher 機制!

做者:fredalxin\
地址:https://fredal.xin/zookeeper-...java

咱們可使用 zookeeper 做爲註冊中心來實現服務的註冊與發現,curator 框架提供了 curator-x-discovery 擴展實現了開箱即用的服務註冊發現,但更多時候咱們仍是選擇本身去實現,那這個時候咱們須要額外關注 zookeeper 的 1 個特性,即 wathcer。node

在微服務場景中,watcher 機制主要提供了服務通知功能,好比 Instance1 這個實例在 Service1 服務節點下注冊了一個 emphemeral 子節點後,它的某個服務消費者根據依賴配置在 Service1 節點上註冊了一個子節點 watcher,就如圖中的紅鑰匙。面試

子節點類型的 watcher 會觀測 Service1 的子節點,即 InstanceX 節點,但不會觀測孫子節點 config1。那麼當 Instance1 節點掛掉以後,watcher 能夠作到通知給註冊自身的那個服務消費者,通知完一次後 wacther 也就被銷燬了。spring

wacther 原理框架

zookeeper 的 watcher 主要由 client、server 以及 watchManager 之間配合完成,包括 watcher 註冊以及觸發 2 個階段。session

在 client 端註冊表爲 ZkWatchManager,其中包括了 dataWatches、existWatches 以及 childWatches。在 server 端的註冊表在 DataTree 類中,封裝了 2 類 WatchManager,即 dataWatches 和 existWatches。dataWatches 表明當前節點的數據監聽,childWathes 表明子節點監聽,與 client 比少的 existWatches 也很容易理解,由於節點是否存在須要客戶端去判斷。數據結構

註冊階段客戶端的 getData 和 exists 請求能夠註冊 dataWatches,getChilden 能夠註冊 childWatches。而觸發階段,setData 請求會觸發當前節點 dataWatches,create 請求會觸發當前節點 dataWatches 以及父節點的 childWatches,delete 請求則會觸發當前節點、父節點、子節點的 dataWatches,以及父節點的 childWatches。intellij-idea

watchManager包含兩個很是重要的數據結構:watchTable和watch2Paths。前者表示path-set<watcher>,後者表示watcher-set<path>。注意這裏的watcher含義表示遠程鏈接,因此watchTable表示一個目錄下可能有多個消費者的監聽鏈接,而watch2Paths表示一個消費者可能會對多個目錄創建監聽,顯然多目錄的監聽會複用一個鏈接。框架

請求階段的傳輸數據(包括 watcher 信息)會封裝在 request 和 response 中,好比 getData 請求會封裝 getDataRequest/getDataResponse。而觸發階段的 watcher 通知則經過事件 event 進行通訊,server 端會發送一個 watcherEvent,而 client 端則會將其轉換成 watchedEvent 再進行處理。ide

每一個客戶端都會維護 2 個線程,SendThread 負責處理客戶端與服務端的請求通訊,好比發送 getDataRequest,而 EventThread 則負責處理服務端的事件通知,即 watcher 的事件。微服務

watcher 註冊源碼

咱們來看看 watcher 註冊的部分源碼。首先是在客戶端,以 Zookeeper 中 getData 方法爲例,會入隊一個 watch 爲 true 的 packet。

public byte[] getData(final String path, Watcher watcher, Stat stat)
      throws KeeperException, InterruptedException {
    ...
        GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    ...
}

能夠看到這邊封裝了 GetDataRequest 以及 GetDataResponse,而 request 中設置了 watch 參數爲 true,最後將其進行 submitRequest,submitRequest 乾的事兒其實就是將這些放入事件隊列等待 sendThread 調度發送。

接着這個請求會被服務端所接收到,全部請求的服務端處理都在 FinalRequestProcessor#processRequest 方法中進行。

case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ...
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                                               getDataRequest.getWatch() ? cnxn : null);
    ...
}

這邊會經過一些 case 來判斷請求類型,仍是以 getData 爲例,最終會調用到 DataTree 的 getData 方法,咱們以前講到 DataTree 裏包含了 2 種 watcher,那這邊除了獲取數據外,天然是註冊 dataWatchers 了。

public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
     DataNode n = (DataNode)this.nodes.get(path);
     if (n == null) {
         throw new NoNodeException();
     } else {
         synchronized(n) {
             n.copyStat(stat);
             if (watcher != null) {
                 this.dataWatches.addWatch(path, watcher);
             }
             return n.data;
         }
     }
 }

addWatch 方法主要是將數據節點的路徑以及 ServerCnxn(遠程通訊信息) 信息存儲到 WatchManager 的 watchTable 和 watch2Paths 中。至此服務端已經接受到了 watcher 並註冊到了 watchManager 中了。

咱們將客戶端本身也會保存一個 watchManager,這裏實際上是在接收到 getData 響應後進行的,在 ClientCnxn$SendThread 類的 readResponse->finishPacket 方法中。

private void finishPacket(ClientCnxn.Packet p) {
     if (p.watchRegistration != null) {
         p.watchRegistration.register(p.replyHeader.getErr());
     }

     if (p.cb == null) {
         synchronized(p) {
             p.finished = true;
             p.notifyAll();
         }
     } else {
         p.finished = true;
         this.eventThread.queuePacket(p);
     }

}

能夠看到這邊調用了 watchRegistration 的 register 方法,而它就是根據請求類型來裝入對應的 watchManager 中了(dataWatches、existWatches、childWatches)。

整個大體的時序圖能夠參考下面:

watcher 觸發源碼

wathcer 觸發部分,咱們還以 服務端 DataTree 類處理 setData 請求 爲例。

public Stat setData(String path, byte data[], int version, long zxid,
           long time) throws KeeperException.NoNodeException {
    ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

能夠看到在處理完數據後調用了 triggerWatch,它乾的事兒是從以前的 watchManager 中得到 watchers,而後一個個調用 process 方法。

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
                                      KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                                         ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                         "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

獲取了須要本次觸發的監聽後,在 watchTable 和 watch2Paths 中還移除了自身,因此 watcher 是單次的。這裏封裝好了 watchedEvent 後塞入到了 Watcher的process 方法中,process 方法其實就是發送通知,以 Watcher的一個實現類NioServerCnxn 爲例就是調用了其 sendResponse 方法將通知事件發送到客戶端,發送前會將 watchedEvent 轉換成 watcherEvent 進行發送。

那麼客戶端首先接收到請求的仍然是 ClientCnxn$sendThread 的 readResponse 方法,這裏講 watcherEvent 轉換爲 watchedEvent 後入列 eventThread 的事件隊列 等待後續進行處理。

...
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled()) {
    ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
}

ClientCnxn.this.eventThread.queueEvent(we);
...

咱們直接看下 EventThread 的 run 方法吧,方法很簡單,就是不斷從 waitingEvents 事件隊列中取通知事件。而後調用 processEvent 方法處理事件。

private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else {
            ...省略


        }

這裏就是簡單地取出本次事件須要通知的 watcher 集合,而後循環調用每一個 watcher 的 process 方法了。那麼在本身實現服務註冊發現的場景裏,顯然 watcher 的 process 方法是咱們自定義的啦。

整個 watcher 觸發的時序圖能夠參考下面:

至此,zookeeper 的整個 watcher 交互邏輯就已經結束了。

近期熱文推薦:

1.600+ 道 Java面試題及答案整理(2021最新版)

2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!

3.阿里 Mock 工具正式開源,幹掉市面上全部 Mock 工具!

4.Spring Cloud 2020.0.0 正式發佈,全新顛覆性版本!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

以爲不錯,別忘了隨手點贊+轉發哦!

相關文章
相關標籤/搜索