ZooKeeper系列之(十三):Watcher跟隨者工做模式

客戶端如何用Watcher在以前已經說過了,這裏主要說說服務端的實現機制。node

一、服務端this

ZooKeeperServer接收到getData請求後,首先processPacket方法會被調用,咱們先看看該方法中和Watcher機制相關的代碼邏輯。spa

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer)  {
   submitRequest(si);
}
調用了submitRequest方法處理getData這個Request。再往下:
public void submitRequest(Request si)  {
   firstProcessor.processRequest(si);
}

Request進入到服務端的調用鏈,咱們假設此次是Leader做爲服務端,則進入LeaderRequestProcessor並沿着調用鏈最終走到FinalRequestProcessor。這裏咱們直接找到和Watcher相關的代碼邏輯。線程

最後咱們發如今FinalRequestProcessor中和Watcher相關的代碼:code

public void processRequest(Request request) {
   case OpCode.getData: {
        lastOp = "GETD";
        GetDataRequest getDataRequest = new GetDataRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);
        DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
        if (n == null) {
             throw new KeeperException.NoNodeException();
        }
        Stat stat = new Stat();
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
        rsp = new GetDataResponse(b, stat);
        break;
   }
}

調用到ZKDatabase的getData來處理客戶端請求,在參數中也指明瞭是否設置了Watcher。若是客戶端設置了Watcher則把cnxn做爲Watcher傳給ZKDatabase。Cnzx的定義爲:server

ServerCnxn cnxn = request.cnxn;接口

咱們等一會再來看看這個ServerCnxn是怎麼做爲Watcher機制的,先看看ZKDatabase的getData方法:隊列

public byte[] getData(String path, Stat stat, Watcher watcher){
      return dataTree.getData(path, stat, watcher);
}

走到了DataTree的getData方法:事件

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
     DataNode n = nodes.get(path);
     if (n == null) {
          throw new KeeperException.NoNodeException();
     }
     synchronized (n) {
          n.copyStat(stat);
          if (watcher != null) {
              dataWatches.addWatch(path, watcher);
          }
          return n.data;
     }
}

在這裏咱們看到將Watcher添加到了dataWatches列表中,該列表定義以下:get

private final WatchManager dataWatches = new WatchManager();

二、WatchManager

WatchManager經過addWatch方法添加新的Watcher接口都監視列表中,代碼:

synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<Watcher>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);
    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);
}

將Watcher記錄到本地列表中,Watcher和path關聯起來了,那麼這些Watcher在何時被觸發呢?

Watcher是經過triggerWatch方法被觸發的。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
   WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
   HashSet<Watcher> watchers;
   for (Watcher w : watchers) {
       if (supress != null && supress.contains(w)) {
           continue;
       }
       w.process(e);
   }
   return watchers;
}

具體在這裏就是回調NettyServerCnxn的process方法,這裏再提醒一下,NettyServerCnxn是實現了Watcher接口的。

Public Interface Watcher{
   public abstract void process(WatchedEvent event);
}

那麼triggerWatch在哪裏被觸發呢?在getData這個例子裏,triggerWatch在setData中被觸發。setData方法是設置數據的,設置數據時觸發getData的Watcher。

public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
     Stat s = new Stat();
     DataNode n = nodes.get(path);
     if (n == null) {
         throw new KeeperException.NoNodeException();
     }
     byte lastdata[] = null;
     synchronized (n) {
         lastdata = n.data;
         n.data = data;
         n.stat.setMtime(time);
         n.stat.setMzxid(zxid);
         n.stat.setVersion(version);
         n.copyStat(s);
      }
      String lastPrefix = getMaxPrefixWithQuota(path);
      if(lastPrefix != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
      }
      dataWatches.triggerWatch(path, EventType.NodeDataChanged);
      return s;
 }

 

三、ClientCnxn

客戶端收到WatchedEvent事件,在ClientCnxn的SendThread中,咱們又回到readResponse方法。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
if (replyHdr.getXid() == -1) {
       // -1 means notification               
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        if (chrootPath != null) {
           String serverPath = event.getPath();
           if(serverPath.compareTo(chrootPath)==0)
               event.setPath("/");
           else if (serverPath.length() > chrootPath.length())
               event.setPath(serverPath.substring(chrootPath.length()));
           else {
            	  LOG.warn("Got server path " + event.getPath());
           }
       }
      WatchedEvent we = new WatchedEvent(event);
       eventThread.queueEvent( we );
       return
   }
}

將接收到的WatchedEvent事件放入eventThread線程的等待隊列中,等待處理。

EventThread的主方法:

private void processEvent(Object event) {
if (event instanceof WatcherSetEventPair) {
         // each watcher will process the event
        WatcherSetEventPair pair = (WatcherSetEventPair) event;
        for (Watcher watcher : pair.watchers) {
             try {
                  watcher.process(pair.event);
             } catch (Throwable t) {
                  LOG.error("Error while calling watcher ", t);
             }
         }
    }
}

至此客戶端在調用getData方法時綁定的Watcher被調用,process方法被執行。

相關文章
相關標籤/搜索