ZooKeeper 容許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端發送一個事件通知來實現分佈式的通知功能。java
ZooKeeper 的 Watcher 機制主要包括客戶端線程、客戶端 WatchManager 和 ZooKeeper 服務器三部分。在具體工做流程上,簡單地講,客戶端在向 ZooKeeper 服務器註冊 Watcher 的同時,會將 Watcher 對象存儲在客戶端的 WatchManager 中。當 ZooKeeper 服務器端觸發 Watcher 事件後,會向客戶端發送通知,客戶端線程從 WatchManager 中取出對應的 Watcher 對象來執行回調邏輯。node
主要會涉及下面這些類apache
在 ZooKeeper 中,接口類 Watcher
用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含 KeeperState
和 EventType
兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)
。數組
KeeperState | EventType | 觸發條件 | 說明 |
---|---|---|---|
SyncConnected(0) | None(-1) | 客戶端與服務端成功創建鏈接 | 此時客戶端和服務器處於鏈接狀態 |
NodeCreated(1) | Watcher 監聽的對應數據節點被建立 | ||
NodeDeleted(2) | Watcher 監聽的對應數據節點被刪除 | ||
NodeDataChanged(3) | Watcher 監聽的對應數據節點的數據內容發生變動 | ||
NodeChildChanged(4) | Wather 監聽的對應數據節點的子節點列表發生變動 | ||
Disconnected(0) | None(-1) | 客戶端與 ZooKeeper 服務器斷開鏈接 | 此時客戶端和服務器處於斷開鏈接狀態 |
Expired(-112) | Node(-1) | 會話超時 | 此時客戶端會話失效,一般同時也會受到 SessionExpiredException 異常 |
AuthFailed(4) | None(-1) | 一般有兩種狀況。(1)使用錯誤的 schema 進行權限檢查 (2)SASL 權限檢查失敗 | 一般同時也會收到 AuthFailedException 異常 |
process
方法是 Watcher
接口中的一個回調方法,當 ZooKeeper 向客戶端發送一個 Watcher
事件通知時,客戶端就會對相應的 process
方法進行回調,從而實現對事件的處理。promise
org.apache.zookeeper.Watcher#process
服務器
abstract public void process(WatchedEvent event);
在這裏提一下
包含了每個事件的三個基本屬性:通知狀態(WathcerEvent
實體。籠統地講,二者表示的是同一個事物,都是對一個服務端事件的封裝。不一樣的是,WatchedEvent
是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而 WatcherEvent
由於實現了序列化接口,所以能夠用於網絡傳輸。WatchedEventkeeperState
),事件類型(EventType
)和節點路徑(path
)。網絡
org.apache.zookeeper.proto.WatcherEvent
session
public class WatcherEvent implements Record { private int type; private int state; private String path; }
2. 工做機制數據結構
服務端在生成 WatchedEvent
事件以後,會調用 getWrapper
方法將本身包裝成一個可序列化的 WatcherEvent
事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將 WatcherEvent
還原成一個 WatchedEvent
事件,並傳遞給 process
方法處理,回調方法 process
根據入參就可以解析出完整的服務端事件了。app
在建立一個 ZooKeeper 客戶端的實例時能夠向構造方法中傳入一個默認的 Watcher:
public ZooKeeper(String connectString,int sessionTimeout,Watcher watcher);
以 org.apache.zookeeper.ZooKeeper#getData(java.lang.String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat)
爲例:這個 Watcher 將做爲整個 ZooKeeper 會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager
的 defaultWatcher
中。另外,ZooKeeper 客戶端也能夠經過 getData
,getChildren
和 exist
三個接口來向 ZooKeeper 服務器註冊 Watcher,不管使用哪一種方式,註冊 Watcher 的工做原理都是一致的。
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
在 ZooKeeper 中,Packet
能夠被看做一個最小的通訊協議單元,用於進行客戶端與服務端之間的網絡傳輸,任何須要傳輸的對象都須要包裝成一個 Packet
對象。所以,在 ClientCnxn
中 WatchRegistration
又會被封裝到 Packet
中,而後放入發送隊列中等待客戶端發送:在向 getData
接口註冊 Watcher 後,客戶端首先會對當前客戶端請求 request
進行標記,將其設置爲 「使用 Watcher 監聽」,同時會封裝一個 Watcher 的註冊信息 WatchRegistration
對象,用於暫時保存數據節點的路徑和 Watcher 的對應關係。
org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
org.apache.zookeeper.ClientCnxn#finishPacket隨後,ZooKeeper 客戶端就會向服務端發送這個請求,同時等待請求的返回。完成請求發送後,會由客戶端 SendThread
線程的 readResponse
方法負責接收來自服務端的響應,finishPacket
方法會從 Packet
中取出對應的 Watcher 並註冊到 ZkWatchManager
中去:
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
org.apache.zookeeper.ZooKeeper.WatchRegistration#register從上面的內容中,咱們已經瞭解到客戶端已經將 Watcher 暫時封裝在了 WatchRegistration
對象中,如今就須要從這個封裝對象中再次提取出 Watcher 來:
abstract protected Map<String, Set<Watcher>> getWatches(int rc); public void register(int rc) { if (shouldAddWatch(rc)) { Map<String, Set<Watcher>> watches = getWatches(rc); synchronized(watches) { Set<Watcher> watchers = watches.get(clientPath); if (watchers == null) { watchers = new HashSet<Watcher>(); watches.put(clientPath, watchers); } watchers.add(watcher); } } }
org.apache.zookeeper.ZooKeeper.ZKWatchManager
private static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher; }
在 register
方法中,客戶端會將以前暫時保存的 Watcher 對象轉交給 ZKWatchManager
,並最終保存到 dataWatches
中去。ZKWatchManager.dataWatches
是一個 Map<String, Set<Watcher>>
類型的數據結構,用於將數據節點的路徑和 Watcher 對象進行一一映射後管理起來。
在 Packet.createBB()
中,ZooKeeper 只會將 requestHeader
和 reqeust
兩個屬性進行序列化,也就是說,儘管 WatchResgistration
被封裝在了 Packet
中,可是並無被序列化到底層字節數組中去,所以也就不會進行網絡傳輸了。
服務端收到來自客戶端的請求後,在 org.apache.zookeeper.server.FinalRequestProcessor#processRequest
中會判斷當前請求是否須要註冊 Watcher:
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; }
數據節點的節點路徑和 ServerCnxn
最終會被存儲在 WatcherManager
的 watchTable
和 watch2Paths
中。WatchManager
是 ZooKeeper 服務端 Watcher 的管理者,其內部管理的 watchTable
和 watch2Pashs
兩個存儲結構,分別從兩個維度對 Watcher 進行存儲。從 getData
請求的處理邏輯中,咱們能夠看到,當 getDataRequest.getWatch()
爲 true 的時候,ZooKeeper 就認爲當前客戶端請求須要進行 Watcher 註冊,因而就會將當前的 ServerCnxn
對象做爲一個 Watcher 連同數據節點路徑傳入 getData
方法中去。注意到,抽象類 ServerCnxn
實現了 Watcher
接口。
watchTable
是從數據節點路徑的粒度來託管 Watcher。watch2Paths
是從 Watcher 的粒度來控制事件觸發須要觸發的數據節點。org.apache.zookeeper.server.WatchManager#addWatch
public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
org.apache.zookeeper.server.DataTree#setData
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
在對指定節點進行數據更新後,經過調用 org.apache.zookeeper.server.WatchManager#triggerWatch
方法來觸發相關的事件:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
不管是 dataWatches
仍是 childWatches
管理器,Watcher 的觸發邏輯都是一致的,基本步驟以下。
封裝 WatchedEvent
。
首先將通知狀態(KeeperState
)、事件類型(EventType
)以及節點路徑(Path
)封裝成一個 WatchedEvent
對象。
查詢 Watcher。
根據數據節點的節點路徑從 watchTable
中取出對應的 Watcher。若是沒有找到 Watcher,說明沒有任何客戶端在該數據節點上註冊過 Watcher,直接退出。而若是找到了這個 Watcher,會將其提取出來,同時會直接從 watchTable
和 watch2Paths
中將其刪除——從這裏咱們也能夠看出,Watcher 在服務端是一次性的,即觸發一次就失效了。
調用 process
方法來觸發 Watcher。
在這一步中,會逐個依次地調用從步驟2中找出的全部 Watcher 的 process
方法。這裏的 process
方法,事實上就是 ServerCnxn
的對應方法:
org.apache.zookeeper.server.NIOServerCnxn#process
@Override synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
在 process
方法中,主要邏輯以下。
WawtchedEvent
包裝成 WatcherEvent
,以便進行網絡傳輸序列化。對於一個來自服務端的響應,客戶端都是由 org.apache.zookeeper.ClientCnxn.SendThread#readResponse
方法來統一進行處理的,若是響應頭 replyHdr
中標識了 XID 爲 -1,代表這是一個通知類型的響應。
if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; }
處理過程大致上分爲如下 4 個主要步驟:
反序列化。
將字節流轉換成 WatcherEvent
對象。
處理 chrootPath。
若是客戶端設置了 chrootPath 屬性,那麼須要對服務端傳過來的完整的節點路徑進行 chrootPath
處理,生成客戶端的一個相對節點路徑。
還原 WatchedEvent
。
將 WatcherEvent
對象轉換成 WatchedEvent
。
回調 Watcher。
將 WatchedEvent
對象交給 EventThread
線程,在下一個輪詢週期中進行 Watcher 回調。
SendThread
接收到服務端的通知事件後,會經過調用 EventThread.queueEvent
方法將事件傳給 EventThread
線程,其邏輯以下:
org.apache.zookeeper.ClientCnxn.EventThread#queueEvent
public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(),event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
queueEvent
方法首先會根據該通知事件,從 ZKWatchManager
中取出全部相關的 Watcher:
@Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { // ... case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; // ... } return result; } }
客戶端在識別出事件類型 EventType
後,會從相應的 Watcher 存儲(即 dataWatches
,existWatches
或 childWatches
中的一個或多個)中去除對應的 Watcher。注意,此處使用的是 remove
接口,所以也代表了客戶端的 Watcher 機制一樣也是一次性的,即一旦被觸發後,該 Watcher 就失效了。
獲取到相關的全部 Watcher 後,會將其放入 waitingEvents
這個隊列中去。WaitingEvents
是一個待處理 Watcher 隊列,EventThread
的 run
方法會不斷對該隊列進行處理。EventThread
線程每次都會從 waitingEvents
隊列中取出一個 Watcher,並進行串行同步處理。注意,此處 processEvent
方法中的 Watcher
纔是以前客戶端真正註冊的 Watcher,調用其 process
方法就能夠實現 Watcher 的回調了。
Watch是一次性的,每次都須要從新註冊,而且客戶端在會話異常結束時不會收到任何通知,而快速重鏈接時仍不影響接收通知。
Watch的回調執行都是順序執行的,而且客戶端在沒有收到關注數據的變化事件通知以前是不會看到最新的數據,另外須要注意不要在Watch回調邏輯中阻塞整個客戶端的Watch回調。
Watch是輕量級的,WatchEvent是最小的通訊單元,結構上只包含通知狀態、事件類型和節點路徑。ZooKeeper服務端只會通知客戶端發生了什麼,並不會告訴具體內容。