《從Paxos到ZooKeeper 分佈式一致性原理與實踐》閱讀【Watcher】

ZooKeeper 容許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端發送一個事件通知來實現分佈式的通知功能。java

ZooKeeper 的 Watcher 機制主要包括客戶端線程、客戶端 WatchManager 和 ZooKeeper 服務器三部分。在具體工做流程上,簡單地講,客戶端在向 ZooKeeper 服務器註冊 Watcher 的同時,會將 Watcher 對象存儲在客戶端的 WatchManager 中。當 ZooKeeper 服務器端觸發 Watcher 事件後,會向客戶端發送通知,客戶端線程從 WatchManager 中取出對應的 Watcher 對象來執行回調邏輯。node

主要會涉及下面這些類apache

1. Watcher 接口

在 ZooKeeper 中,接口類 Watcher 用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含 KeeperState 和 EventType 兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)數組

1.1 Watcher 事件

KeeperState EventType 觸發條件 說明
  SyncConnected(0)   None(-1) 客戶端與服務端成功創建鏈接   此時客戶端和服務器處於鏈接狀態  
NodeCreated(1) Watcher 監聽的對應數據節點被建立
NodeDeleted(2) Watcher 監聽的對應數據節點被刪除
NodeDataChanged(3) Watcher 監聽的對應數據節點的數據內容發生變動
NodeChildChanged(4) Wather 監聽的對應數據節點的子節點列表發生變動
Disconnected(0) None(-1) 客戶端與 ZooKeeper 服務器斷開鏈接 此時客戶端和服務器處於斷開鏈接狀態
Expired(-112) Node(-1) 會話超時 此時客戶端會話失效,一般同時也會受到 SessionExpiredException 異常
AuthFailed(4) None(-1) 一般有兩種狀況。(1)使用錯誤的 schema 進行權限檢查 (2)SASL 權限檢查失敗 一般同時也會收到 AuthFailedException 異常

1.2 回調方法 process()

process 方法是 Watcher 接口中的一個回調方法,當 ZooKeeper 向客戶端發送一個 Watcher 事件通知時,客戶端就會對相應的 process 方法進行回調,從而實現對事件的處理。promise

org.apache.zookeeper.Watcher#process服務器

abstract public void process(WatchedEvent event);

 

在這裏提一下 WathcerEvent 實體。籠統地講,二者表示的是同一個事物,都是對一個服務端事件的封裝。不一樣的是,WatchedEvent 是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而 WatcherEvent 由於實現了序列化接口,所以能夠用於網絡傳輸。WatchedEvent 包含了每個事件的三個基本屬性:通知狀態(keeperState),事件類型(EventType)和節點路徑(path)。網絡

org.apache.zookeeper.proto.WatcherEventsession

public class WatcherEvent implements Record {
    private int type;
    private int state;
    private String path;
}

 

2. 工做機制數據結構

服務端在生成 WatchedEvent 事件以後,會調用 getWrapper 方法將本身包裝成一個可序列化的 WatcherEvent 事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將 WatcherEvent 還原成一個 WatchedEvent 事件,並傳遞給 process 方法處理,回調方法 process 根據入參就可以解析出完整的服務端事件了。app

2.1 客戶端註冊 Watcher

 

在建立一個 ZooKeeper 客戶端的實例時能夠向構造方法中傳入一個默認的 Watcher:

public ZooKeeper(String connectString,int sessionTimeout,Watcher watcher);

 

以 org.apache.zookeeper.ZooKeeper#getData(java.lang.String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat) 爲例:這個 Watcher 將做爲整個 ZooKeeper 會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 中。另外,ZooKeeper 客戶端也能夠經過 getDatagetChildren 和 exist 三個接口來向 ZooKeeper 服務器註冊 Watcher,不管使用哪一種方式,註冊 Watcher 的工做原理都是一致的。

public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException
 {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}

 

在 ZooKeeper 中,Packet 能夠被看做一個最小的通訊協議單元,用於進行客戶端與服務端之間的網絡傳輸,任何須要傳輸的對象都須要包裝成一個 Packet 對象。所以,在 ClientCnxn 中 WatchRegistration 又會被封裝到 Packet 中,而後放入發送隊列中等待客戶端發送:在向 getData 接口註冊 Watcher 後,客戶端首先會對當前客戶端請求 request 進行標記,將其設置爲 「使用 Watcher 監聽」,同時會封裝一個 Watcher 的註冊信息 WatchRegistration 對象,用於暫時保存數據節點的路徑和 Watcher 的對應關係。

org.apache.zookeeper.ClientCnxn#submitRequest

public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

org.apache.zookeeper.ClientCnxn#finishPacket隨後,ZooKeeper 客戶端就會向服務端發送這個請求,同時等待請求的返回。完成請求發送後,會由客戶端 SendThread 線程的 readResponse 方法負責接收來自服務端的響應,finishPacket 方法會從 Packet 中取出對應的 Watcher 並註冊到 ZkWatchManager 中去:

private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}

org.apache.zookeeper.ZooKeeper.WatchRegistration#register從上面的內容中,咱們已經瞭解到客戶端已經將 Watcher 暫時封裝在了 WatchRegistration 對象中,如今就須要從這個封裝對象中再次提取出 Watcher 來:

abstract protected Map<String, Set<Watcher>> getWatches(int rc);
public void register(int rc) {
    if (shouldAddWatch(rc)) {
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized(watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}


org.apache.zookeeper.ZooKeeper.ZKWatchManager

private static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();

    private volatile Watcher defaultWatcher;
}

在 register 方法中,客戶端會將以前暫時保存的 Watcher 對象轉交給 ZKWatchManager,並最終保存到 dataWatches 中去。ZKWatchManager.dataWatches 是一個 Map<String, Set<Watcher>> 類型的數據結構,用於將數據節點的路徑和 Watcher 對象進行一一映射後管理起來。

在 Packet.createBB() 中,ZooKeeper 只會將 requestHeader 和 reqeust 兩個屬性進行序列化,也就是說,儘管 WatchResgistration 被封裝在了 Packet 中,可是並無被序列化到底層字節數組中去,所以也就不會進行網絡傳輸了。

2.2 服務端處理 Watcher

2.2.1 服務端註冊 Watcher

服務端收到來自客戶端的請求後,在 org.apache.zookeeper.server.FinalRequestProcessor#processRequest 中會判斷當前請求是否須要註冊 Watcher:

case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo);
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}

數據節點的節點路徑和 ServerCnxn 最終會被存儲在 WatcherManager 的 watchTable 和 watch2Paths 中。WatchManager 是 ZooKeeper 服務端 Watcher 的管理者,其內部管理的 watchTable 和 watch2Pashs 兩個存儲結構,分別從兩個維度對 Watcher 進行存儲。從 getData 請求的處理邏輯中,咱們能夠看到,當 getDataRequest.getWatch() 爲 true 的時候,ZooKeeper 就認爲當前客戶端請求須要進行 Watcher 註冊,因而就會將當前的 ServerCnxn 對象做爲一個 Watcher 連同數據節點路徑傳入 getData 方法中去。注意到,抽象類 ServerCnxn 實現了 Watcher 接口。

  • watchTable 是從數據節點路徑的粒度來託管 Watcher。
  • watch2Paths 是從 Watcher 的粒度來控制事件觸發須要觸發的數據節點。

org.apache.zookeeper.server.WatchManager#addWatch

public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) {
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<Watcher>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);

    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);
}

2.2.2 Watcher 觸發

org.apache.zookeeper.server.DataTree#setData

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix;
    if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

 

在對指定節點進行數據更新後,經過調用 org.apache.zookeeper.server.WatchManager#triggerWatch方法來觸發相關的事件:

 

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

 

不管是 dataWatches 仍是 childWatches 管理器,Watcher 的觸發邏輯都是一致的,基本步驟以下。

    1. 封裝 WatchedEvent

      首先將通知狀態(KeeperState)、事件類型(EventType)以及節點路徑(Path)封裝成一個 WatchedEvent 對象。

    2. 查詢 Watcher。

      根據數據節點的節點路徑從 watchTable 中取出對應的 Watcher。若是沒有找到 Watcher,說明沒有任何客戶端在該數據節點上註冊過 Watcher,直接退出。而若是找到了這個 Watcher,會將其提取出來,同時會直接從 watchTable 和 watch2Paths 中將其刪除——從這裏咱們也能夠看出,Watcher 在服務端是一次性的,即觸發一次就失效了。

調用 process 方法來觸發 Watcher。

在這一步中,會逐個依次地調用從步驟2中找出的全部 Watcher 的 process 方法。這裏的 process 方法,事實上就是 ServerCnxn 的對應方法:

org.apache.zookeeper.server.NIOServerCnxn#process

@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();

    sendResponse(h, e, "notification");
}

在 process 方法中,主要邏輯以下。

    • 在請求頭中標記 「-1」,代表當前是一個通知。
    • 將 WawtchedEvent 包裝成 WatcherEvent,以便進行網絡傳輸序列化。
    • 向客戶端發送該通知。

3. 客戶端回調 Watcher

3.1 SendThread 接收事件通知

對於一個來自服務端的響應,客戶端都是由 org.apache.zookeeper.ClientCnxn.SendThread#readResponse 方法來統一進行處理的,若是響應頭 replyHdr 中標識了 XID 爲 -1,代表這是一個通知類型的響應。

 

if (replyHdr.getXid() == -1) {
    // -1 means notification
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if(serverPath.compareTo(chrootPath)==0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
    }
    WatchedEvent we = new WatchedEvent(event);
    eventThread.queueEvent( we );
    return;
}

 

處理過程大致上分爲如下 4 個主要步驟:

  1. 反序列化。

    將字節流轉換成 WatcherEvent 對象。

  2. 處理 chrootPath。

    若是客戶端設置了 chrootPath 屬性,那麼須要對服務端傳過來的完整的節點路徑進行 chrootPath 處理,生成客戶端的一個相對節點路徑。

  3. 還原 WatchedEvent

    將 WatcherEvent 對象轉換成 WatchedEvent

  4. 回調 Watcher。

    將 WatchedEvent 對象交給 EventThread 線程,在下一個輪詢週期中進行 Watcher 回調。

3.2 EventThread 處理事件通知

SendThread 接收到服務端的通知事件後,會經過調用 EventThread.queueEvent 方法將事件傳給 EventThread 線程,其邏輯以下:

org.apache.zookeeper.ClientCnxn.EventThread#queueEvent

public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),event.getPath()), event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}

queueEvent 方法首先會根據該通知事件,從 ZKWatchManager 中取出全部相關的 Watcher:

    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        // ...            
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        // ...
        }

        return result;
    }
}

客戶端在識別出事件類型 EventType 後,會從相應的 Watcher 存儲(即 dataWatchesexistWatches 或 childWatches 中的一個或多個)中去除對應的 Watcher。注意,此處使用的是 remove 接口,所以也代表了客戶端的 Watcher 機制一樣也是一次性的,即一旦被觸發後,該 Watcher 就失效了。

獲取到相關的全部 Watcher 後,會將其放入 waitingEvents 這個隊列中去。WaitingEvents 是一個待處理 Watcher 隊列,EventThread 的 run 方法會不斷對該隊列進行處理。EventThread線程每次都會從 waitingEvents 隊列中取出一個 Watcher,並進行串行同步處理。注意,此處 processEvent 方法中的 Watcher 纔是以前客戶端真正註冊的 Watcher,調用其 process 方法就能夠實現 Watcher 的回調了。

總結

一、一次性

Watch是一次性的,每次都須要從新註冊,而且客戶端在會話異常結束時不會收到任何通知,而快速重鏈接時仍不影響接收通知。

二、客戶端串行處理

Watch的回調執行都是順序執行的,而且客戶端在沒有收到關注數據的變化事件通知以前是不會看到最新的數據,另外須要注意不要在Watch回調邏輯中阻塞整個客戶端的Watch回調。

三、輕量

Watch是輕量級的,WatchEvent是最小的通訊單元,結構上只包含通知狀態、事件類型和節點路徑。ZooKeeper服務端只會通知客戶端發生了什麼,並不會告訴具體內容。

相關文章
相關標籤/搜索