類的繼承關係
/** Declares NettyServerCnxn as a subclass of ServerCnxn; the class body is left empty in this excerpt. */
public class NettyServerCnxn extends ServerCnxn {}
類的內部類
SendBufferWriter類
private class SendBufferWriter extends Writer { private StringBuffer sb = new StringBuffer(); /** * Check if we are ready to send another chunk. * @param force force sending, even if not a full chunk */ // 是否準備好發送另外一塊 private void checkFlush(boolean force) { if ((force && sb.length() > 0) || sb.length() > 2048) { // 當強制發送而且sb大小大於0,或者sb大小大於2048即發送緩存 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); // clear our internal buffer sb.setLength(0); } } @Override public void close() throws IOException { if (sb == null) return; // 關閉以前須要強制性發送緩存 checkFlush(true); sb = null; // clear out the ref to ensure no reuse } @Override public void flush() throws IOException { checkFlush(true); } @Override public void write(char[] cbuf, int off, int len) throws IOException { sb.append(cbuf, off, len); checkFlush(false); } }
ResumeMessageEvent類
static class ResumeMessageEvent implements MessageEvent { // 通道 Channel channel; // 構造函數 ResumeMessageEvent(Channel channel) { this.channel = channel; } @Override public Object getMessage() {return null;} @Override public SocketAddress getRemoteAddress() {return null;} @Override public Channel getChannel() {return channel;} @Override public ChannelFuture getFuture() {return null;} };
CommandThread類
private abstract class CommandThread /*extends Thread*/ { PrintWriter pw; CommandThread(PrintWriter pw) { this.pw = pw; } public void start() { run(); } public void run() { try { commandRun(); } catch (IOException ie) { LOG.error("Error in running command ", ie); } finally { cleanupWriterSocket(pw); } } public abstract void commandRun() throws IOException; }
類的屬性
public class NettyServerCnxn extends ServerCnxn { // 日誌 Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class); // 通道 Channel channel; // 通道緩存 ChannelBuffer queuedBuffer; // 節流與否 volatile boolean throttled; // Byte緩衝區 ByteBuffer bb; // 四個字節的緩衝區 ByteBuffer bbLen = ByteBuffer.allocate(4); // 會話ID long sessionId; // 會話超時時間 int sessionTimeout; // 計數 AtomicLong outstandingCount = new AtomicLong(); /** The ZooKeeperServer for this connection. May be null if the server * is not currently serving requests (for example if the server is not * an active quorum participant. */ // Zookeeper服務器 private volatile ZooKeeperServer zkServer; // NettyServerCnxn工廠 NettyServerCnxnFactory factory; // 初始化與否 boolean initialized; // 四個字節 private static final byte[] fourBytes = new byte[4]; private static final String ZK_NOT_SERVING = "This ZooKeeper instance is not currently serving requests"; }
類的構造函數
/**
 * Creates a connection bound to the given channel, server and factory.
 *
 * <p>If the factory has a login configured, a ZooKeeperSaslServer is also
 * created from it.
 *
 * @param channel Netty channel backing this connection
 * @param zks     server handling this connection's requests
 * @param factory factory associated with this connection
 */
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
    this.factory = factory;
    this.zkServer = zks;
    this.channel = channel;

    // no login configured: nothing further to set up
    if (factory.login == null) {
        return;
    }
    this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
核心函數分析
receiveMessage函數
/**
 * Consumes bytes from a Netty buffer and reassembles length-prefixed packets.
 *
 * <p>The loop alternates between two states, selected by whether {@code bb}
 * is null:
 * <ul>
 *   <li>{@code bb == null}: bytes are collected into the four-byte
 *       {@code bbLen}; once it is full the int it holds is taken as the
 *       packet length and {@code bb} is allocated with that size.</li>
 *   <li>{@code bb != null}: bytes are collected into {@code bb}; once it is
 *       full the packet is handed to the ZooKeeperServer and {@code bb} is
 *       reset to null.</li>
 * </ul>
 * The loop stops when the message has no readable bytes left or when
 * {@code throttled} becomes true. Any IOException raised inside the loop is
 * logged and the connection is closed.
 *
 * @param message buffer holding newly received bytes
 */
public void receiveMessage(ChannelBuffer message) {
    try {
        // keep going while there are unread bytes and we are not throttled
        while(message.readable() && !throttled) {
            if (bb != null) {
                // a packet body is in progress
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable " + message.readableBytes()
                            + " bb len " + bb.remaining() + " " + bb);
                    // dump what has been collected so far, via a duplicate so bb itself is untouched
                    ByteBuffer dat = bb.duplicate();
                    dat.flip();
                    LOG.trace(Long.toHexString(sessionId)
                            + " bb 0x"
                            + ChannelBuffers.hexDump(
                                    ChannelBuffers.copiedBuffer(dat)));
                }

                if (bb.remaining() > message.readableBytes()) {
                    // bb has more room than the message has bytes: shrink the
                    // limit so the read below asks only for what is available
                    int newLimit = bb.position() + message.readableBytes();
                    bb.limit(newLimit);
                }
                // copy bytes from message into bb
                message.readBytes(bb);
                // restore the limit to the full packet size
                bb.limit(bb.capacity());

                if (LOG.isTraceEnabled()) {
                    LOG.trace("after readBytes message readable "
                            + message.readableBytes()
                            + " bb len " + bb.remaining() + " " + bb);
                    ByteBuffer dat = bb.duplicate();
                    dat.flip();
                    LOG.trace("after readbytes "
                            + Long.toHexString(sessionId)
                            + " bb 0x"
                            + ChannelBuffers.hexDump(
                                    ChannelBuffers.copiedBuffer(dat)));
                }
                if (bb.remaining() == 0) {
                    // bb is full: the whole packet body has arrived
                    packetReceived();
                    // switch bb from filling to reading
                    bb.flip();

                    // read the volatile field once and use the local copy below
                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null) {
                        // no server to hand the packet to; the catch below closes the connection
                        throw new IOException("ZK down");
                    }
                    if (initialized) {
                        // connection already initialized: this is a regular request packet
                        zks.processPacket(this, bb);

                        if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
                            // the server asks us to throttle: stop receiving
                            disableRecvNoWait();
                        }
                    } else {
                        // not yet initialized: the first packet is the connect request
                        LOG.debug("got conn req request from "
                                + getRemoteSocketAddress());
                        zks.processConnectRequest(this, bb);
                        initialized = true;
                    }
                    // back to the "read length prefix" state
                    bb = null;
                }
            } else {
                // no packet body in progress: read the four-byte length prefix
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable "
                            + message.readableBytes()
                            + " bblenrem " + bbLen.remaining());
                    // dump a duplicate of bbLen so its position is not disturbed
                    ByteBuffer dat = bbLen.duplicate();
                    dat.flip();
                    LOG.trace(Long.toHexString(sessionId)
                            + " bbLen 0x"
                            + ChannelBuffers.hexDump(
                                    ChannelBuffers.copiedBuffer(dat)));
                }

                if (message.readableBytes() < bbLen.remaining()) {
                    // fewer bytes available than bbLen still needs: shrink the
                    // limit so the read below asks only for what is available
                    bbLen.limit(bbLen.position() + message.readableBytes());
                }
                // copy bytes from message into bbLen
                message.readBytes(bbLen);
                // restore the limit to the full four bytes
                bbLen.limit(bbLen.capacity());
                if (bbLen.remaining() == 0) {
                    // all four length bytes have arrived
                    bbLen.flip();

                    if (LOG.isTraceEnabled()) {
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(bbLen)));
                    }
                    // interpret the four bytes as the packet length
                    int len = bbLen.getInt();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen len is " + len);
                    }

                    // reset bbLen so it can receive the next length prefix
                    bbLen.clear();
                    if (!initialized) {
                        // before initialization the four bytes may be a four-letter
                        // command rather than a length; stop here if the check says so
                        if (checkFourLetterWord(channel, message, len)) {
                            return;
                        }
                    }
                    if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                        // reject negative or oversized lengths
                        throw new IOException("Len error " + len);
                    }
                    // allocate the buffer that will receive the packet body
                    bb = ByteBuffer.allocate(len);
                }
            }
        }
    } catch(IOException e) {
        LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
        close();
    }
}
sendResponse函數
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { if (!channel.isOpen()) { return; } ByteArrayOutputStream baos = new ByteArrayOutputStream(); // Make space for length BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); try { // 向baos中寫入四個字節(空) baos.write(fourBytes); // 寫入記錄 bos.writeRecord(h, "header"); if (r != null) { // 寫入記錄 bos.writeRecord(r, tag); } // 關閉 baos.close(); } catch (IOException e) { LOG.error("Error serializing response"); } // 轉化爲Byte Array byte b[] = baos.toByteArray(); // 將Byte Array封裝成ByteBuffer ByteBuffer bb = ByteBuffer.wrap(b); bb.putInt(b.length - 4).rewind(); // 發送緩衝 sendBuffer(bb); if (h.getXid() > 0) { // zks cannot be null otherwise we would not have gotten here! if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) { enableRecv(); } } }
process函數
public void process(WatchedEvent event) { // 建立響應頭 ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); try { // 發送響應 sendResponse(h, e, "notification"); } catch (IOException e1) { if (LOG.isDebugEnabled()) { LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); } close(); } }