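ZooKeeper's Watcher mechanism can be divided into three phases: the client registers a Watcher, the server processes the Watcher, and the client invokes the Watcher callback.
A client can register a watcher in three ways: getData, exists, and getChildren. The following code is used as an example to analyze how the whole trigger mechanism works.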
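    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    ZooKeeper zookeeper = new ZooKeeper("192.168.11.152:2181", 4000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("event.type: " + event.getType());
        }
    });
    zookeeper.create("/mic", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // create the node
    zookeeper.exists("/mic", true); // register the watch
    zookeeper.setData("/mic", "1".getBytes(), -1); // change the node's value, which triggers the watch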
    ZooKeeper zookeeper = new ZooKeeper("192.168.11.152:2181", 4000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("event.type: " + event.getType());
        }
    });
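When we create a ZooKeeper client instance, we pass a default Watcher to the constructor via new Watcher(). This Watcher serves as the default Watcher for the whole ZooKeeper session and is kept in the defaultWatcher field of the client's ZKWatchManager. The code is as follows: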
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider,
            ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher; // the watcher is stored in ZKWatchManager here
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;
        // create the ClientCnxn, then call cnxn.start()
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
                this, watchManager, getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }
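ClientCnxn is the main class through which the ZooKeeper client communicates with the server and handles event notifications. It contains two inner classes:
1. SendThread: handles data communication between the client and the server, including the transfer of event information
2. EventThread: invokes the Watchers registered on the client to deliver notifications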
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout,
            ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket); // create the sendThread
        eventThread = new EventThread();               // create the eventThread
        this.clientConfig = zooKeeper.getClientConfig();
    }

    public void start() { // start both threads
        sendThread.start();
        eventThread.start();
    }
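To make the timeout arithmetic in the constructor concrete, here is a small sketch that reproduces the two integer divisions. The class TimeoutSketch and its method names are hypothetical; only the two formulas are taken from the listing above.

```java
// Hypothetical helper: only the two formulas come from the ClientCnxn constructor.
final class TimeoutSketch {
    // connectTimeout = sessionTimeout divided by the number of servers in the connect string
    static int connectTimeout(int sessionTimeoutMs, int hostCount) {
        return sessionTimeoutMs / hostCount;
    }

    // readTimeout = two thirds of the session timeout, using integer arithmetic
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 4000; // the value used in the article's example
        System.out.println(connectTimeout(sessionTimeoutMs, 1)); // 4000
        System.out.println(readTimeout(sessionTimeoutMs));       // 2666
    }
}
```

With a single server and the 4000 ms session timeout from the example, the client allows 4000 ms to connect and 2666 ms of read silence before it treats the session as timed out.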
    zookeeper.exists("/mic", true); // register the watch
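The watch is registered through the exists method. The call exists("/mic", true) delegates to the overload shown here, passing the default Watcher when the flag is true. The code is as follows: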
    public Stat exists(final String path, Watcher watcher)
            throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ExistsWatchRegistration(watcher, clientPath); // build the ExistsWatchRegistration
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.exists);              // the operation type is exists
        ExistsRequest request = new ExistsRequest();   // build the ExistsRequest
        request.setPath(serverPath);
        request.setWatch(watcher != null);             // whether a watch should be registered
        SetDataResponse response = new SetDataResponse(); // object that receives the server's response
        // add the RequestHeader, ExistsRequest, SetDataResponse and WatchRegistration to the send queue
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
                return null;
            }
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        // return the result of exists (the Stat information)
        return response.getStat().getCzxid() == -1 ? null : response.getStat();
    }
    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
            WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // build a Packet transfer object and add it to the queue
        Packet packet = queuePacket(h, r, request, response, null, null, null, null,
                watchRegistration, watchDeregistration);
        synchronized (packet) {
            while (!packet.finished) { // block until the packet has been fully processed
                packet.wait();
            }
        }
        return r;
    }
    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response,
            AsyncCallback cb, String clientPath, String serverPath, Object ctx,
            WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
        // wrap the transfer objects in a Packet
        Packet packet = null;
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;

        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet); // add to outgoingQueue
            }
        }
        // I/O multiplexing: wake up the Selector to tell it that a packet has been added
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
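In ZooKeeper, a Packet is the smallest unit of the communication protocol, that is, a data packet. Packets carry the network traffic between client and server, and any object that needs to be transferred must be wrapped in a Packet. In ClientCnxn, the WatchRegistration is wrapped in the Packet as well. The queuePacket method then places the Packet on the send queue (outgoingQueue), where it waits for the SendThread to send it. This is again an asynchronous process; asynchronous communication is a very common technique in distributed systems.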
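The interplay between the blocking submitRequest and the asynchronous send queue can be sketched in plain Java. This is a simplified illustration, not ZooKeeper's code: SubmitSketch, its Packet class and the string payloads are hypothetical stand-ins, and the sender thread fakes the server reply instead of doing network I/O.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins for ClientCnxn's Packet and outgoingQueue.
final class SubmitSketch {
    static final class Packet {
        final String request;
        String response;
        boolean finished; // guarded by the packet's own monitor
        Packet(String request) { this.request = request; }
    }

    private final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();

    // Caller thread: enqueue the packet, then block until it is marked finished.
    String submitRequest(String request) throws InterruptedException {
        Packet packet = new Packet(request);
        outgoingQueue.add(packet);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return packet.response;
    }

    // Sender thread: take packets off the queue, fake a reply, wake the caller.
    void startSender() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    Packet p = outgoingQueue.take();
                    synchronized (p) {
                        p.response = "reply:" + p.request;
                        p.finished = true;
                        p.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sender-sketch");
        sender.setDaemon(true); // do not keep the JVM alive for the sketch
        sender.start();
    }

    public static void main(String[] args) throws InterruptedException {
        SubmitSketch cnxn = new SubmitSketch();
        cnxn.startSender();
        System.out.println(cnxn.submitRequest("exists /mic")); // reply:exists /mic
    }
}
```

The caller sees a synchronous API, while the hand-off to the sender is asynchronous. Checking the finished flag inside the synchronized block keeps the caller correct even when the sender completes the packet before the caller starts waiting.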
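When the connection was initialized, ZooKeeper created and started two threads. Next we analyze the sending path of SendThread. Because it is a thread, starting it invokes the SendThread.run method: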
    public void run() {
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = Time.currentElapsedTime();
        final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
        while (state.isAlive()) {
            try {
                if (!clientCnxnSocket.isConnected()) { // not connected: open a connection
                    // don't re-establish connection if we are closing
                    if (closing) {
                        break;
                    }
                    startConnect(); // open the connection
                    clientCnxnSocket.updateLastSendAndHeard();
                }

                if (state.isConnected()) { // connected: handle SASL authentication
                    // determine whether we need to send an AuthFailed event.
                    if (zooKeeperSaslClient != null) {
                        boolean sendAuthEvent = false;
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            try {
                                zooKeeperSaslClient.initialize(ClientCnxn.this);
                            } catch (SaslException e) {
                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            }
                        }
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        if (authState != null) {
                            if (authState == KeeperState.AuthFailed) {
                                // An authentication error occurred during authentication with the Zookeeper Server.
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            } else {
                                if (authState == KeeperState.SaslAuthenticated) {
                                    sendAuthEvent = true;
                                }
                            }
                        }

                        if (sendAuthEvent == true) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Watcher.Event.EventType.None, authState, null));
                        }
                    }
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }
                // `to` is how much time the client has left before it times out;
                // it is used below to schedule the next ping
                if (to <= 0) { // the session has already timed out
                    String warnInfo;
                    warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv() + "ms"
                            + " for sessionid 0x" + Long.toHexString(sessionId);
                    LOG.warn(warnInfo);
                    throw new SessionTimeoutException(warnInfo);
                }
                if (state.isConnected()) {
                    // compute the time until the next ping request
                    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
                            - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        sendPing(); // send the ping request
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }
                // If we are in read-only mode, seek for read/write server
                if (state == States.CONNECTEDREADONLY) {
                    long now = Time.currentElapsedTime();
                    int idlePingRwServer = (int) (now - lastPingRwServer);
                    if (idlePingRwServer >= pingRwTimeout) {
                        lastPingRwServer = now;
                        idlePingRwServer = 0;
                        pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                        pingRwServer();
                    }
                    to = Math.min(to, pingRwTimeout - idlePingRwServer);
                }
                // Hand over to clientCnxnSocket to do the transfer.
                // pendingQueue holds the Packets that have been sent and are awaiting a reply.
                // clientCnxnSocket defaults to ClientCnxnSocketNIO; it is chosen when the
                // ZooKeeper instance is created.
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        // closing so this is expected
                        LOG.debug("An exception was thrown while closing send thread for session 0x"
                                + Long.toHexString(getSessionId()) + " : " + e.getMessage());
                    }
                    break;
                } else {
                    // this is ugly, you have a better way speak up
                    if (e instanceof SessionExpiredException) {
                        LOG.info(e.getMessage() + ", closing socket connection");
                    } else if (e instanceof SessionTimeoutException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof EndOfStreamException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof RWServerFoundException) {
                        LOG.info(e.getMessage());
                    } else {
                        LOG.warn("Session 0x" + Long.toHexString(getSessionId())
                                + " for server " + clientCnxnSocket.getRemoteSocketAddress()
                                + ", unexpected error" + RETRY_CONN_MSG, e);
                    }
                    // At this point, there might still be new packets appended to outgoingQueue.
                    // they will be handled in next connection or cleared up if closed.
                    cleanup();
                    if (state.isAlive()) {
                        eventThread.queueEvent(new WatchedEvent(
                                Event.EventType.None, Event.KeeperState.Disconnected, null));
                    }
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        synchronized (state) {
            // When it comes to this point, it guarantees that later queued
            // packet to outgoingQueue will be notified of death.
            cleanup();
        }
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(
                    Event.EventType.None, Event.KeeperState.Disconnected, null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
    }
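The ping scheduling inside run() is easy to lose among the error handling, so here is the same arithmetic isolated as a sketch. PingSketch and its method names are hypothetical. The formula and the 10000 ms cap are copied from the listing above, and the 2666 ms readTimeout is what a 4000 ms session timeout yields in the constructor.

```java
// Hypothetical helper that isolates the ping arithmetic from SendThread.run().
final class PingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in run()

    // idleSend is the number of milliseconds since the client last sent anything.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // A ping is sent when it is due, or when nothing was sent for too long.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 2666; // 4000 * 2 / 3
        System.out.println(timeToNextPing(readTimeout, 0));    // 1333
        System.out.println(timeToNextPing(readTimeout, 1200)); // -867
        System.out.println(shouldPing(readTimeout, 500));      // false
        System.out.println(shouldPing(readTimeout, 1200));     // true
    }
}
```

In other words, the client aims to ping after roughly half the read timeout, and the extra 1000 ms deduction makes the ping go out earlier once the connection has been idle for more than a second.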
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                // check whether outgoingQueue has a packet waiting to be sent; if not, return
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                // error path: the channel is closed, so put the current packet back via addBack
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId) + " is lost");
            }
            if (head != null) {
                // there is a packet to send, so call doWrite; pendingQueue holds the
                // packets that have been sent and are waiting for a response
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                // the packet has a request header and is neither a ping nor an auth operation
                if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != ZooDefs.OpCode.ping)
                        && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    // set the xid, a per-request sequence number used to match replies to requests
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p); // add the current packet to pendingQueue
                    }
                }
                sendPkt(p); // send the packet
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }
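To show why doWrite stamps an xid and parks the packet in pendingQueue, the sketch below pairs that bookkeeping with a reply handler. The reply side is an assumption for illustration (replies arrive in send order and carry the request's xid); it is not shown in the listings above. PendingQueueSketch and all of its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the xid / pendingQueue bookkeeping.
final class PendingQueueSketch {
    static final class Packet {
        final String op;
        int xid;
        Packet(String op) { this.op = op; }
    }

    private int nextXid = 1;
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();

    // Mirrors doWrite: stamp an xid, then remember the packet until its reply arrives.
    Packet send(String op) {
        Packet p = new Packet(op);
        p.xid = nextXid++;
        pendingQueue.addLast(p);
        return p;
    }

    // Assumed reply handling: the oldest pending packet must match the reply's xid.
    Packet onReply(int replyXid) {
        Packet head = pendingQueue.pollFirst();
        if (head == null || head.xid != replyXid) {
            throw new IllegalStateException("xid out of order: " + replyXid);
        }
        return head;
    }

    int pendingCount() {
        return pendingQueue.size();
    }

    public static void main(String[] args) {
        PendingQueueSketch cnxn = new PendingQueueSketch();
        cnxn.send("exists /mic");
        cnxn.send("setData /mic");
        System.out.println(cnxn.onReply(1).op);  // exists /mic
        System.out.println(cnxn.pendingCount()); // 1
    }
}
```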
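    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();     // serialize the request data
        updateLastSend(); // record the time of the most recent send
        sentCount++;      // increment the count of sent packets
        // write the byte buffer to the server through the channel; ChannelBuffers is a Netty
        // class, which suggests this listing comes from ClientCnxnSocketNetty rather than
        // the default ClientCnxnSocketNIO
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }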
    public void createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeInt(-1, "len"); // We'll fill this in later
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header"); // serialize the header (requestHeader)
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request"); // serialize the request body (request)
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
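The length-prefix trick in createBB (write a placeholder, serialize, then overwrite the first four bytes) can be tried out with JDK classes alone. In this sketch DataOutputStream stands in for BinaryOutputArchive, and the frame layout (an int type followed by a length-prefixed path) is an illustrative assumption, not ZooKeeper's wire format. FramingSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical framing helper: JDK streams stand in for ZooKeeper's archive classes.
final class FramingSketch {
    static ByteBuffer frame(int type, String path) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);   // placeholder for the length, filled in at the end
        out.writeInt(type); // stand-in for the request header
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        out.writeInt(pathBytes.length); // stand-in for the request body
        out.write(pathBytes);
        out.flush();

        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4); // overwrite the placeholder; the length excludes itself
        bb.rewind();                  // reset the position so the buffer is ready to be written out
        return bb;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer bb = frame(3, "/mic");
        System.out.println(bb.capacity()); // 16
        System.out.println(bb.getInt(0));  // 12
    }
}
```

The receiver can read the first four bytes to learn how many more bytes make up the frame, which is why the length excludes the prefix itself.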
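The createBB method shows that, when a packet is serialized for the network, ZooKeeper serializes only two of its fields: requestHeader and request. Only these two end up in the underlying byte array that is sent over the wire. The watchRegistration information is not transmitted.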
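Since the watcher object never leaves the client, the client has to keep its own table from path to callbacks and send the server only a boolean flag. The sketch below models that idea. It is a simplification under stated assumptions: WatchTableSketch and its methods are hypothetical, callbacks are plain Consumer<String> values, and registration happens immediately rather than after a successful reply. Removing the callbacks when they fire reflects the one-shot behaviour of standard ZooKeeper watches.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical client-side watch table: the callbacks stay local, only a flag is sent.
final class WatchTableSketch {
    private final Map<String, Set<Consumer<String>>> existWatches = new HashMap<>();

    // Returns the boolean that would travel in the request (compare request.setWatch).
    boolean register(String path, Consumer<String> watcher) {
        if (watcher == null) {
            return false;
        }
        existWatches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        return true;
    }

    // Called when a server notification for the path arrives: remove, then invoke.
    int fire(String path, String eventType) {
        Set<Consumer<String>> watchers = existWatches.remove(path);
        if (watchers == null) {
            return 0;
        }
        for (Consumer<String> w : watchers) {
            w.accept(eventType + " " + path);
        }
        return watchers.size();
    }

    public static void main(String[] args) {
        WatchTableSketch table = new WatchTableSketch();
        boolean watchFlag = table.register("/mic", System.out::println);
        System.out.println(watchFlag);                            // true
        System.out.println(table.fire("/mic", "NodeDataChanged")); // prints the event, then 1
        System.out.println(table.fire("/mic", "NodeDataChanged")); // 0, the watch was one-shot
    }
}
```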
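After the user calls exists to register a watch, several things happen: the request is wrapped in a Packet and added to outgoingQueue; the SendThread sends the queued packets to the server through clientCnxnSocket.doTransport; and the ClientCnxnSocket implementation that performs the send is the one chosen when the ZooKeeper instance was created, in the following code: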
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
            this, watchManager, getClientCnxnSocket(), canBeReadOnly);

    private ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = getClientConfig().getProperty(
                ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
                    .getDeclaredConstructor(ZKClientConfig.class);
            ClientCnxnSocket clientCxnSocket =
                    (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
            return clientCxnSocket;
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
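The pattern in getClientCnxnSocket, a configurable class name with a default that is instantiated reflectively, can be reproduced with a toy interface. Everything in this sketch is hypothetical (the Transport interface, the two implementations, and the property key); it only demonstrates the same lookup, fallback and constructor call.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical stand-ins for ClientCnxnSocket and its NIO and Netty implementations.
final class SocketFactorySketch {
    interface Transport {
        String name();
    }

    static final class NioTransport implements Transport {
        NioTransport(Properties config) { }
        public String name() { return "nio"; }
    }

    static final class NettyTransport implements Transport {
        NettyTransport(Properties config) { }
        public String name() { return "netty"; }
    }

    static final String KEY = "sketch.clientCnxnSocket"; // hypothetical property key

    static Transport create(Properties config) throws Exception {
        String className = config.getProperty(KEY);
        if (className == null) {
            className = NioTransport.class.getName(); // default when nothing is configured
        }
        Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(Properties.class);
        return (Transport) ctor.newInstance(config);
    }

    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        System.out.println(create(config).name()); // nio
        config.setProperty(KEY, NettyTransport.class.getName());
        System.out.println(create(config).name()); // netty
    }
}
```

Selecting the implementation by class name lets a deployment switch transports through configuration alone, at the cost of moving type errors from compile time to startup.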