1、前言
前面已經分析了請求處理鏈中的多數類,接着繼續分析Zookeeper中的網絡通訊模塊。
2、整體框圖
對於網絡通訊模塊,其整體框圖如下所示
說明:
Stats,表示ServerCnxn上的統計數據。
Watcher,表示事件處理器。
ServerCnxn,表示服務器連接,即一個從客戶端到服務器的連接。
NettyServerCnxn,基於Netty的連接的具體實現。
NIOServerCnxn,基於NIO的連接的具體實現。
3、ServerCnxn源碼分析
3.1 類的繼承關係
/**
 * Abstract class that implements the {@code Stats} and {@code Watcher}
 * interfaces. Only the declaration is shown here; the body is left empty.
 */
public abstract class ServerCnxn implements Stats, Watcher {}
說明:ServerCnxn爲抽象類,其實現了Stats和Watcher兩個接口,表示客戶端到服務端的連接。
3.2 類的內部類
// 請求關閉異常類 protected static class CloseRequestException extends IOException { private static final long serialVersionUID = -7854505709816442681L; public CloseRequestException(String msg) { super(msg); } } // 流結束異常類 protected static class EndOfStreamException extends IOException { private static final long serialVersionUID = -8255690282104294178L; public EndOfStreamException(String msg) { super(msg); } public String toString() { return "EndOfStreamException: " + getMessage(); } }
說明:ServerCnxn包含了兩個異常類,用於表示在連接中發生的異常情況。
3.3 類的屬性
public abstract class ServerCnxn implements Stats, Watcher { // This is just an arbitrary object to represent requests issued by // (aka owned by) this class // 表明由本類提出的請求 final public static Object me = new Object(); // 認證信息 protected ArrayList<Id> authInfo = new ArrayList<Id>(); /** * If the client is of old version, we don't send r-o mode info to it. * The reason is that if we would, old C client doesn't read it, which * results in TCP RST packet, i.e. "connection reset by peer". */ // 是否爲舊的C客戶端 boolean isOldClient = true; // Zookeeper的Sasl服務器 protected ZooKeeperSaslServer zooKeeperSaslServer = null; /** * CMD命令 **/ /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ // CMD命令 protected final static int confCmd = ByteBuffer.wrap("conf".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int consCmd = ByteBuffer.wrap("cons".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int crstCmd = ByteBuffer.wrap("crst".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int dumpCmd = ByteBuffer.wrap("dump".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int enviCmd = ByteBuffer.wrap("envi".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. 
*/ protected final static int getTraceMaskCmd = ByteBuffer.wrap("gtmk".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int ruokCmd = ByteBuffer.wrap("ruok".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int setTraceMaskCmd = ByteBuffer.wrap("stmk".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int srvrCmd = ByteBuffer.wrap("srvr".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int srstCmd = ByteBuffer.wrap("srst".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int statCmd = ByteBuffer.wrap("stat".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int wchcCmd = ByteBuffer.wrap("wchc".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int wchpCmd = ByteBuffer.wrap("wchp".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int wchsCmd = ByteBuffer.wrap("wchs".getBytes()).getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. 
*/ protected final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes()) .getInt(); /* * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. */ protected final static int isroCmd = ByteBuffer.wrap("isro".getBytes()) .getInt(); // 存儲CMD的整形值與String的鍵值對 protected final static HashMap<Integer, String> cmd2String = new HashMap<Integer, String>(); // specify all of the commands that are available static { cmd2String.put(confCmd, "conf"); cmd2String.put(consCmd, "cons"); cmd2String.put(crstCmd, "crst"); cmd2String.put(dumpCmd, "dump"); cmd2String.put(enviCmd, "envi"); cmd2String.put(getTraceMaskCmd, "gtmk"); cmd2String.put(ruokCmd, "ruok"); cmd2String.put(setTraceMaskCmd, "stmk"); cmd2String.put(srstCmd, "srst"); cmd2String.put(srvrCmd, "srvr"); cmd2String.put(statCmd, "stat"); cmd2String.put(wchcCmd, "wchc"); cmd2String.put(wchpCmd, "wchp"); cmd2String.put(wchsCmd, "wchs"); cmd2String.put(mntrCmd, "mntr"); cmd2String.put(isroCmd, "isro"); } /** * 服務器的統計數據 **/ // 建立鏈接的時間 protected final Date established = new Date(); // 接受的packet數量 protected final AtomicLong packetsReceived = new AtomicLong(); // 發送的packet數量 protected final AtomicLong packetsSent = new AtomicLong(); // 最小延遲 protected long minLatency; // 最大延遲 protected long maxLatency; // 最後操做類型 protected String lastOp; // 最後的cxid protected long lastCxid; // 最後的zxid protected long lastZxid; // 最後的響應時間 protected long lastResponseTime; // 最後的延遲 protected long lastLatency; // 數量 protected long count; // 總的延遲 protected long totalLatency; }
說明:可以看到,ServerCnxn類維護了很多屬性,主要是服務器的統計信息和命令行信息。
3.4 核心函數分析
1. 抽象方法
// 獲取會話超時時間 abstract int getSessionTimeout(); // 關閉 abstract void close(); // 發送響應 public abstract void sendResponse(ReplyHeader h, Record r, String tag) throws IOException; /* notify the client the session is closing and close/cleanup socket */ // 關閉會話 abstract void sendCloseSession(); // 處理,Watcher接口中的方法 public abstract void process(WatchedEvent event); // 獲取會話id abstract long getSessionId(); // 設置會話id abstract void setSessionId(long sessionId); // 設置緩衝 abstract void sendBuffer(ByteBuffer closeConn); // 容許接收 abstract void enableRecv(); // 不容許接收 abstract void disableRecv(); // 設置會話超時時間 abstract void setSessionTimeout(int sessionTimeout); // 獲取服務器的統計數據 protected abstract ServerStats serverStats();
說明:以上的方法均爲抽象方法,需要具體子類實現,如process方法是Watcher接口中的方法,在ServerCnxn中並未實現,需要具體子類實現。
2. 具體方法
/**
 * Returns the auth info for the cnxn.
 *
 * @return an unmodifiable list wrapping {@code authInfo}
 */
public List<Id> getAuthInfo() {
    return Collections.unmodifiableList(authInfo);
}

/**
 * Adds an auth info entry unless an equal entry is already present.
 *
 * @param id the entry to add
 */
public void addAuthInfo(Id id) {
    if (!authInfo.contains(id)) {
        authInfo.add(id);
    }
}

/**
 * Removes an auth info entry.
 *
 * @param id the entry to remove
 * @return the result of {@code authInfo.remove(id)}
 */
public boolean removeAuthInfo(Id id) {
    return authInfo.remove(id);
}

/**
 * Records a received packet on this connection and, when
 * {@link #serverStats()} is non-null, on the server statistics as well.
 */
protected void packetReceived() {
    incrPacketsReceived();
    ServerStats serverStats = serverStats();
    if (serverStats != null) {
        // Use the instance that was null-checked instead of calling
        // serverStats() a second time.
        serverStats.incrementPacketsReceived();
    }
}

/**
 * Records a sent packet on this connection and, when
 * {@link #serverStats()} is non-null, on the server statistics as well.
 */
protected void packetSent() {
    incrPacketsSent();
    ServerStats serverStats = serverStats();
    if (serverStats != null) {
        // Use the instance that was null-checked instead of calling
        // serverStats() a second time.
        serverStats.incrementPacketsSent();
    }
}

/**
 * Resets the statistics fields to their initial values.
 */
public synchronized void resetStats() {
    packetsReceived.set(0);
    packetsSent.set(0);
    minLatency = Long.MAX_VALUE;
    maxLatency = 0;
    lastOp = "NA";
    lastCxid = -1;
    lastZxid = -1;
    lastResponseTime = 0;
    lastLatency = 0;

    count = 0;
    totalLatency = 0;
}

/**
 * Increments the received-packet counter.
 *
 * @return the counter value after the increment
 */
protected long incrPacketsReceived() {
    return packetsReceived.incrementAndGet();
}

/**
 * Hook for counting outstanding requests; does nothing here.
 *
 * @param h the request header
 */
protected void incrOutstandingRequests(RequestHeader h) {
}

/**
 * Increments the sent-packet counter.
 *
 * @return the counter value after the increment
 */
protected long incrPacketsSent() {
    return packetsSent.incrementAndGet();
}

/**
 * Updates the statistics for a response.
 *
 * @param cxid  the cxid; stored as the last cxid only when {@code >= 0}
 * @param zxid  stored as the last zxid
 * @param op    stored as the last operation
 * @param start start time; {@code end - start} is the latency
 * @param end   end time; stored as the last response time
 */
protected synchronized void updateStatsForResponse(long cxid, long zxid,
        String op, long start, long end) {
    // don't overwrite with "special" xids - we're interested
    // in the clients last real operation
    if (cxid >= 0) {
        lastCxid = cxid;
    }
    lastZxid = zxid;
    lastOp = op;
    lastResponseTime = end;
    long elapsed = end - start;
    lastLatency = elapsed;
    if (elapsed < minLatency) {
        minLatency = elapsed;
    }
    if (elapsed > maxLatency) {
        maxLatency = elapsed;
    }
    count++;
    totalLatency += elapsed;
}

/** @return a clone of the {@code established} date */
public Date getEstablished() {
    return (Date) established.clone();
}

public abstract long getOutstandingRequests();

/** @return the current value of the received-packet counter */
public long getPacketsReceived() {
    return packetsReceived.longValue();
}

/** @return the current value of the sent-packet counter */
public long getPacketsSent() {
    return packetsSent.longValue();
}

/** @return the minimum latency, or 0 while it still holds {@code Long.MAX_VALUE} */
public synchronized long getMinLatency() {
    return minLatency == Long.MAX_VALUE ? 0 : minLatency;
}

/** @return {@code totalLatency / count}, or 0 when {@code count} is 0 */
public synchronized long getAvgLatency() {
    return count == 0 ? 0 : totalLatency / count;
}

/** @return the maximum latency */
public synchronized long getMaxLatency() {
    return maxLatency;
}

/** @return the last operation */
public synchronized String getLastOperation() {
    return lastOp;
}

/** @return the last cxid */
public synchronized long getLastCxid() {
    return lastCxid;
}

/** @return the last zxid */
public synchronized long getLastZxid() {
    return lastZxid;
}

/** @return the last response time */
public synchronized long getLastResponseTime() {
    return lastResponseTime;
}

/** @return the last latency */
public synchronized long getLastLatency() {
    return lastLatency;
}

/**
 * Prints detailed stats information for the connection.
 *
 * @see #dumpConnectionInfo(PrintWriter, boolean) for brief stats
 */
@Override
public String toString() {
    StringWriter sw = new StringWriter();
    PrintWriter pwriter = new PrintWriter(sw);
    dumpConnectionInfo(pwriter, false);
    pwriter.flush();
    pwriter.close();
    return sw.toString();
}

public abstract InetSocketAddress getRemoteSocketAddress();

public abstract int getInterestOps();

/**
 * Print information about the connection.
 *
 * @param pwriter the writer the information is printed to
 * @param brief iff true prints brief details, otw full detail
 */
protected synchronized void dumpConnectionInfo(PrintWriter pwriter,
        boolean brief) {
    pwriter.print(" ");
    pwriter.print(getRemoteSocketAddress());
    pwriter.print("[");
    int interestOps = getInterestOps();
    pwriter.print(interestOps == 0 ? "0" : Integer.toHexString(interestOps));
    pwriter.print("](queued=");
    pwriter.print(getOutstandingRequests());
    pwriter.print(",recved=");
    pwriter.print(getPacketsReceived());
    pwriter.print(",sent=");
    pwriter.print(getPacketsSent());

    if (!brief) {
        long sessionId = getSessionId();
        // The session details are printed only for a non-zero session id.
        if (sessionId != 0) {
            pwriter.print(",sid=0x");
            pwriter.print(Long.toHexString(sessionId));
            pwriter.print(",lop=");
            pwriter.print(getLastOperation());
            pwriter.print(",est=");
            pwriter.print(getEstablished().getTime());
            pwriter.print(",to=");
            pwriter.print(getSessionTimeout());
            long lastCxid = getLastCxid();
            if (lastCxid >= 0) {
                pwriter.print(",lcxid=0x");
                pwriter.print(Long.toHexString(lastCxid));
            }
            pwriter.print(",lzxid=0x");
            pwriter.print(Long.toHexString(getLastZxid()));
            pwriter.print(",lresp=");
            pwriter.print(getLastResponseTime());
            pwriter.print(",llat=");
            pwriter.print(getLastLatency());
            pwriter.print(",minlat=");
            pwriter.print(getMinLatency());
            pwriter.print(",avglat=");
            pwriter.print(getAvgLatency());
            pwriter.print(",maxlat=");
            pwriter.print(getMaxLatency());
        }
    }
    pwriter.print(")");
}
說明:ServerCnxn實現了Stats接口中的很多方法,其相對簡單,不再贅述。
4、總結
本篇博文分析了ServerCnxn的源碼,其是抽象類,定義了子類需要實現的方法,較爲簡單,也謝謝各位園友的觀看~