zk如何實現watch

在客戶端發送命令:stat /zhang watch

在zk server中產生如下圖所示的調用棧:

// Declared in the DataTree class: the WatchManager instance that addWatch /
// triggerWatch (shown below) are invoked on for this walkthrough.
// NOTE(review): presumably holds the watches set on node data — confirm against DataTree.
private final WatchManager dataWatches = new WatchManager();

// Declared in the WatchManager class.
// watchTable: node path -> set of watchers registered on that path.
private final HashMap<String, HashSet<Watcher>> watchTable =
    new HashMap<String, HashSet<Watcher>>();

// watch2Paths: the reverse index, watcher -> set of paths that watcher is registered on.
private final HashMap<Watcher, HashSet<String>> watch2Paths =
    new HashMap<Watcher, HashSet<String>>();

咱們詳細分析addWatch代碼:

// Registers a watcher on a path, updating both lookup directions.
//
// In the walkthrough, path is the "/zhang" from the client command and the
// watcher is the client's connection object (NIOServerCnxn), so the client
// itself can be thought of as the Watcher:
//   public abstract class ServerCnxn implements Stats, Watcher
//   public class NIOServerCnxn extends ServerCnxn
//
// The method is synchronized, so both maps are updated under the
// WatchManager's monitor.
public synchronized void addWatch(String path, Watcher watcher) {
    // Forward index: several clients may watch the same path.
    HashSet<Watcher> watchersOnPath = watchTable.get(path);
    if (watchersOnPath == null) {
        // Few watches per node is the common case, so start small: the set
        // rehashes when the 4th entry is added and doubles from there on.
        watchersOnPath = new HashSet<Watcher>(4);
        watchTable.put(path, watchersOnPath);
    }
    watchersOnPath.add(watcher);

    // Reverse index: one client may watch several paths.
    HashSet<String> pathsOfWatcher = watch2Paths.get(watcher);
    if (pathsOfWatcher == null) {
        // Connections typically hold many watches, so keep the default capacity.
        pathsOfWatcher = new HashSet<String>();
        watch2Paths.put(watcher, pathsOfWatcher);
    }
    pathsOfWatcher.add(path);
}

在建立、刪除、設置節點數據時,會觸發watch:

/**
 * Fires the watches registered on {@code path} and unregisters them.
 *
 * <p>The watcher set for the path is removed from {@code watchTable} and the
 * path is removed from each watcher's entry in {@code watch2Paths} while
 * holding this object's monitor; the watchers are then notified after the
 * monitor has been released.
 *
 * @param path    node path whose watches should fire
 * @param type    event type carried by the generated {@code WatchedEvent}
 * @param supress watchers that must not be notified; may be {@code null}
 * @return the set of watchers that were registered on {@code path}
 *         (including any that were suppressed), or {@code null} if there
 *         were none
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // Build the event that every notified watcher will receive.
    final WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);

    HashSet<Watcher> fired;
    synchronized (this) {
        // Drop the path's whole watcher set from the forward index.
        fired = watchTable.remove(path);
        final boolean nobodyWatching = (fired == null) || fired.isEmpty();
        if (nobodyWatching) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        // Mirror the removal in the reverse index for each affected watcher.
        for (Watcher watcher : fired) {
            HashSet<String> watchedPaths = watch2Paths.get(watcher);
            if (watchedPaths != null) {
                watchedPaths.remove(path);
            }
        }
    }

    // Notify outside the synchronized block. In the walkthrough each watcher
    // is a client connection, i.e. NIOServerCnxn.process(WatchedEvent).
    for (Watcher watcher : fired) {
        final boolean suppressed = (supress != null) && supress.contains(watcher);
        if (!suppressed) {
            watcher.process(event);
        }
    }
    return fired;
}
相關文章
相關標籤/搜索