Zookeeper-watcher機制源碼分析(一)

Watcher的基本流程

ZooKeeper 的 Watcher 機制,總的來講能夠分爲三個過程:客戶端註冊 Watcher、服務器處理 Watcher 和客戶端回調 Watcher程序員

客戶端註冊watcher有3種方式,getData、exists、getChildren;以以下代碼爲例來分析整個觸發機制的原理編程

ZooKeeper zookeeper=new ZooKeeper(「192.168.11.152:2181」,4000,new Watcher(){數組

public void processor(WatchedEvent event){緩存

 System.out.println(「event.type」);服務器

}網絡

})session

 

zookeeper.create(「/mic」,」0」.getByte(),ZooDefs.Ids. OPEN_ACL_UNSAFE,CreateModel. PERSISTENT); //建立節點架構

 

zookeeper.exists(「/mic」,true); //註冊監聽app

 

zookeeper.setData(「/mic」, 「1」.getByte(),-1) ; //修改節點的值觸發監聽異步

 

 

ZooKeeper API的初始化過程

ZooKeeper zookeeper=new ZooKeeper(「192.168.11.152:2181」,4000,new Watcher(){

public void processor(WatchedEvent event){

 System.out.println(「event.type」);

}

})

在建立一個 ZooKeeper 客戶端對象實例時,咱們經過new Watcher()向構造方法中傳入一個默認的 Watcher, 這個 Watcher 將做爲整個 ZooKeeper會話期間的默認 Watcher,會一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 中;代碼以下

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,

            boolean canBeReadOnly, HostProvider aHostProvider,

            ZKClientConfig clientConfig) throws IOException {

        LOG.info("Initiating client connection, connectString=" + connectString

                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

 

        if (clientConfig == null) {

            clientConfig = new ZKClientConfig();

        }

        this.clientConfig = clientConfig;

        watchManager = defaultWatchManager();

        watchManager.defaultWatcher = watcher;  --在這裏將watcher設置到ZKWatchManager

        ConnectStringParser connectStringParser = new ConnectStringParser(

                connectString);

        hostProvider = aHostProvider;

        --初始化了ClientCnxn,而且調用cnxn.start()方法

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

                hostProvider, sessionTimeout, this, watchManager,

                getClientCnxnSocket(), canBeReadOnly);

        cnxn.start();

    }

ClientCnxn:是Zookeeper客戶端和Zookeeper服務器端進行通訊和事件通知處理的主要類,它內部包含兩個類,

1. SendThread  負責客戶端和服務器端的數據通訊, 也包括事件信息的傳輸

2. EventThread :  主要在客戶端回調註冊的Watchers進行通知處理

ClientCnxn初始化

  public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,

            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,

            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {

        this.zooKeeper = zooKeeper;

        this.watcher = watcher;

        this.sessionId = sessionId;

        this.sessionPasswd = sessionPasswd;

        this.sessionTimeout = sessionTimeout;

        this.hostProvider = hostProvider;

        this.chrootPath = chrootPath;

 

        connectTimeout = sessionTimeout / hostProvider.size();

        readTimeout = sessionTimeout * 2 / 3;

        readOnly = canBeReadOnly;

 

        sendThread = new SendThread(clientCnxnSocket);  --初始化sendThread

        eventThread = new EventThread();                --初始化eventThread

        this.clientConfig=zooKeeper.getClientConfig();

    }

 

    public void start() { --啓動兩個線程

        sendThread.start();

        eventThread.start();

    }

 

客戶端經過exists註冊監聽

zookeeper.exists(「/mic」,true); //註冊監聽

經過exists方法來註冊監聽,代碼以下

    public Stat exists(final String path, Watcher watcher)

        throws KeeperException, InterruptedException

    {

        final String clientPath = path;

        PathUtils.validatePath(clientPath);

 

        // the watch contains the un-chroot path

        WatchRegistration wcb = null;

        if (watcher != null) {

            wcb = new ExistsWatchRegistration(watcher, clientPath); //構建ExistWatchRegistration

        }

 

        final String serverPath = prependChroot(clientPath);

 

        RequestHeader h = new RequestHeader();

        h.setType(ZooDefs.OpCode.exists);  //設置操做類型爲exists

        ExistsRequest request = new ExistsRequest();  // 構造ExistsRequest

        request.setPath(serverPath);

        request.setWatch(watcher != null);  //是否註冊監聽

        SetDataResponse response = new SetDataResponse();  //設置服務端響應的接收類

//將封裝的RequestHeader、ExistsRequest、SetDataResponse、WatchRegistration添加到發送隊列

        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 

        if (r.getErr() != 0) {

            if (r.getErr() == KeeperException.Code.NONODE.intValue()) {

                return null;

            }

            throw KeeperException.create(KeeperException.Code.get(r.getErr()),

                    clientPath);

        }

        //返回exists獲得的結果(Stat信息)

        return response.getStat().getCzxid() == -1 ? null : response.getStat();

    }

cnxn.submitRequest

public ReplyHeader submitRequest(RequestHeader h, Record request,

            Record response, WatchRegistration watchRegistration,

            WatchDeregistration watchDeregistration)

            throws InterruptedException {

        ReplyHeader r = new ReplyHeader();

        //將消息添加到隊列,並構造一個Packet傳輸對象

        Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);

        synchronized (packet) {

            while (!packet.finished) { //在數據包沒有處理完成以前,一直阻塞

                packet.wait();

            }

        }

        return r;

    }

 

    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,

            Record response, AsyncCallback cb, String clientPath,

            String serverPath, Object ctx, WatchRegistration watchRegistration,

            WatchDeregistration watchDeregistration) {

        //將相關傳輸對象轉化成Packet

        Packet packet = null;

        packet = new Packet(h, r, request, response, watchRegistration);

        packet.cb = cb;

        packet.ctx = ctx;

        packet.clientPath = clientPath;

        packet.serverPath = serverPath;

        packet.watchDeregistration = watchDeregistration;

        

        synchronized (state) {

            if (!state.isAlive() || closing) {

                conLossPacket(packet);

            } else {

                if (h.getType() == OpCode.closeSession) {

                    closing = true;

                }

                outgoingQueue.add(packet); //添加到outgoingQueue

            }

        }

        sendThread.getClientCnxnSocket().packetAdded();//此處是多路複用機制,喚醒Selector,告訴他有數據包添加過來了

        return packet;

    }

 ZooKeeper 中,Packet 是一個最小的通訊協議單元,即數據包。Pakcet 用於進行客戶端與服務端之間的網絡傳輸,任何須要傳輸的對象都須要包裝成一個 Packet 對象。在 ClientCnxn 中 WatchRegistration 也會被封裝到 Pakcet 中,而後由 SendThread 線程調用queuePacket方法把 Packet 放入發送隊列中等待客戶端發送,這又是一個異步過程,分佈式系統採用異步通訊是一個很是常見的手段

SendThread的發送過程

在初始化鏈接的時候,zookeeper初始化了兩個線程而且啓動了。接下來咱們來分析SendThread的發送過程,由於是一個線程,因此啓動的時候會調用SendThread.run方法

public void run() {

            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);

            clientCnxnSocket.updateNow();

            clientCnxnSocket.updateLastSendAndHeard();

            int to;

            long lastPingRwServer = Time.currentElapsedTime();

            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

            while (state.isAlive()) {

                try {

                    if (!clientCnxnSocket.isConnected()) {// 若是沒有鏈接:發起鏈接

                        // don't re-establish connection if we are closing

                        if (closing) {

                            break;

                        }

                        startConnect(); //發起鏈接

                        clientCnxnSocket.updateLastSendAndHeard();

                    }

 

                    if (state.isConnected()) { //若是是鏈接狀態,則處理sasl的認證受權

                        // determine whether we need to send an AuthFailed event.

                        if (zooKeeperSaslClient != null) {

                            boolean sendAuthEvent = false;

                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {

                                try {

                                    zooKeeperSaslClient.initialize(ClientCnxn.this);

                                } catch (SaslException e) {

                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);

                                    state = States.AUTH_FAILED;

                                    sendAuthEvent = true;

                                }

                            }

                            KeeperState authState = zooKeeperSaslClient.getKeeperState();

                            if (authState != null) {

                                if (authState == KeeperState.AuthFailed) {

                                    // An authentication error occurred during authentication with the Zookeeper Server.

                                    state = States.AUTH_FAILED;

                                    sendAuthEvent = true;

                                } else {

                                    if (authState == KeeperState.SaslAuthenticated) {

                                        sendAuthEvent = true;

                                    }

                                }

                            }

 

                            if (sendAuthEvent == true) {

                                eventThread.queueEvent(new WatchedEvent(

                                      Watcher.Event.EventType.None,

                                      authState,null));

                            }

                        }

                        to = readTimeout - clientCnxnSocket.getIdleRecv();

                    } else {

                        to = connectTimeout - clientCnxnSocket.getIdleRecv();

                    }

                    //to,表示客戶端距離timeout還剩多少時間,準備發起ping鏈接

                    if (to <= 0) {//表示已經超時了。

                        String warnInfo;

                        warnInfo = "Client session timed out, have not heard from server in "

                            + clientCnxnSocket.getIdleRecv()

                            + "ms"

                            + " for sessionid 0x"

                            + Long.toHexString(sessionId);

                        LOG.warn(warnInfo);

                        throw new SessionTimeoutException(warnInfo);

                    }

                    if (state.isConnected()) {

                        //計算下一次ping請求的時間

                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -

                         ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL

                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {

                            sendPing(); //發送ping請求

                            clientCnxnSocket.updateLastSend();

                        } else {

                            if (timeToNextPing < to) {

                                to = timeToNextPing;

                            }

                        }

                    }

                    // If we are in read-only mode, seek for read/write server

                    if (state == States.CONNECTEDREADONLY) {

                        long now = Time.currentElapsedTime();

                        int idlePingRwServer = (int) (now - lastPingRwServer);

                        if (idlePingRwServer >= pingRwTimeout) {

                            lastPingRwServer = now;

                            idlePingRwServer = 0;

                            pingRwTimeout =

                                Math.min(2*pingRwTimeout, maxPingRwTimeout);

                            pingRwServer();

                        }

                        to = Math.min(to, pingRwTimeout - idlePingRwServer);

                    }

                    調用clientCnxnSocket,發起傳輸

                    其中 pendingQueue是一個用來存放已經發送、等待迴應的Packet隊列,

clientCnxnSocket默認使用ClientCnxnSocketNIO(ps:還記得在哪裏初始化嗎?在實例化zookeeper的時候)

                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

                } catch (Throwable e) {

                    if (closing) {

                        if (LOG.isDebugEnabled()) {

                            // closing so this is expected

                            LOG.debug("An exception was thrown while closing send thread for session 0x"

                                    + Long.toHexString(getSessionId())

                                    + " : " + e.getMessage());

                        }

                        break;

                    } else {

                        // this is ugly, you have a better way speak up

                        if (e instanceof SessionExpiredException) {

                            LOG.info(e.getMessage() + ", closing socket connection");

                        } else if (e instanceof SessionTimeoutException) {

                            LOG.info(e.getMessage() + RETRY_CONN_MSG);

                        } else if (e instanceof EndOfStreamException) {

                            LOG.info(e.getMessage() + RETRY_CONN_MSG);

                        } else if (e instanceof RWServerFoundException) {

                            LOG.info(e.getMessage());

                        } else {

                            LOG.warn(

                                    "Session 0x"

                                            + Long.toHexString(getSessionId())

                                            + " for server "

                                            + clientCnxnSocket.getRemoteSocketAddress()

                                            + ", unexpected error"

                                            + RETRY_CONN_MSG, e);

                        }

                        // At this point, there might still be new packets appended to outgoingQueue.

                        // they will be handled in next connection or cleared up if closed.

                        cleanup();

                        if (state.isAlive()) {

                            eventThread.queueEvent(new WatchedEvent(

                                    Event.EventType.None,

                                    Event.KeeperState.Disconnected,

                                    null));

                        }

                        clientCnxnSocket.updateNow();

                        clientCnxnSocket.updateLastSendAndHeard();

                    }

                }

            }

            synchronized (state) {

                // When it comes to this point, it guarantees that later queued

                // packet to outgoingQueue will be notified of death.

                cleanup();

            }

            clientCnxnSocket.close();

            if (state.isAlive()) {

                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,

                        Event.KeeperState.Disconnected, null));

            }

            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),

                    "SendThread exited loop for session: 0x"

                           + Long.toHexString(getSessionId()));

        }

client 和 server的網絡交互

@Override

    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {

        try {

            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {

                return;

            }

            Packet head = null;

            if (needSasl.get()) {

                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {

                    return;

                }

            } else {  

                //判斷outgoingQueue是否存在待發送的數據包,不存在則直接返回

                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {

                    return;

                }

            }

            // check if being waken up on closing.

            if (!sendThread.getZkState().isAlive()) {

                // adding back the patck to notify of failure in conLossPacket().

                addBack(head);

                return;

            }

            // channel disconnection happened

            if (disconnected.get()) { //異常流程,channel關閉了,講當前的packet添加到addBack中

                addBack(head);

                throw new EndOfStreamException("channel for sessionid 0x"

                        + Long.toHexString(sessionId)

                        + " is lost");

            }

            if (head != null) { //若是當前存在須要發送的數據包,則調用doWrite方法,pendingQueue表示處於已經發送過等待響應的packet隊列

                doWrite(pendingQueue, head, cnxn);

            }

        } finally {

            updateNow();

        }

    }

DoWrite方法

    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {

        updateNow();

        while (true) {

            if (p != WakeupPacket.getInstance()) {

                if ((p.requestHeader != null) && //判斷請求頭以及判斷當前請求類型不是ping或者auth操做

                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&

                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {

                    p.requestHeader.setXid(cnxn.getXid());  //設置xid,這個xid用來區分請求類型

                    synchronized (pendingQueue) {

                        pendingQueue.add(p); //將當前的packet添加到pendingQueue隊列中

                    }

                }

                sendPkt(p); //將數據包發送出去

            }

            if (outgoingQueue.isEmpty()) {

                break;

            }

            p = outgoingQueue.remove();

        }

    }

sendPkt

   private void sendPkt(Packet p) {

        // Assuming the packet will be sent out successfully. Because if it fails,

        // the channel will close and clean up queues.

        p.createBB(); //序列化請求數據

        updateLastSend(); //更新最後一次發送updateLastSend

        sentCount++; //更新發送次數

        channel.write(ChannelBuffers.wrappedBuffer(p.bb)); //經過nio channel發送字節緩存到服務端

    }

 

createBB

public void createBB() {

            try {

                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);

                boa.writeInt(-1, "len"); // We'll fill this in later

                if (requestHeader != null) {

                    requestHeader.serialize(boa, "header"); //序列化header頭(requestHeader)

                }

                if (request instanceof ConnectRequest) {

                    request.serialize(boa, "connect");

                    // append "am-I-allowed-to-be-readonly" flag

                    boa.writeBool(readOnly, "readOnly");

                } else if (request != null) {

                    request.serialize(boa, "request"); //序列化request(request)

                }

                baos.close();

                this.bb = ByteBuffer.wrap(baos.toByteArray());

                this.bb.putInt(this.bb.capacity() - 4);

                this.bb.rewind();

            } catch (IOException e) {

                LOG.warn("Ignoring unexpected exception", e);

            }

        }

從createBB方法中,咱們看到在底層實際的網絡傳輸序列化中,zookeeper只會講requestHeader和request兩個屬性進行序列化,即只有這兩個會被序列化到底層字節數組中去進行網絡傳輸,不會將watchRegistration相關的信息進行網絡傳輸。

總結

用戶調用exists註冊監聽之後,會作幾個事情

  1. 講請求數據封裝爲packet,添加到outgoingQueue
  2. SendThread這個線程會執行數據發送操做,主要是將outgoingQueue隊列中的數據發送到服務端
  3. 經過clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); 其中ClientCnxnSocket只zookeeper客戶端和服務端的鏈接通訊的封裝,有兩個具體的實現類ClientCnxnSocketNetty和ClientCnxnSocketNIO;具體使用哪個類來實現發送,是在初始化過程是在實例化Zookeeper的時候設置的,代碼以下

 

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

                hostProvider, sessionTimeout, this, watchManager,

                getClientCnxnSocket(), canBeReadOnly);

private ClientCnxnSocket getClientCnxnSocket() throws IOException {

        String clientCnxnSocketName = getClientConfig().getProperty(

                ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);

        if (clientCnxnSocketName == null) {

            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();

        }

        try {

            Constructor<?> clientCxnConstructor =

Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);

            ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());

            return clientCxnSocket;

        } catch (Exception e) {

            IOException ioe = new IOException("Couldn't instantiate "

                    + clientCnxnSocketName);

            ioe.initCause(e);

            throw ioe;

        }

    }

  1. 基於第3步,最終會在ClientCnxnSocketNetty方法中執行sendPkt將請求的數據包發送到服務端

對Java技術,架構技術感興趣的同窗,歡迎加QQ羣619881427,一塊兒學習,相互討論。

羣內已經有小夥伴將知識體系整理好(源碼,筆記,PPT,學習視頻),歡迎加羣免費領取。

分享給喜歡Java,喜歡編程,有夢想成爲架構師的程序員們,但願可以幫助到大家。

不是Java程序員也不要緊,幫忙轉發給更多朋友!謝謝。

分享一個小技巧點擊閱讀原文也能夠輕鬆獲取到學習資料哦!!

相關文章
相關標籤/搜索