ZooKeeper 容許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端發送一個事件通知來實現分佈式的通知功能。java
ZooKeeper 的 Watcher 機制主要包括客戶端線程、客戶端 WatchManager 和 ZooKeeper 服務器三部分。在具體工做流程上,簡單地講,客戶端在向 ZooKeeper 服務器註冊 Watcher 的同時,會將 Watcher 對象存儲在客戶端的 WatchManager 中。當 ZooKeeper 服務器端觸發 Watcher 事件後,會向客戶端發送通知,客戶端線程從 WatchManager 中取出對應的 Watcher 對象來執行回調邏輯。node
在 ZooKeeper 中,接口類 Watcher
用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含 KeeperState
和 EventType
兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)
KeeperState | EventType | 觸發條件 | 說明 |
SyncConnected(0) | None(-1) | 客戶端與服務端成功創建鏈接 | 此時客戶端和服務器處於鏈接狀態 |
NodeCreated(1) | Watcher 監聽的對應數據節點被建立 | ||
NodeDeleted(2) | Watcher 監聽的對應數據節點被刪除 | ||
NodeDataChanged(3) | Watcher 監聽的對應數據節點的數據內容發生變動 | ||
NodeChildChanged(4) | Wather 監聽的對應數據節點的子節點列表發生變動 | ||
Disconnected(0) | None(-1) | 客戶端與 ZooKeeper 服務器斷開鏈接 | 此時客戶端和服務器處於斷開鏈接狀態 |
Expired(-112) | Node(-1) | 會話超時 | 此時客戶端會話失效,一般同時也會受到 SessionExpiredException 異常 |
AuthFailed(4) | None(-1) | 一般有兩種狀況。(1)使用錯誤的 schema 進行權限檢查 (2)SASL 權限檢查失敗 | 一般同時也會收到 AuthFailedException 異常 |
方法是 Watcher
接口中的一個回調方法,當 ZooKeeper 向客戶端發送一個 Watcher
事件通知時,客戶端就會對相應的 process
abstract public void process(WatchedEvent event);
是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而 WatcherEvent
public class WatcherEvent implements Record { private int type; private int state; private String path; }
2. 工做機制數據結構
服務端在生成 WatchedEvent
事件以後,會調用 getWrapper
方法將本身包裝成一個可序列化的 WatcherEvent
事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將 WatcherEvent
還原成一個 WatchedEvent
事件,並傳遞給 process
方法處理,回調方法 process
在建立一個 ZooKeeper 客戶端的實例時能夠向構造方法中傳入一個默認的 Watcher:
public ZooKeeper(String connectString,int sessionTimeout,Watcher watcher);
以 org.apache.zookeeper.ZooKeeper#getData(java.lang.String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat)
爲例:這個 Watcher 將做爲整個 ZooKeeper 會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager
的 defaultWatcher
中。另外,ZooKeeper 客戶端也能夠經過 getData
和 exist
三個接口來向 ZooKeeper 服務器註冊 Watcher,不管使用哪一種方式,註冊 Watcher 的工做原理都是一致的。
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
在 ZooKeeper 中,Packet
能夠被看做一個最小的通訊協議單元,用於進行客戶端與服務端之間的網絡傳輸,任何須要傳輸的對象都須要包裝成一個 Packet
對象。所以,在 ClientCnxn
中 WatchRegistration
又會被封裝到 Packet
中,而後放入發送隊列中等待客戶端發送:在向 getData
接口註冊 Watcher 後,客戶端首先會對當前客戶端請求 request
進行標記,將其設置爲 「使用 Watcher 監聽」,同時會封裝一個 Watcher 的註冊信息 WatchRegistration
對象,用於暫時保存數據節點的路徑和 Watcher 的對應關係。
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
org.apache.zookeeper.ClientCnxn#finishPacket隨後,ZooKeeper 客戶端就會向服務端發送這個請求,同時等待請求的返回。完成請求發送後,會由客戶端 SendThread
線程的 readResponse
方法會從 Packet
中取出對應的 Watcher 並註冊到 ZkWatchManager
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
org.apache.zookeeper.ZooKeeper.WatchRegistration#register從上面的內容中,咱們已經瞭解到客戶端已經將 Watcher 暫時封裝在了 WatchRegistration
對象中,如今就須要從這個封裝對象中再次提取出 Watcher 來:
abstract protected Map<String, Set<Watcher>> getWatches(int rc); public void register(int rc) { if (shouldAddWatch(rc)) { Map<String, Set<Watcher>> watches = getWatches(rc); synchronized(watches) { Set<Watcher> watchers = watches.get(clientPath); if (watchers == null) { watchers = new HashSet<Watcher>(); watches.put(clientPath, watchers); } watchers.add(watcher); } } }
private static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher; }
在 register
方法中,客戶端會將以前暫時保存的 Watcher 對象轉交給 ZKWatchManager
,並最終保存到 dataWatches
是一個 Map<String, Set<Watcher>>
類型的數據結構,用於將數據節點的路徑和 Watcher 對象進行一一映射後管理起來。
在 Packet.createBB()
中,ZooKeeper 只會將 requestHeader
和 reqeust
兩個屬性進行序列化,也就是說,儘管 WatchResgistration
被封裝在了 Packet
服務端收到來自客戶端的請求後,在 org.apache.zookeeper.server.FinalRequestProcessor#processRequest
中會判斷當前請求是否須要註冊 Watcher:
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; }
數據節點的節點路徑和 ServerCnxn
最終會被存儲在 WatcherManager
的 watchTable
和 watch2Paths
是 ZooKeeper 服務端 Watcher 的管理者,其內部管理的 watchTable
和 watch2Pashs
兩個存儲結構,分別從兩個維度對 Watcher 進行存儲。從 getData
請求的處理邏輯中,咱們能夠看到,當 getDataRequest.getWatch()
爲 true 的時候,ZooKeeper 就認爲當前客戶端請求須要進行 Watcher 註冊,因而就會將當前的 ServerCnxn
對象做爲一個 Watcher 連同數據節點路徑傳入 getData
方法中去。注意到,抽象類 ServerCnxn
實現了 Watcher
是從數據節點路徑的粒度來託管 Watcher。watch2Paths
是從 Watcher 的粒度來控制事件觸發須要觸發的數據節點。org.apache.zookeeper.server.WatchManager#addWatch
public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
在對指定節點進行數據更新後,經過調用 org.apache.zookeeper.server.WatchManager#triggerWatch
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
不管是 dataWatches
仍是 childWatches
管理器,Watcher 的觸發邏輯都是一致的,基本步驟以下。
封裝 WatchedEvent
)封裝成一個 WatchedEvent
查詢 Watcher。
根據數據節點的節點路徑從 watchTable
中取出對應的 Watcher。若是沒有找到 Watcher,說明沒有任何客戶端在該數據節點上註冊過 Watcher,直接退出。而若是找到了這個 Watcher,會將其提取出來,同時會直接從 watchTable
和 watch2Paths
中將其刪除——從這裏咱們也能夠看出,Watcher 在服務端是一次性的,即觸發一次就失效了。
調用 process
方法來觸發 Watcher。
在這一步中,會逐個依次地調用從步驟2中找出的全部 Watcher 的 process
方法。這裏的 process
方法,事實上就是 ServerCnxn
@Override synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
在 process
包裝成 WatcherEvent
,以便進行網絡傳輸序列化。對於一個來自服務端的響應,客戶端都是由 org.apache.zookeeper.ClientCnxn.SendThread#readResponse
方法來統一進行處理的,若是響應頭 replyHdr
中標識了 XID 爲 -1,代表這是一個通知類型的響應。
if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; }
處理過程大致上分爲如下 4 個主要步驟:
將字節流轉換成 WatcherEvent
處理 chrootPath。
若是客戶端設置了 chrootPath 屬性,那麼須要對服務端傳過來的完整的節點路徑進行 chrootPath
還原 WatchedEvent
將 WatcherEvent
對象轉換成 WatchedEvent
回調 Watcher。
將 WatchedEvent
對象交給 EventThread
線程,在下一個輪詢週期中進行 Watcher 回調。
接收到服務端的通知事件後,會經過調用 EventThread.queueEvent
方法將事件傳給 EventThread
public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(),event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
方法首先會根據該通知事件,從 ZKWatchManager
中取出全部相關的 Watcher:
@Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { // ... case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; // ... } return result; } }
客戶端在識別出事件類型 EventType
後,會從相應的 Watcher 存儲(即 dataWatches
或 childWatches
中的一個或多個)中去除對應的 Watcher。注意,此處使用的是 remove
接口,所以也代表了客戶端的 Watcher 機制一樣也是一次性的,即一旦被觸發後,該 Watcher 就失效了。
獲取到相關的全部 Watcher 後,會將其放入 waitingEvents
是一個待處理 Watcher 隊列,EventThread
的 run
線程每次都會從 waitingEvents
隊列中取出一個 Watcher,並進行串行同步處理。注意,此處 processEvent
方法中的 Watcher
纔是以前客戶端真正註冊的 Watcher,調用其 process
方法就能夠實現 Watcher 的回調了。