客戶端如何用Watcher在以前已經說過了,這裏主要說說服務端的實現機制。node
一、服務端this
ZooKeeperServer接收到getData請求後,首先processPacket方法會被調用,咱們先看看該方法中和Watcher機制相關的代碼邏輯。spa
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) { submitRequest(si); } 調用了submitRequest方法處理getData這個Request。再往下: public void submitRequest(Request si) { firstProcessor.processRequest(si); }
Request進入到服務端的調用鏈,咱們假設此次是Leader做爲服務端,則進入LeaderRequestProcessor並沿着調用鏈最終走到FinalRequestProcessor。這裏咱們直接找到和Watcher相關的代碼邏輯。線程
最後咱們發如今FinalRequestProcessor中和Watcher相關的代碼:code
public void processRequest(Request request) { case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } }
調用到ZKDatabase的getData來處理客戶端請求,在參數中也指明瞭是否設置了Watcher。若是客戶端設置了Watcher則把cnxn做爲Watcher傳給ZKDatabase。Cnzx的定義爲:server
ServerCnxn cnxn = request.cnxn;接口
咱們等一會再來看看這個ServerCnxn是怎麼做爲Watcher機制的,先看看ZKDatabase的getData方法:隊列
public byte[] getData(String path, Stat stat, Watcher watcher){ return dataTree.getData(path, stat, watcher); }
走到了DataTree的getData方法:事件
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; } }
在這裏咱們看到將Watcher添加到了dataWatches列表中,該列表定義以下:get
private final WatchManager dataWatches = new WatchManager();
二、WatchManager
WatchManager經過addWatch方法添加新的Watcher接口都監視列表中,代碼:
synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); }
將Watcher記錄到本地列表中,Watcher和path關聯起來了,那麼這些Watcher在何時被觸發呢?
Watcher是經過triggerWatch方法被觸發的。
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
具體在這裏就是回調NettyServerCnxn的process方法,這裏再提醒一下,NettyServerCnxn是實現了Watcher接口的。
Public Interface Watcher{ public abstract void process(WatchedEvent event); }
那麼triggerWatch在哪裏被觸發呢?在getData這個例子裏,triggerWatch在setData中被觸發。setData方法是設置數據的,設置數據時觸發getData的Watcher。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } String lastPrefix = getMaxPrefixWithQuota(path); if(lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
三、ClientCnxn
客戶端收到WatchedEvent事件,在ClientCnxn的SendThread中,咱們又回到readResponse方法。
void readResponse(ByteBuffer incomingBuffer) throws IOException { if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath()); } } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return } }
將接收到的WatchedEvent事件放入eventThread線程的等待隊列中,等待處理。
EventThread的主方法:
private void processEvent(Object event) { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } }
至此客戶端在調用getData方法時綁定的Watcher被調用,process方法被執行。