1、前言緩存
前面已經學習了NIOServerCnxn,接着繼續學習NettyServerCnxn。服務器
2、NettyServerCnxn源碼分析session
2.1 類的繼承關係 app
public class NettyServerCnxn extends ServerCnxn {}
說明:NettyServerCnxn繼承了ServerCnxn抽象類,使用Netty框架來高效處理與客戶端之間的通訊。框架
2.2 類的內部類ide
1. SendBufferWriter類 函數
private class SendBufferWriter extends Writer { private StringBuffer sb = new StringBuffer(); /** * Check if we are ready to send another chunk. * @param force force sending, even if not a full chunk */ // 是否準備好發送另外一塊 private void checkFlush(boolean force) { if ((force && sb.length() > 0) || sb.length() > 2048) { // 當強制發送而且sb大小大於0,或者sb大小大於2048即發送緩存 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); // clear our internal buffer sb.setLength(0); } } @Override public void close() throws IOException { if (sb == null) return; // 關閉以前須要強制性發送緩存 checkFlush(true); sb = null; // clear out the ref to ensure no reuse } @Override public void flush() throws IOException { checkFlush(true); } @Override public void write(char[] cbuf, int off, int len) throws IOException { sb.append(cbuf, off, len); checkFlush(false); } }
說明:與NIOServerCnxn中相同,該類用來將給客戶端的響應進行分塊,再也不累贅。源碼分析
2. ResumeMessageEvent類 學習
static class ResumeMessageEvent implements MessageEvent { // 通道 Channel channel; // 構造函數 ResumeMessageEvent(Channel channel) { this.channel = channel; } @Override public Object getMessage() {return null;} @Override public SocketAddress getRemoteAddress() {return null;} @Override public Channel getChannel() {return channel;} @Override public ChannelFuture getFuture() {return null;} };
說明:ResumeMessageEvent繼承MessageEvent,其表示消息的傳輸或接收。this
3. CommandThread類
private abstract class CommandThread /*extends Thread*/ { PrintWriter pw; CommandThread(PrintWriter pw) { this.pw = pw; } public void start() { run(); } public void run() { try { commandRun(); } catch (IOException ie) { LOG.error("Error in running command ", ie); } finally { cleanupWriterSocket(pw); } } public abstract void commandRun() throws IOException; }
說明:其與NIOServerCnxn中相似,也是每一個子類對應着一個命令,值得注意的是針對每一個CMD命令,其僅僅使用一個線程來處理。
2.3 類的屬性
public class NettyServerCnxn extends ServerCnxn { // 日誌 Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class); // 通道 Channel channel; // 通道緩存 ChannelBuffer queuedBuffer; // 節流與否 volatile boolean throttled; // Byte緩衝區 ByteBuffer bb; // 四個字節的緩衝區 ByteBuffer bbLen = ByteBuffer.allocate(4); // 會話ID long sessionId; // 會話超時時間 int sessionTimeout; // 計數 AtomicLong outstandingCount = new AtomicLong(); /** The ZooKeeperServer for this connection. May be null if the server * is not currently serving requests (for example if the server is not * an active quorum participant. */ // Zookeeper服務器 private volatile ZooKeeperServer zkServer; // NettyServerCnxn工廠 NettyServerCnxnFactory factory; // 初始化與否 boolean initialized; // 四個字節 private static final byte[] fourBytes = new byte[4]; private static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests"; }
說明:NettyServerCnxn維護了與客戶端之間的通道緩衝、緩衝區及會話的相關屬性。
2.4 類的構造函數
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { // 給屬性賦值 this.channel = channel; this.zkServer = zks; this.factory = factory; if (this.factory.login != null) { // 須要登陸信息(用戶名和密碼登陸) this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login); } }
說明:構造函數對NettyServerCnxn中的部分重要屬性進行了賦值,其中還涉及到是否須要用戶登陸。
2.5 核心函數分析
1. receiveMessage函數
public void receiveMessage(ChannelBuffer message) { try { while(message.readable() && !throttled) { // 當writerIndex > readerIndex,而且不節流時,知足條件 if (bb != null) { // 不爲null if (LOG.isTraceEnabled()) { LOG.trace("message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace(Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() > message.readableBytes()) { // bb剩餘空間大於message中可讀字節大小 // 肯定新的limit int newLimit = bb.position() + message.readableBytes(); bb.limit(newLimit); } // 將message寫入bb中 message.readBytes(bb); // 重置bb的limit bb.limit(bb.capacity()); if (LOG.isTraceEnabled()) { LOG.trace("after readBytes message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace("after readbytes " + Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() == 0) { // 已經讀完message,表示內容已經所有接收 // 統計接收信息 packetReceived(); // 翻轉,可讀 bb.flip(); ZooKeeperServer zks = this.zkServer; if (zks == null) { // Zookeeper服務器爲空 throw new IOException("ZK down"); } if (initialized) { // 未被初始化 // 處理bb中包含的包信息 zks.processPacket(this, bb); if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已經節流 // 不接收數據 disableRecvNoWait(); } } else { // 已經初始化 LOG.debug("got conn req request from " + getRemoteSocketAddress()); // 處理鏈接請求 zks.processConnectRequest(this, bb); initialized = true; } bb = null; } } else { // bb爲null if (LOG.isTraceEnabled()) { LOG.trace("message readable " + message.readableBytes() + " bblenrem " + bbLen.remaining()); // 複製bbLen緩衝 ByteBuffer dat = bbLen.duplicate(); // 翻轉 dat.flip(); LOG.trace(Long.toHexString(sessionId) + " bbLen 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (message.readableBytes() < bbLen.remaining()) { // bb剩餘空間大於message中可讀字節大小 // 重設bbLen的limit bbLen.limit(bbLen.position() + message.readableBytes()); } // 將message內容寫入bbLen中 message.readBytes(bbLen); // 重置bbLen的limit bbLen.limit(bbLen.capacity()); if (bbLen.remaining() == 0) { // 已經讀完message,表示內容已經所有接收 // 翻轉 bbLen.flip(); if (LOG.isTraceEnabled()) { LOG.trace(Long.toHexString(sessionId) + " bbLen 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(bbLen))); } // 讀取position後四個字節 int len = bbLen.getInt(); if (LOG.isTraceEnabled()) { LOG.trace(Long.toHexString(sessionId) + " bbLen len is " + len); } // 清除緩存 bbLen.clear(); if (!initialized) { // 未被初始化 if (checkFourLetterWord(channel, message, len)) { // 是不是四個字母的命令 return; } } if (len < 0 || len > BinaryInputArchive.maxBuffer) { throw new IOException("Len error " + len); } // 根據len從新分配緩衝,以便接收內容 bb = ByteBuffer.allocate(len); } } } } catch(IOException e) { LOG.warn("Closing connection to " + getRemoteSocketAddress(), e); close(); } }
說明:該函數用於接收ChannelBuffer中的數據,函數在while循環體中,當writerIndex大於readerIndex(表示ChannelBuffer中還有可讀內容)且throttled爲false時執行while循環體,該函數大體能夠分爲兩部分,首先是當bb不爲空時,表示已經準備好讀取ChannelBuffer中的內容,其流程以下
if (bb != null) { // 不爲null,表示已經準備好讀取message if (LOG.isTraceEnabled()) { LOG.trace("message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace(Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() > message.readableBytes()) { // bb剩餘空間大於message中可讀字節大小 // 肯定新的limit int newLimit = bb.position() + message.readableBytes(); bb.limit(newLimit); } // 將message寫入bb中 message.readBytes(bb); // 重置bb的limit bb.limit(bb.capacity()); if (LOG.isTraceEnabled()) { LOG.trace("after readBytes message readable " + message.readableBytes() + " bb len " + bb.remaining() + " " + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); LOG.trace("after readbytes " + Long.toHexString(sessionId) + " bb 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (bb.remaining() == 0) { // 已經讀完message,表示內容已經所有接收 // 統計接收信息 packetReceived(); // 翻轉,可讀 bb.flip(); ZooKeeperServer zks = this.zkServer; if (zks == null) { // Zookeeper服務器爲空 throw new IOException("ZK down"); } if (initialized) { // 未被初始化 // 處理bb中包含的包信息 zks.processPacket(this, bb); if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已經節流 // 不接收數據 disableRecvNoWait(); } } else { // 已經初始化 LOG.debug("got conn req request from " + getRemoteSocketAddress()); // 處理鏈接請求 zks.processConnectRequest(this, bb); initialized = true; } bb = null; } }
其中主要的部分是判斷bb的剩餘空間是否大於message中的內容,簡單而言,就是判斷bb是否還有足夠空間存儲message內容,而後設置bb的limit,以後將message內容讀入bb緩衝中,以後再次肯定時候已經讀完message內容,統計接收信息,再根據是否已經初始化來處理包或者是鏈接請求,其中的請求內容都存儲在bb中。而當bb爲空時,其流程以下
else { // bb爲null if (LOG.isTraceEnabled()) { LOG.trace("message readable " + message.readableBytes() + " bblenrem " + bbLen.remaining()); // 複製bbLen緩衝 ByteBuffer dat = bbLen.duplicate(); // 翻轉 dat.flip(); LOG.trace(Long.toHexString(sessionId) + " bbLen 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(dat))); } if (message.readableBytes() < bbLen.remaining()) { // bb剩餘空間大於message中可讀字節大小 // 重設bbLen的limit bbLen.limit(bbLen.position() + message.readableBytes()); } // 將message內容寫入bbLen中 message.readBytes(bbLen); // 重置bbLen的limit bbLen.limit(bbLen.capacity()); if (bbLen.remaining() == 0) { // 已經讀完message,表示內容已經所有接收 // 翻轉 bbLen.flip(); if (LOG.isTraceEnabled()) { LOG.trace(Long.toHexString(sessionId) + " bbLen 0x" + ChannelBuffers.hexDump( ChannelBuffers.copiedBuffer(bbLen))); } // 讀取position後四個字節 int len = bbLen.getInt(); if (LOG.isTraceEnabled()) { LOG.trace(Long.toHexString(sessionId) + " bbLen len is " + len); } // 清除緩存 bbLen.clear(); if (!initialized) { // 未被初始化 if (checkFourLetterWord(channel, message, len)) { // 是不是四個字母的命令 return; } } if (len < 0 || len > BinaryInputArchive.maxBuffer) { throw new IOException("Len error " + len); } // 根據len從新分配緩衝,以便接收內容 bb = ByteBuffer.allocate(len); } }
當bb爲空時,表示尚未給bb分配足夠的內存空間來讀取message,首先仍是將message內容(後續內容的長度)讀入bbLen中,而後再肯定讀入的內容表明後續真正內容的長度len,而後再根據len來爲bb分配存儲空間,方便後續讀取真正的內容。
2. sendResponse函數
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { if (!channel.isOpen()) { return; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Make space for length BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { // 向baos中寫入四個字節(空) baos.write(fourBytes); // 寫入記錄 bos.writeRecord(h, "header"); if (r != null) { // 寫入記錄 bos.writeRecord(r, tag); } // 關閉 baos.close(); } catch (IOException e) { LOG.error("Error serializing response"); } // 轉化爲Byte Array byte b[] = baos.toByteArray(); // 將Byte Array封裝成ByteBuffer ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); // 發送緩衝 sendBuffer(bb); if (h.getXid() > 0) { // zks cannot be null otherwise we would not have gotten here! if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { enableRecv(); } } }
說明:其首先會將header和record都寫入baos,以後再將baos轉化爲ByteBuffer,以後在調用sendBuffer來發送緩衝,而sendBuffer完成的操做是將ByteBuffer寫入ChannelBuffer中。
3. process函數
public void process(WatchedEvent event) { // 建立響應頭 ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); try { // 發送響應 sendResponse(h, e, "notification"); } catch (IOException e1) { if (LOG.isDebugEnabled()) { LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); } close(); } }
說明:首先建立ReplyHeader,而後再調用sendResponse來發送響應,最後調用close函數進行後續關閉處理。
3、總結
本篇博文講解了基於Netty完成服務端與客戶端之間的通訊,其效率相對較高,也謝謝各位園友的觀看~