public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) public byte[] getData(String path, Watcher watcher, Stat stat) public List<String> getChildren(String path, Watcher watcher) public Stat exists(String path, Watcher watcher)
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { PathUtils.validatePath(path); // 封裝一個WatcherRegistration的對象,保存節點路徑和Watcher的對應關係 ZooKeeper.WatchRegistration wcb = null; if (watcher != null) { wcb = new ZooKeeper.DataWatchRegistration(watcher, path); } String serverPath = this.prependChroot(path); RequestHeader h = new RequestHeader(); h.setType(4); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); // 標記是否有watcher request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(Code.get(r.getErr()), path); } else { if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); } } class DataWatchRegistration extends ZooKeeper.WatchRegistration { // 保存節點路徑和Watcher的關係 public DataWatchRegistration(Watcher watcher, String clientPath) { super(watcher, clientPath); } ... } abstract class WatchRegistration { private Watcher watcher; private String clientPath; public WatchRegistration(Watcher watcher, String clientPath) { this.watcher = watcher; this.clientPath = clientPath; } ... }
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // 客戶端與服務端的網絡傳輸 ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration); synchronized(packet) { while(!packet.finished) { packet.wait(); } return r; } } ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { ClientCnxn.Packet packet = null; LinkedList var11 = this.outgoingQueue; synchronized(this.outgoingQueue) { // 任何傳輸的對象都包裝成Packet對象 packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (this.state.isAlive() && !this.closing) { if (h.getType() == -11) { this.closing = true; } // 放入發送隊列中,等待發送 this.outgoingQueue.add(packet); } else { this.conLossPacket(packet); } } this.sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel)this.sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } else { // 是否可讀 if (this.sockKey.isReadable()) { ... } if (this.sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = this.findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { this.updateLastSend(); if (p.bb == null) { if (p.requestHeader != null && p.requestHeader.getType() != 11 && p.requestHeader.getType() != 100) { p.requestHeader.setXid(cnxn.getXid()); } // 序列化 p.createBB(); } sock.write(p.bb); ... } ... } } } } public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // 序列化header if (this.requestHeader != null) { this.requestHeader.serialize(boa, "header"); } if (this.request instanceof ConnectRequest) { this.request.serialize(boa, "connect"); boa.writeBool(this.readOnly, "readOnly"); // 序列化request } else if (this.request != null) { this.request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException var3) { ClientCnxn.LOG.warn("Ignoring unexpected exception", var3); } }
public void processRequest(Request request) { ... PrepRequestProcessor.checkACL(this.zks, this.zks.getZKDatabase().convertLong(aclG), 1, request.authInfo); Stat stat = new Stat(); // 這裏根據客戶端設置的是否有watch變量來傳入watcher對象 // 若是true則將當前的ServerCnxn傳入(ServerCnxn表明客戶端和服務端的鏈接) byte[] b = this.zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); ... } public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { return this.dataTree.getData(path, stat, watcher); }
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { DataNode n = (DataNode)this.nodes.get(path); if (n == null) { throw new NoNodeException(); } else { synchronized(n) { n.copyStat(stat); if (watcher != null) { // 添加watcher this.dataWatches.addWatch(path, watcher); } return n.data; } } } public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = (HashSet)this.watchTable.get(path); if (list == null) { list = new HashSet(4); this.watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = (HashSet)this.watch2Paths.get(watcher); if (paths == null) { paths = new HashSet(); this.watch2Paths.put(watcher, paths); } paths.add(path); } // 路徑維度 private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap(); // Watcher維度 private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap();
private void finishPacket(ClientCnxn.Packet p) { // 客戶端註冊wathcer if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized(p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; this.eventThread.queuePacket(p); } } public void register(int rc) { if (this.shouldAddWatch(rc)) { Map<String, Set<Watcher>> watches = this.getWatches(rc); synchronized(watches) { // 根據路徑拿到 Set<Watcher> watchers = (Set)watches.get(this.clientPath); if (watchers == null) { watchers = new HashSet(); watches.put(this.clientPath, watchers); } ((Set)watchers).add(this.watcher); } } }
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException { Stat s = new Stat(); DataNode n = (DataNode)this.nodes.get(path); if (n == null) { throw new NoNodeException(); } else { byte[] lastdata = null; byte[] lastdata; // 賦值node synchronized(n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } String lastPrefix; if ((lastPrefix = this.getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (long)((data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length))); } // 觸發watcher this.dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; } }
public Set<Watcher> triggerWatch(String path, EventType type) { return this.triggerWatch(path, type, (Set)null); } public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet watchers; // 這個同步代碼塊主要作的就是從watchTable和watch2Paths中移除該路徑的watcher synchronized(this) { watchers = (HashSet)this.watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path); } return null; } Iterator i$ = watchers.iterator(); while(i$.hasNext()) { Watcher w = (Watcher)i$.next(); HashSet<String> paths = (HashSet)this.watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } Iterator i$ = watchers.iterator(); while(true) { Watcher w; do { if (!i$.hasNext()) { return watchers; } w = (Watcher)i$.next(); } while(supress != null && supress.contains(w)); // watcher調用,這裏的e對象裏只有通知狀態(KeeperState)、事件類型(EventType)以及節點路徑(Path) // 沒有修改事後的新值也沒有老的值 w.process(e); } }
public synchronized void process(WatchedEvent event) {
// 請求頭標記-1,代表是通知 ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } WatcherEvent e = event.getWrapper(); // 發送通知 this.sendResponse(h, e, "notification"); }
void readResponse(ByteBuffer incomingBuffer) throws IOException { ... // 若是是通知 } else if (replyHdr.getXid() == -1) { if (ClientCnxn.LOG.isDebugEnabled()) { ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId)); } // 反序列化 WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); if (ClientCnxn.this.chrootPath != null) { String serverPath = event.getPath(); if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) { event.setPath("/"); } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) { event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length())); } else { ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (ClientCnxn.LOG.isDebugEnabled()) { ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId)); } // 交給EventThread線程處理 ClientCnxn.this.eventThread.queueEvent(we); } ... }
public void queueEvent(WatchedEvent event) { if (event.getType() != EventType.None || this.sessionState != event.getState()) { this.sessionState = event.getState(); ClientCnxn.WatcherSetEventPair pair = new ClientCnxn.WatcherSetEventPair(ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()), event); this.waitingEvents.add(pair); } } public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) { ... // 把該路徑下的全部Watcher都拿出來 // remove方法,因此客戶端也是一次性的,一旦觸發,watcher就失效了 case NodeDataChanged: case NodeCreated: var6 = this.dataWatches; synchronized(this.dataWatches) { this.addTo((Set)this.dataWatches.remove(clientPath), result); } var6 = this.existWatches; synchronized(this.existWatches) { this.addTo((Set)this.existWatches.remove(clientPath), result); break; } ... }
private void processEvent(Object event) { try { if (event instanceof ClientCnxn.WatcherSetEventPair) { ClientCnxn.WatcherSetEventPair pair = (ClientCnxn.WatcherSetEventPair)event; Iterator i$ = pair.watchers.iterator(); while(i$.hasNext()) { // 這裏的watcher就是客戶端傳入的watcher,裏面有真正的回調邏輯代碼 Watcher watcher = (Watcher)i$.next(); try { watcher.process(pair.event); } catch (Throwable var7) { ClientCnxn.LOG.error("Error while calling watcher ", var7); } } } else { ... } ... }
guava to java is Curator to ZooKeeper,開源客戶端Curator引入Cache實現對服務端事件的監聽,從而大大簡化了原生API開發的繁瑣過程。