Understanding ZooKeeper Watchers: Source Code Analysis and a Summary of Their Characteristics

Preface

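  This chapter covers one of ZooKeeper's most important mechanisms: the Watcher. ZooKeeper lets a client register a Watcher with the server. When a specified event on the server triggers that Watcher, the server sends an event notification to that client, and the client then runs its callback logic.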

 

1. The Watcher Mechanism

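  ZooKeeper lets a client register Watchers for the things it is interested in. When the server triggers one of these Watchers, it sends an event to the client, which is how distributed notification is implemented. The actual Watcher callback and the business logic both run on the client.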

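  Note that a notification sent to the client tells you only three things: the notification state (KeeperState), the event type (EventType), and the path (Path). It includes neither the original data nor the updated data!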
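  The following is a minimal sketch of what this means for callback code. The class and enum names (SketchState, SketchEventType, SketchEvent, NotificationDemo) are hypothetical stand-ins rather than the real ZooKeeper classes. The event carries only state, type, and path, so a callback that needs the new value must read the node again. Here a plain Map plays the role of that follow-up read.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for ZooKeeper's KeeperState, EventType and
// WatchedEvent. They only model what a notification carries: state, type, path.
enum SketchState { SyncConnected, Disconnected, Expired }

enum SketchEventType { None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

final class SketchEvent {
    final SketchState state;
    final SketchEventType type;
    final String path; // no old value and no new value travel with the event

    SketchEvent(SketchState state, SketchEventType type, String path) {
        this.state = state;
        this.type = type;
        this.path = path;
    }
}

final class NotificationDemo {
    // A callback that needs the new value has to read the node again.
    // The Map is a hypothetical stand-in for a real getData() call against the server.
    static String onEvent(SketchEvent e, Map<String, String> store) {
        if (e.type == SketchEventType.NodeDataChanged) {
            return store.get(e.path);
        }
        return null;
    }
}
```

  With the real client, the "re-read" step would be another getData call.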

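  The Watcher mechanism has three parts: registration, storage, and notification.

  1. Registration: the client registers a Watcher.
  2. Storage: the Watcher object is kept on the client in its watch manager (ZKWatchManager), while the server records which connection is watching which path.
  3. Notification: the server fires a Watcher event and notifies the client, which takes the corresponding Watcher objects out of its watch manager and runs their callbacks.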

  Below, we analyze these three steps one at a time.

Registration

  You can register a Watcher with the server in the following ways, mainly through the constructor and the getData, getChildren, and exists methods:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public byte[] getData(String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, Watcher watcher)
public Stat exists(String path, Watcher watcher)

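  Let's take getData as an example and see how registration works in the source. The method first wraps a WatchRegistration object, which records the relationship between the node path and the Watcher object. It then sets a boolean field on the request that says whether a watcher is attached: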

public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
    PathUtils.validatePath(path);

    // Wrap a WatchRegistration object that records the node path and its Watcher
    ZooKeeper.WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
    }
    String serverPath = this.prependChroot(path);
    RequestHeader h = new RequestHeader();
    h.setType(4); // 4 = ZooDefs.OpCode.getData
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);

    // Flag whether a watcher is attached (only this boolean is sent to the server)
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(Code.get(r.getErr()), path);
    } else {
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
}

class DataWatchRegistration extends ZooKeeper.WatchRegistration {
    // Keep the relationship between the node path and the Watcher
    public DataWatchRegistration(Watcher watcher, String clientPath) {
        super(watcher, clientPath);
    }

    ...
}

abstract class WatchRegistration {
    private Watcher watcher;
    private String clientPath;

    public WatchRegistration(Watcher watcher, String clientPath) {
        this.watcher = watcher;
        this.clientPath = clientPath;
    }
    ...
}
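  The split shown above can be condensed into a small sketch. All names here (SketchWatcher, SketchGetDataRequest, SketchWatchRegistration, SketchCall) are hypothetical simplifications, not ZooKeeper classes. The Watcher stays on the client inside a registration object, and the request object only records the path and a boolean flag.

```java
import java.util.Objects;

// Hypothetical, simplified model of what getData(path, watcher, stat) builds.
interface SketchWatcher {
    void process(String path);
}

final class SketchGetDataRequest {
    final String path;
    final boolean watch; // the only watch-related information that goes to the server

    SketchGetDataRequest(String path, boolean watch) {
        this.path = path;
        this.watch = watch;
    }
}

final class SketchWatchRegistration {
    final SketchWatcher watcher; // stays on the client
    final String clientPath;

    SketchWatchRegistration(SketchWatcher watcher, String clientPath) {
        this.watcher = Objects.requireNonNull(watcher);
        this.clientPath = clientPath;
    }
}

final class SketchCall {
    final SketchGetDataRequest request;
    final SketchWatchRegistration registration; // null when no watcher was passed

    SketchCall(SketchGetDataRequest request, SketchWatchRegistration registration) {
        this.request = request;
        this.registration = registration;
    }

    // Mirrors the shape of getData: create the registration only if a watcher was given.
    static SketchCall getData(String path, SketchWatcher watcher) {
        SketchWatchRegistration wcb = watcher != null ? new SketchWatchRegistration(watcher, path) : null;
        return new SketchCall(new SketchGetDataRequest(path, watcher != null), wcb);
    }
}
```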

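  Next, let's follow what happens to the wcb variable. In short, it is wrapped in a Packet object. A Packet can be seen as the smallest unit of the communication protocol and carries the information to be transmitted. Finally, the packet is placed in the outgoing queue, where SendThread will pick it up and send it: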

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // Hand the request over to the client/server network layer
    ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);
    synchronized(packet) {
        while(!packet.finished) {
            packet.wait();
        }
        return r;
    }
}

ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
    ClientCnxn.Packet packet = null;
    synchronized(this.outgoingQueue) {
        // Every object to be transmitted is wrapped in a Packet
        packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (this.state.isAlive() && !this.closing) {
            // -11 = OpCode.closeSession
            if (h.getType() == -11) {
                this.closing = true;
            }

            // Put it in the outgoing queue to wait to be sent
            this.outgoingQueue.add(packet);
        } else {
            this.conLossPacket(packet);
        }
    }

    this.sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
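  The synchronous pattern above (wrap in a packet, enqueue, then wait on the packet until it is finished) can be modeled with plain Java monitors. This sketch assumes a single I/O thread and uses hypothetical names (SketchPacket, SketchCnxn, serveOne). It is not ClientCnxn itself. serveOne plays the role of the sending thread plus finishPacket.

```java
import java.util.LinkedList;

// Hypothetical model of submitRequest/queuePacket: the caller enqueues a packet
// and blocks on the packet's monitor until another thread marks it finished.
final class SketchPacket {
    final String request;
    String reply;      // filled in by the "I/O thread"
    boolean finished;  // guarded by this packet's monitor

    SketchPacket(String request) { this.request = request; }
}

final class SketchCnxn {
    final LinkedList<SketchPacket> outgoingQueue = new LinkedList<>();

    SketchPacket queuePacket(String request) {
        SketchPacket p = new SketchPacket(request);
        synchronized (outgoingQueue) {
            outgoingQueue.add(p);
            outgoingQueue.notifyAll(); // stand-in for wakeupCnxn()
        }
        return p;
    }

    // Synchronous call: enqueue, then wait until the packet is finished.
    String submitRequest(String request) throws InterruptedException {
        SketchPacket p = queuePacket(request);
        synchronized (p) {
            while (!p.finished) {
                p.wait();
            }
            return p.reply;
        }
    }

    // What the I/O thread does: take one packet, "send" it, then complete it like finishPacket.
    void serveOne() throws InterruptedException {
        SketchPacket p;
        synchronized (outgoingQueue) {
            while (outgoingQueue.isEmpty()) {
                outgoingQueue.wait();
            }
            p = outgoingQueue.removeFirst();
        }
        synchronized (p) {
            p.reply = "ok:" + p.request;
            p.finished = true;
            p.notifyAll();
        }
    }
}
```

  The `while (!p.finished)` loop, rather than a single `wait()`, guards against spurious wakeups. It also handles the case where the I/O thread finishes the packet before the caller starts waiting.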

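  Now look at org.apache.zookeeper.ClientCnxnSocketNIO#doIO. It takes a packet from the outgoing queue (outgoingQueue) and serializes it into a byte array. Note that the WatchRegistration object mentioned earlier is not serialized here. Only the requestHeader and request fields are. In other words, the server never receives the mapping between the node path and the Watcher object, and the callback's business logic is never sent to the server either!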

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel)this.sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    } else {
        // Is the socket readable?
        if (this.sockKey.isReadable()) {
            ...
        }

        if (this.sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                Packet p = this.findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
                if (p != null) {
                    this.updateLastSend();
                    if (p.bb == null) {
                        // 11 = OpCode.ping, 100 = OpCode.auth: these do not get a new xid
                        if (p.requestHeader != null && p.requestHeader.getType() != 11 && p.requestHeader.getType() != 100) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }

                        // Serialize
                        p.createBB();
                    }
                    
                    sock.write(p.bb);
                    ...
                }

                ...
            }
        }
    }
}

public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len");

        // Serialize the header
        if (this.requestHeader != null) {
            this.requestHeader.serialize(boa, "header");
        }
        if (this.request instanceof ConnectRequest) {
            this.request.serialize(boa, "connect");
            boa.writeBool(this.readOnly, "readOnly");
        } else if (this.request != null) {
            // Serialize the request
            this.request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException var3) {
        ClientCnxn.LOG.warn("Ignoring unexpected exception", var3);
    }
}
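  The framing in createBB (a 4-byte length placeholder, then the header, then the request, with the placeholder overwritten by the body length) can be reproduced with java.io streams. This is a hypothetical sketch, not jute's BinaryOutputArchive. It assumes a RequestHeader is written as xid then type, that the string path is written as a length-prefixed UTF-8 byte array, and that the boolean watch flag is written as one byte. The Watcher object never appears in the bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of Packet#createBB-style framing for a getData request.
final class SketchFraming {
    static ByteBuffer frameGetData(int xid, int type, String path, boolean watch) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);              // "len" placeholder, patched below
        out.writeInt(xid);             // header: xid (assumed field order)
        out.writeInt(type);            // header: type (4 = getData in the code above)
        byte[] p = path.getBytes(StandardCharsets.UTF_8);
        out.writeInt(p.length);        // request: path as length-prefixed UTF-8
        out.write(p);
        out.writeBoolean(watch);       // request: watch flag; the Watcher itself is never written
        out.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4);  // overwrite the placeholder with the body length
        bb.rewind();
        return bb;
    }
}
```

  For the path "/a", the frame is 19 bytes: 4 for the length, 4 for the xid, 4 for the type, 4 for the string length, 2 for the path bytes, and 1 for the flag. The length prefix is therefore 15.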

 

Storage

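  Everything so far has been about how the client sends the request. Next, let's see what the server does when it receives it. The ZooKeeper server handles client requests with a classic chain-of-responsibility pattern, so each request passes through several different processors in turn. Here we only look at org.apache.zookeeper.server.FinalRequestProcessor#processRequest: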

public void processRequest(Request request) {
    ...
    PrepRequestProcessor.checkACL(this.zks, this.zks.getZKDatabase().convertLong(aclG), 1, request.authInfo);
    Stat stat = new Stat();
    // Pass a watcher depending on the client's "watch" flag:
    // if it is true, pass the current ServerCnxn (which represents the client-server connection)
    byte[] b = this.zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    ...
}

public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
    return this.dataTree.getData(path, stat, watcher);
}

  Next, the node path and the ServerCnxn object are stored in WatchManager's watchTable and watch2Paths. The first is keyed by path and the second by Watcher:

public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
    DataNode n = (DataNode)this.nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    } else {
        synchronized(n) {
            n.copyStat(stat);
            if (watcher != null) {
                // Add the watcher
                this.dataWatches.addWatch(path, watcher);
            }

            return n.data;
        }
    }
}

public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = (HashSet)this.watchTable.get(path);
    if (list == null) {
        list = new HashSet(4);
        this.watchTable.put(path, list);
    }

    list.add(watcher);
    HashSet<String> paths = (HashSet)this.watch2Paths.get(watcher);
    if (paths == null) {
        paths = new HashSet();
        this.watch2Paths.put(watcher, paths);
    }

    paths.add(path);
}

// Keyed by path
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap();
// Keyed by Watcher
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap();
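  The two maps form a bidirectional index. The sketch below uses a hypothetical generic class, SketchWatchManager, with a type parameter W standing in for ServerCnxn. It shows addWatch together with one likely use of the Watcher-keyed index: dropping every watch of a closed connection without scanning all paths. As far as I know, ZooKeeper's WatchManager has a removeWatcher method that works this way, but treat this as a simplified model rather than the real class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the server-side WatchManager's two indexes.
final class SketchWatchManager<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();  // path -> watchers
    private final Map<W, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // The watcher-keyed index makes it cheap to remove all watches of one connection.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<W> ws = watchTable.get(p);
            if (ws != null) {
                ws.remove(watcher);
                if (ws.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }

    synchronized int watchersOn(String path) {
        Set<W> ws = watchTable.get(path);
        return ws == null ? 0 : ws.size();
    }
}
```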

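  After the server has finished processing, the client's SendThread receives the response, and the finishPacket method takes the WatchRegistration out of the packet and registers it with ZKWatchManager: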

private void finishPacket(ClientCnxn.Packet p) {
    // Register the watcher on the client side
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        synchronized(p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        this.eventThread.queuePacket(p);
    }

}

public void register(int rc) {
    if (this.shouldAddWatch(rc)) {
        Map<String, Set<Watcher>> watches = this.getWatches(rc);
        synchronized(watches) {
            // Look up the watcher set for this path, creating it if needed
            Set<Watcher> watchers = (Set)watches.get(this.clientPath);
            if (watchers == null) {
                watchers = new HashSet();
                watches.put(this.clientPath, watchers);
            }

            ((Set)watchers).add(this.watcher);
        }
    }

}
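  Whether register actually stores the watcher depends on the result code (shouldAddWatch) and on which map getWatches returns. In the ZooKeeper versions I am familiar with, a data registration is kept only when rc == 0. An exists registration is also kept when the node does not exist (NONODE, -101), and in that case it goes into existWatches, which is how a later NodeCreated can be delivered. The following is a hypothetical sketch of that rule. SketchClientWatches and its methods are illustrative names, not the real API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the client-side registration rule applied in finishPacket.
final class SketchClientWatches {
    static final int OK = 0;
    static final int NONODE = -101; // mirrors KeeperException.Code.NONODE

    final Map<String, Set<Object>> dataWatches = new HashMap<>();
    final Map<String, Set<Object>> existWatches = new HashMap<>();

    // getData-style registration: only keep the watcher if the read succeeded.
    void registerData(int rc, String path, Object watcher) {
        if (rc == OK) {
            add(dataWatches, path, watcher);
        }
    }

    // exists-style registration: a missing node still gets a watch, kept in existWatches.
    void registerExists(int rc, String path, Object watcher) {
        if (rc == OK) {
            add(dataWatches, path, watcher);
        } else if (rc == NONODE) {
            add(existWatches, path, watcher);
        }
    }

    private static void add(Map<String, Set<Object>> watches, String path, Object watcher) {
        synchronized (watches) {
            watches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        }
    }
}
```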

 

Notification

  When the content of a data node changes on the server, the watcher is triggered. The relevant code is in org.apache.zookeeper.server.DataTree#setData:

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = (DataNode)this.nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    } else {
        byte[] lastdata;
        // Update the node's data and stat
        synchronized(n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }

        String lastPrefix;
        if ((lastPrefix = this.getMaxPrefixWithQuota(path)) != null) {
            this.updateBytes(lastPrefix, (long)((data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)));
        }

        // Trigger the watchers
        this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
}

  Triggering a watcher removes that path's watchers from watchTable and watch2Paths. This shows that a Watcher is one-time on the server: once it has fired, it is no longer valid.

public Set<Watcher> triggerWatch(String path, EventType type) {
    return this.triggerWatch(path, type, (Set)null);
}

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet watchers;

    // This synchronized block removes the path's watchers from watchTable and watch2Paths
    synchronized(this) {
        watchers = (HashSet)this.watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
            }

            return null;
        }

        Iterator i$ = watchers.iterator();

        while(i$.hasNext()) {
            Watcher w = (Watcher)i$.next();
            HashSet<String> paths = (HashSet)this.watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }

    for (Object o : watchers) {
        Watcher w = (Watcher) o;
        if (supress != null && supress.contains(w)) {
            continue;
        }
        // Call the watcher. The event e only carries the notification state (KeeperState),
        // the event type (EventType) and the node path (Path);
        // it contains neither the new value nor the old one.
        w.process(e);
    }
    return watchers;
}
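  The one-time behavior comes from removing the path's watcher set before calling anything. The following is a minimal sketch of that idea with a hypothetical class, SketchTrigger. java.util.function.Consumer stands in for Watcher, and a "type path" string stands in for WatchedEvent. A second trigger on the same path finds nothing unless the watch has been registered again.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the one-shot trigger in WatchManager#triggerWatch.
final class SketchTrigger {
    private final Map<String, Set<Consumer<String>>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Consumer<String> watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Returns how many watchers were called.
    int trigger(String path, String type, Set<Consumer<String>> suppress) {
        Set<Consumer<String>> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // removal is what makes the watch one-shot
        }
        if (watchers == null) {
            return 0;
        }
        int called = 0;
        for (Consumer<String> w : watchers) {
            if (suppress != null && suppress.contains(w)) {
                continue;
            }
            w.accept(type + " " + path); // no old or new data is passed
            called++;
        }
        return called;
    }
}
```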

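  Finally, look at the process method in org.apache.zookeeper.server.NIOServerCnxn#process. All it does is send the event to the client, which confirms that the actual callback and business logic run on the client: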

public synchronized void process(WatchedEvent event) {
    // xid -1 in the reply header marks this message as a notification
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
    }
    WatcherEvent e = event.getWrapper();
    // Send the notification
    this.sendResponse(h, e, "notification");
}

  The client handles the notification in org.apache.zookeeper.ClientCnxn.SendThread#readResponse. This method mainly deserializes the event and hands it to the EventThread:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ...
    // If this is a notification
    } else if (replyHdr.getXid() == -1) {
        if (ClientCnxn.LOG.isDebugEnabled()) {
            ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
        }

        // Deserialize
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        if (ClientCnxn.this.chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
                event.setPath("/");
            } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
                event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
            } else {
                ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (ClientCnxn.LOG.isDebugEnabled()) {
            ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
        }

        // Hand it over to the EventThread
        ClientCnxn.this.eventThread.queueEvent(we);
    } 
    ...
}
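  Two small rules from readResponse are easy to isolate. A reply whose xid is -1 is a server-pushed notification, and the server-side path must have the client's chroot prefix stripped before watchers see it. The sketch below uses a hypothetical helper class, SketchChroot, that restates those two branches. Like the original, it leaves a too-short path unchanged, where the original logs a warning.

```java
// Hypothetical helper restating two rules from SendThread#readResponse.
final class SketchChroot {
    // xid -1 marks a notification (see NIOServerCnxn#process above).
    static boolean isNotification(int xid) {
        return xid == -1;
    }

    // Convert a server path into the path the client registered its watcher on.
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;                                // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                                       // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length()); // strip the prefix
        }
        return serverPath;                                    // too short: the original only logs a warning
    }
}
```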

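  The EventThread then retrieves all watchers for that path from the ZKWatchManager populated during registration. Note that Watchers are one-time on the client as well!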

public void queueEvent(WatchedEvent event) {
    if (event.getType() != EventType.None || this.sessionState != event.getState()) {
        this.sessionState = event.getState();
        ClientCnxn.WatcherSetEventPair pair = new ClientCnxn.WatcherSetEventPair(ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()), event);
        this.waitingEvents.add(pair);
    }
}


public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
    ...
    // Take out all Watchers for this path.
    // remove() is used, so client-side watches are also one-time: once triggered, they are gone.
    case NodeDataChanged:
    case NodeCreated:
        synchronized(this.dataWatches) {
            this.addTo((Set)this.dataWatches.remove(clientPath), result);
        }
        synchronized(this.existWatches) {
            this.addTo((Set)this.existWatches.remove(clientPath), result);
        }
        break;
    ...
}
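  The NodeDataChanged / NodeCreated branch above amounts to "remove from both maps and merge". The following is a hypothetical sketch, SketchMaterialize, that uses strings as watcher identifiers for brevity. It shows that materializing twice for the same path yields nothing the second time.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the NodeDataChanged / NodeCreated branch of ZKWatchManager#materialize.
final class SketchMaterialize {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();

    Set<String> onDataChangedOrCreated(String clientPath) {
        Set<String> result = new HashSet<>();
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);  // remove, not get: one-shot
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        return result;
    }

    private static void addTo(Set<String> from, Set<String> to) {
        if (from != null) {
            to.addAll(from);
        }
    }
}
```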

  Finally, the EventThread takes the watchers out of the waitingEvents queue and runs them serially and synchronously. Here is the method org.apache.zookeeper.ClientCnxn.EventThread#processEvent:

private void processEvent(Object event) {
    try {
        if (event instanceof ClientCnxn.WatcherSetEventPair) {
            ClientCnxn.WatcherSetEventPair pair = (ClientCnxn.WatcherSetEventPair)event;
            Iterator i$ = pair.watchers.iterator();

            while(i$.hasNext()) {
                // This is the watcher passed in by the client; it holds the actual callback logic
                Watcher watcher = (Watcher)i$.next();

                try {
                    watcher.process(pair.event);
                } catch (Throwable var7) {
                    ClientCnxn.LOG.error("Error while calling watcher ", var7);
                }
            }
        } else {
            ...
        }
    } catch (Throwable t) {
        ...
    }
}
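  The serial execution model can be illustrated with a single consumer thread draining a blocking queue. The following is a hypothetical sketch, SketchEventThread, not ClientCnxn.EventThread. It shows that callbacks run one after another in queue order and that a throwing callback is caught so the next one still runs. It also implies that a slow callback delays everything queued behind it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of an EventThread-style serial dispatcher.
final class SketchEventThread extends Thread {
    private static final Runnable STOP = () -> { };   // sentinel that ends the loop
    private final BlockingQueue<Runnable> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> errors = new ArrayList<>();    // only touched by this thread; read after join()

    void queue(Runnable callback) { waitingEvents.add(callback); }

    void shutdown() { waitingEvents.add(STOP); }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable cb = waitingEvents.take();
                if (cb == STOP) {
                    return;
                }
                try {
                    cb.run();                         // callbacks run strictly one after another
                } catch (Throwable t) {
                    errors.add(t.toString());         // like LOG.error("Error while calling watcher ", t)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```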

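  That completes the walkthrough. I found a diagram online that I think illustrates the process very well. The three steps above (registration, storage, and notification) are best read alongside it, ideally by opening the full-size image: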

 

 

3. Summary

Summary of Watcher characteristics

One-time

  On both the client and the server, a watcher is removed as soon as it is triggered.

Serial execution on the client

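  As the source shows, watcher callbacks are executed serially and synchronously. Avoid putting heavy processing in a single watcher, or it will delay the callbacks of other watchers.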

Lightweight

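  Registering a watcher does not send the Watcher object to the server (only a boolean flag goes over the wire), and a callback is not told the node's content before or after the change. This keeps the mechanism very lightweight.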

Validity

  After a CONNECTIONLOSS, the watches registered on the connection remain in place, provided the client reconnects within the session timeout (that is, no SESSIONEXPIRED occurs).

Node change notifications

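  "Guava is to Java what Curator is to ZooKeeper." The open-source client Curator provides Cache recipes for listening to server-side events, which greatly reduces the tedium of working with the native API.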

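  Curator or ZkClient can spare us the pain of re-registering a watcher every time. Even so, when a node is updated very frequently, there is no guarantee that the client will be notified of every single change.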

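  The reason is that after a change triggers a notification, the client must register the watch again, and the data may be modified many more times in the meantime.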
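  This gap can be shown with a deterministic, single-threaded simulation. SketchMissedUpdates is a hypothetical model, not a ZooKeeper API. A one-shot watch is disarmed by the first write, so writes that land before the client re-registers produce no notifications of their own. The client only sees the latest state when it reads again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of why a client can miss intermediate updates with one-shot watches.
final class SketchMissedUpdates {
    private int version = 0;
    private boolean watchArmed = false;
    final List<Integer> notifiedVersions = new ArrayList<>();

    void registerWatch() { watchArmed = true; }

    void write() {
        version++;
        if (watchArmed) {
            watchArmed = false;             // one-shot: disarmed as soon as it fires
            notifiedVersions.add(version);
        }
    }

    int read() { return version; }
}
```

  For example, register the watch and then write three times. Only version 1 is notified. After re-registering, the client reads version 3 directly, so versions 2 and 3 never produce a notification of their own.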

 

 

References:

Diagram borrowed from: https://blog.csdn.net/huyangyamin/article/details/77743624
