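Continuing from the previous post: the server adds TelnetServerHandler, and that handler overrides channelActive, so the server writes data to the client as soon as the client connects. Likewise, the client adds TelnetClientHandler, which overrides messageReceived, so whatever the server sends is printed once it arrives.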
    @Override
    public void TelnetServerHandler.channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send greeting for a new connection.
        ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
        ctx.write("It is " + new Date() + " now.\r\n");
        ctx.flush();
    }

    @Override
    protected void TelnetClientHandler.messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.err.println(msg);
    }
After the client calls javaChannel().connect(remoteAddress);, the server program receives the connection request and reacts to it.
    private static void NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read(); //tag1
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }
At this point the unsafe.read(); call at tag1 is executed.
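The condition guarding unsafe.read() in processSelectedKey is a plain bitmask test on readyOps. A minimal sketch of that test in isolation, using only the JDK's SelectionKey constants (the class and method names here are hypothetical, not part of Netty):

```java
import java.nio.channels.SelectionKey;

final class ReadyOpsSketch {
    /** Same boolean expression as the guard around unsafe.read() in the excerpt. */
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        // An accepted-connection event on the server channel takes the read path.
        System.out.println("OP_ACCEPT -> " + triggersRead(SelectionKey.OP_ACCEPT));
        // A pure write-readiness event does not.
        System.out.println("OP_WRITE  -> " + triggersRead(SelectionKey.OP_WRITE));
        // readyOps == 0 also takes the read path (the spin-loop workaround in the comment).
        System.out.println("0         -> " + triggersRead(0));
    }
}
```

This is why an incoming connection on the server channel (OP_ACCEPT) and incoming bytes on a connected channel (OP_READ) both enter through the same unsafe.read() call; which read() implementation runs depends on the channel type.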
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        if (!config().isAutoRead()) {
            removeReadOp();
        }
        final ChannelConfig config = config();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final boolean autoRead = config.isAutoRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            for (;;) {
                int localRead = doReadMessages(readBuf); //tag1.1
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i)); //tag1.2
        }
        readBuf.clear();
        pipeline.fireChannelReadComplete(); //tag1.3
        if (exception != null) {
            if (exception instanceof IOException) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    }
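At tag1.1.1, accept() is called. The JDK documentation describes it roughly as follows: if the channel is in non-blocking mode, the method returns null immediately when there are no pending connections; otherwise it blocks until a connection is available or an I/O error occurs. Here the client has already sent its connect request and the server channel is in non-blocking mode, so this accept() returns a non-null channel.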
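The accept() behavior described above can be observed with the plain JDK, without Netty. The following is a minimal sketch, assuming a loopback connection and an ephemeral port; the class name and the short polling loop are illustrative choices, not anything taken from Netty:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingAcceptSketch {
    /** Returns {accept() was null with no client, accept() was non-null after a client connected}. */
    static boolean[] demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // No pending connection: a non-blocking accept() returns null at once.
            SocketChannel none = server.accept();
            boolean nullWhenIdle = (none == null);
            if (none != null) {
                none.close();
            }

            // SocketChannel.open(address) performs a blocking connect.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                SocketChannel accepted = null;
                // Poll briefly in case the connection is not yet visible to accept().
                for (int i = 0; i < 100 && accepted == null; i++) {
                    accepted = server.accept();
                    if (accepted == null) {
                        Thread.sleep(10);
                    }
                }
                boolean gotChannel = (accepted != null);
                if (accepted != null) {
                    accepted.close();
                }
                return new boolean[] { nullWhenIdle, gotChannel };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("null when idle: " + r[0] + ", channel after connect: " + r[1]);
    }
}
```

In Netty the selector has already reported OP_ACCEPT before doReadMessages runs, which is why the first accept() in the walkthrough finds a pending connection.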
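Execution then continues at tag1.1.2, which uses a different EventLoop instance, namely childEventLoopGroup().next(), and doReadMessages returns 1. Back in the loop above, the condition readBuf.size() >= maxMessagesPerRead | !autoRead is evaluated: readBuf.size() >= maxMessagesPerRead is false and !autoRead is also false, so the result of | is false and the for loop runs again. On the second pass no connection is pending any more, so by the rule quoted above (non-blocking channel with no pending connection) accept() returns null, doReadMessages returns 0, and the loop exits.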
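The loop-exit test uses the non-short-circuit | on two booleans, which always evaluates both operands but otherwise yields the same truth value as ||. A small sketch of the expression on its own (the method name and the sample values, including 16 for maxMessagesPerRead, are assumptions for illustration):

```java
final class ReadLoopConditionSketch {
    /** Same shape as: readBuf.size() >= maxMessagesPerRead | !autoRead */
    static boolean shouldStop(int bufferedMessages, int maxMessagesPerRead, boolean autoRead) {
        // Relational >= binds tighter than |, so this is (a >= b) | (!autoRead).
        return bufferedMessages >= maxMessagesPerRead | !autoRead;
    }

    public static void main(String[] args) {
        // One accepted channel, auto-read on: keep looping (the case in the walkthrough).
        System.out.println(shouldStop(1, 16, true));
        // Buffer limit reached: stop.
        System.out.println(shouldStop(16, 16, true));
        // Auto-read off: stop after the first message.
        System.out.println(shouldStop(1, 16, false));
    }
}
```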
    @Override
    protected int NioServerSocketChannel.doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept(); //tag1.1.1
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); //tag1.1.2
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
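Next, the code at tag1.2 runs. readBuf holds the NioSocketChannel created for the accepted connection, and readBuf.size() is 1. Recall that the handler chain of the server channel at this point is HeadHandler, ServerBootstrapAcceptor, and TailHandler.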
    @Override
    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }
Because ServerBootstrapAcceptor (an inner class of ServerBootstrap) overrides channelRead, the program executes ServerBootstrapAcceptor.channelRead.
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler); //tag1.2.1
        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        child.unsafe().register(child.newPromise()); //tag1.2.2
    }
Before tag1.2.1 runs, the child's handler chain is HeadHandler, TailHandler. Take care not to confuse it with the parent's handler chain. After tag1.2.1 has run, the chain is HeadHandler, TelnetServerInitializer, and TailHandler.
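Then tag1.2.2, child.unsafe().register(child.newPromise());, is executed. One small detail: the current thread is not the child's event loop, so register0 is not run here. It is handed off to a worker thread (the child's EventLoop), which executes it.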
    @Override
    public final void AbstractChannel.AbstractUnsafe.register(final ChannelPromise promise) {
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise); //tag1.2.2.1
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                promise.setFailure(t);
            }
        }
    }
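The inEventLoop() check in register() follows a common pattern: run the work inline when already on the owning thread, otherwise submit it as a task to that thread. Below is a minimal sketch of the pattern with a single-threaded JDK executor standing in for the child's EventLoop. All names are hypothetical, and Netty's real event loop does much more than this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class EventLoopHandOffSketch {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    EventLoopHandOffSketch() {
        // The single worker thread is created lazily, on the first execute().
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-event-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Completes the returned future with the name of the thread that ran register0. */
    Future<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);                          // already on the loop thread
        } else {
            executor.execute(() -> register0(promise));  // hand off to the loop thread
        }
        return promise;
    }

    private void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }

    static String runOnce() {
        EventLoopHandOffSketch loop = new EventLoopHandOffSketch();
        try {
            return loop.register().get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("register0 ran on: " + runOnce());
    }
}
```

Called from the main thread, register() takes the else branch, which corresponds to the accepting thread handing the child's registration to a different event loop in the walkthrough.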
At tag1.2.2.1, register0(promise); is executed.
    private void AbstractChannel.register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!ensureOpen(promise)) {
                return;
            }
            doRegister(); //tag1.2.2.1.1
            registered = true;
            promise.setSuccess();
            pipeline.fireChannelRegistered(); //tag1.2.2.1.2
            if (isActive()) {
                pipeline.fireChannelActive(); //tag1.2.2.1.3
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            if (!promise.tryFailure(t)) {
                logger.warn(
                        "Tried to fail the registration promise, but it is complete already. " +
                        "Swallowing the cause of the registration failure:", t);
            }
        }
    }
tag1.2.2.1.1 mainly executes javaChannel().register(eventLoop().selector, 0, this);. Curiously, the ops argument here is 0. TODO.
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
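On the ops value of 0: with the JDK selector API, registering a channel and declaring interest in events are separable steps. A key can be created with an empty interest set and have its interest bits switched on later. The sketch below shows only that JDK mechanism. Whether this is the reason Netty passes 0 is an assumption on my part (one plausible reading is that read interest is enabled later, when a read is actually requested, for example through the channel.read() call at tag1.2.2.1.3.2); the walkthrough itself leaves it as a TODO. The class name is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class RegisterWithZeroOpsSketch {
    /** Returns {interestOps right after register(selector, 0, att), interestOps after enabling OP_READ}. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);

            // Register with an empty interest set; the attachment plays the role of "this".
            Object attachment = "stand-in for the Netty channel";
            SelectionKey key = channel.register(selector, 0, attachment);
            int before = key.interestOps();

            // Interest is switched on later, without registering again.
            key.interestOps(before | SelectionKey.OP_READ);
            int after = key.interestOps();

            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("after register: " + r[0] + ", after enabling OP_READ: " + r[1]);
    }
}
```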
At tag1.2.2.1.2, fireChannelRegistered goes on to call the channelRegistered method that TelnetServerInitializer inherits from its parent class.
    @Override
    public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            pipeline.remove(this);
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
Inside channelRegistered, TelnetServerInitializer.initChannel is called, which adds the handlers below to the chain; the initializer then removes itself. The child handler chain is now HeadHandler, DelimiterBasedFrameDecoder, StringDecoder, StringEncoder, TelnetServerHandler, and TailHandler.
    @Override
    public void TelnetServerInitializer.initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add the text line codec combination first,
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
                8192, Delimiters.lineDelimiter()));
        // the encoder and decoder are static as these are sharable
        pipeline.addLast("decoder", DECODER);
        pipeline.addLast("encoder", ENCODER);
        // and then business logic.
        pipeline.addLast("handler", SERVERHANDLER);
    }
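The evolution of the child's handler chain (initializer added, real handlers appended, initializer removed) can be modeled with a plain list of names. This is only a toy model of the ordering, assuming that addLast places a handler just before the tail; it is not how Netty's pipeline is implemented, and the class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InitializerPipelineSketch {
    /** Returns the child's handler names after the initializer has run and removed itself. */
    static List<String> initChild() {
        // State after tag1.2.1: the initializer sits between head and tail.
        List<String> pipeline = new ArrayList<>(
                Arrays.asList("HeadHandler", "TelnetServerInitializer", "TailHandler"));

        // initChannel(): each addLast lands just before the tail.
        for (String handler : Arrays.asList(
                "DelimiterBasedFrameDecoder", "StringDecoder", "StringEncoder", "TelnetServerHandler")) {
            pipeline.add(pipeline.size() - 1, handler);
        }

        // pipeline.remove(this): the initializer has done its job.
        pipeline.remove("TelnetServerInitializer");
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println(initChild());
    }
}
```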
That completes tag1.2.2.1.2. Execution continues with tag1.2.2.1.3, pipeline.fireChannelActive();.

    @Override
    public ChannelPipeline DefaultChannelPipeline.fireChannelActive() {
        head.fireChannelActive(); //tag1.2.2.1.3.1
        if (channel.config().isAutoRead()) {
            channel.read(); //tag1.2.2.1.3.2
        }
        return this;
    }
Of the handlers in the child chain, only TelnetServerHandler overrides channelActive, so only TelnetServerHandler's implementation is executed.
    @Override
    public void TelnetServerHandler.channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send greeting for a new connection.
        ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n"); //tag1.2.2.1.3.1.1
        ctx.write("It is " + new Date() + " now.\r\n");
        ctx.flush(); //tag1.2.2.1.3.1.2
    }
    @Override
    public ChannelFuture DefaultChannelHandlerContext.write(Object msg) {
        return write(msg, newPromise());
    }

    @Override
    public ChannelFuture DefaultChannelHandlerContext.write(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE);
        next.invoker.invokeWrite(next, msg, promise);
        return promise;
    }

    @Override
    public void DefaultChannelHandlerInvoker.invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
        validatePromise(ctx, promise, true);
        if (executor.inEventLoop()) {
            invokeWriteNow(ctx, msg, promise); //tag1.2.2.1.3.1.1.1
        } else {
            AbstractChannel channel = (AbstractChannel) ctx.channel();
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                // Check for null as it may be set to null if the channel is closed already
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size);
                }
            }
            safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
        }
    }
At tag1.2.2.1.3.1.1.1, the write method that StringEncoder inherits from MessageToMessageEncoder is executed. I am not interested in these details for now, so the analysis is skipped (TODO).
In the finally block of MessageToMessageEncoder.write, the parent-class method behind StringEncoder, the call ctx.write(out.get(sizeMinusOne), promise); passes the message on to the next handler, HeadHandler, which completes the handler chain for ctx.write().
    @Override
    public void HeadHandler.write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void AbstractChannel.AbstractUnsafe.write(Object msg, ChannelPromise promise) {
        if (!isActive()) {
            // Mark the write request as failure if the channel is inactive.
            if (isOpen()) {
                promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
            } else {
                promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
            }
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
        } else {
            outboundBuffer.addMessage(msg, promise); // not analyzed for now (TODO)
        }
    }
The call outboundBuffer.addMessage(msg, promise) stores msg in the ChannelOutboundBuffer. That concludes the brief look at ctx.write(). Next comes tag1.2.2.1.3.1.2, the ctx.flush(); call.
    @Override
    public ChannelHandlerContext DefaultChannelHandlerContext.flush() {
        DefaultChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
        next.invoker.invokeFlush(next);
        return this;
    }

    @Override
    public void HeadHandler.flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void AbstractChannel.AbstractUnsafe.flush() {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        outboundBuffer.addFlush();
        flush0();
    }

    @Override
    protected void AbstractNioChannel.AbstractNioUnsafe.flush0() {
        // Flush immediately only when there's no pending flush.
        // If there's a pending flush operation, event loop will call forceFlush() later,
        // and thus there's no need to call it now.
        if (isFlushPending()) {
            return;
        }
        super.flush0();
    }

    protected void AbstractChannel.AbstractUnsafe.flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
        inFlush0 = true;
        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
                } else {
                    outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
        try {
            doWrite(outboundBuffer); //tag1.2.2.1.3.1.2.1
        } catch (Throwable t) {
            outboundBuffer.failFlushed(t);
        } finally {
            inFlush0 = false;
        }
    }
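The division of labor traced so far, where write() only queues a message and flush() is what pushes queued messages toward the socket, can be modeled in a few lines. This is a deliberately simplified sketch: a Deque stands in for ChannelOutboundBuffer and a StringBuilder stands in for the socket, and none of these names come from Netty.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class WriteThenFlushSketch {
    private final Deque<String> pending = new ArrayDeque<>(); // stand-in for the outbound buffer
    private final StringBuilder wire = new StringBuilder();   // stand-in for the socket

    /** Queues the message; nothing reaches the "socket" yet. */
    void write(String msg) {
        pending.addLast(msg);
    }

    /** Drains everything queued so far to the "socket", in order. */
    void flush() {
        while (!pending.isEmpty()) {
            wire.append(pending.pollFirst());
        }
    }

    String wire() {
        return wire.toString();
    }

    /** Returns {wire contents before flush, wire contents after flush}. */
    static String[] demo() {
        WriteThenFlushSketch channel = new WriteThenFlushSketch();
        channel.write("Welcome!\r\n");
        channel.write("It is now.\r\n");
        String beforeFlush = channel.wire();
        channel.flush();
        return new String[] { beforeFlush, channel.wire() };
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("before flush: [" + r[0] + "]");
        System.out.println("after flush:  [" + r[1].replace("\r\n", "\\r\\n") + "]");
    }
}
```

Batching both greeting lines before a single flush is consistent with the client later receiving them together in one read, as the walkthrough observes.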
At tag1.2.2.1.3.1.2.1, NioSocketChannel.doWrite is eventually called. Inside it, the statement final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); writes the data into the socket send buffer.
    @Override
    protected void NioSocketChannel.doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            // Do non-gathering write for a single buffer case.
            final int msgCount = in.size();
            if (msgCount <= 1) {
                super.doWrite(in);
                return;
            }
            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            if (nioBuffers == null) {
                super.doWrite(in);
                return;
            }
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            final SocketChannel ch = javaChannel();
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); // the data is finally written out
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            if (done) {
                // Release all buffers
                for (int i = msgCount; i > 0; i --) {
                    in.remove();
                }
                // Finish the write loop if no new messages were flushed by in.remove().
                if (in.isEmpty()) {
                    clearOpWrite(); //tag1.2.2.1.3.1.2.1.1
                    break;
                }
            } else {
                // Did not write all buffers completely.
                // Release the fully written buffers and update the indexes of the partially written buffer.
                for (int i = msgCount; i > 0; i --) {
                    final ByteBuf buf = (ByteBuf) in.current();
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;
                    if (readableBytes < writtenBytes) {
                        in.progress(readableBytes);
                        in.remove();
                        writtenBytes -= readableBytes;
                    } else if (readableBytes > writtenBytes) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);
                        in.progress(writtenBytes);
                        break;
                    } else { // readableBytes == writtenBytes
                        in.progress(readableBytes);
                        in.remove();
                        break;
                    }
                }
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }
Right after final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); has run, the client enters NioEventLoop.processSelectedKey() and is about to start reading. Let us hold that thought for a moment and finish the server-side flow first.
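There is one more small but very important detail: at tag1.2.2.1.3.1.2.1.1, AbstractNioByteChannel.clearOpWrite() is called to avoid 100% CPU usage. If OP_WRITE stayed in the interest set while nothing is left to write, the selector would keep reporting the channel as writable and the event loop would spin.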
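The effect of leaving OP_WRITE in the interest set can be reproduced with the plain JDK. The sketch below is an illustration under assumptions (a loopback connection whose send buffer is empty, hypothetical class name), not Netty code. It registers a connected socket for OP_WRITE, shows that every selectNow() reports it ready, then clears the bit the same way the OP_CONNECT bit is cleared in processSelectedKey and selects again.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ClearOpWriteSketch {
    /** Returns the selectNow() results: {first with OP_WRITE, second with OP_WRITE, after clearing OP_WRITE}. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.configureBlocking(false);
                SelectionKey key = client.register(selector, SelectionKey.OP_WRITE);

                // An idle connected socket is writable, so each select reports it again.
                int first = selector.selectNow();
                selector.selectedKeys().clear();
                int second = selector.selectNow();
                selector.selectedKeys().clear();

                // Drop OP_WRITE from the interest set: the key is no longer reported.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                int afterClear = selector.selectNow();

                return new int[] { first, second, afterClear };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("with OP_WRITE: " + r[0] + ", " + r[1] + "; after clearing: " + r[2]);
    }
}
```

With a blocking select() in a loop, the first two results would translate into a loop that never sleeps, which is the 100% CPU symptom mentioned above.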
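Because tag1.2.2 was handed off to a separate thread, the original thread carries on with tag1.3, pipeline.fireChannelReadComplete();, while the new thread does its work. The original thread ends up in TailHandler.channelReadComplete(), which is also an empty implementation.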
As soon as the server has executed javaChannel().write(), the client receives the server's data and runs NioEventLoop.processSelectedKey(), which in turn calls unsafe.read();.
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read(); //tag2
        if (!ch.isOpen()) {
            // Connection already closed - no need to handle write.
            return;
        }
    }
The program now executes the method at tag2, AbstractNioByteChannel.NioByteUnsafe.read():
    @Override
    public void AbstractNioByteChannel.NioByteUnsafe.read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }
        if (!config.isAutoRead()) {
            removeReadOp();
        }
        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int byteBufCapacity = allocHandle.guess();
            int totalReadAmount = 0;
            do {
                byteBuf = allocator.ioBuffer(byteBufCapacity);
                int writable = byteBuf.writableBytes();
                int localReadAmount = doReadBytes(byteBuf); //tag2.1
                if (localReadAmount <= 0) {
                    // nothing was read; release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                pipeline.fireChannelRead(byteBuf); //tag2.2
                byteBuf = null;
                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }
                totalReadAmount += localReadAmount;
                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);
            pipeline.fireChannelReadComplete(); //tag2.3
            allocHandle.record(totalReadAmount); //tag2.4
            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        }
    }
At tag2.1 the call ends up in UnpooledUnsafeDirectByteBuf.setBytes, which calls in.read(tmpBuf); and thereby reads the data from the network.
    @Override
    public int UnpooledUnsafeDirectByteBuf.setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
        ensureAccessible();
        ByteBuffer tmpBuf = internalNioBuffer();
        tmpBuf.clear().position(index).limit(index + length);
        try {
            return in.read(tmpBuf);
        } catch (ClosedChannelException e) {
            return -1;
        }
    }
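Next, tag2.2 executes pipeline.fireChannelRead(byteBuf);, which invokes DelimiterBasedFrameDecoder(ByteToMessageDecoder).channelRead(). This class splits a ByteBuf into multiple messages at a configured delimiter, so after callDecode(ctx, cumulation, out); has run, the variable out contains two entries: the two messages sent by the server.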
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
                callDecode(ctx, cumulation, out); //tag2.2.1
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i)); //tag2.2.2
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
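To make the "one read, two messages" outcome concrete, here is a minimal sketch of line-delimiter framing followed by byte-to-string decoding, written against plain byte arrays. It is a simplified stand-in for what DelimiterBasedFrameDecoder and StringDecoder achieve together, not their actual implementation. The class name, the UTF-8 charset, and the sample greeting text are assumptions, and bytes after the last delimiter are simply ignored here, whereas a real decoder would keep them for the next read.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineFramingSketch {
    /** Splits the received bytes at "\n" or "\r\n" and decodes each frame as a UTF-8 string. */
    static List<String> decodeLines(byte[] received) {
        List<String> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < received.length; i++) {
            if (received[i] == '\n') {
                // Strip the delimiter, including a preceding '\r' if present.
                int end = (i > start && received[i - 1] == '\r') ? i - 1 : i;
                out.add(new String(received, start, end - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Both greeting lines arrive in a single read because the server flushed them together.
        byte[] oneRead = "Welcome to host!\r\nIt is now.\r\n".getBytes(StandardCharsets.UTF_8);
        for (String line : decodeLines(oneRead)) {
            System.err.println(line);
        }
    }
}
```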
Execution then continues at tag2.2.2, which invokes StringDecoder(MessageToMessageDecoder<I>).channelRead(). Inside that method, at tag2.2.2.1, StringDecoder.decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) is called, which converts the bytes into a string.
    @Override
    public void MessageToMessageDecoder.channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    decode(ctx, cast, out); //tag2.2.2.1
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.get(i)); //tag2.2.2.2
            }
            out.recycle();
        }
    }
At tag2.2.2.2, TelnetClientHandler(SimpleChannelInboundHandler<I>).channelRead(ChannelHandlerContext, Object) is called next. It calls TelnetClientHandler.messageReceived, which executes System.err.println(msg);.
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.err.println(msg);
    }
Control then returns to tag2.2.2 for the next loop iteration, so that in the end both messages are printed.
After that, control returns to tag2.3 and executes pipeline.fireChannelReadComplete();, which triggers the channelReadComplete() methods of ByteToMessageDecoder and TailHandler.
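That completes the analysis of the interaction between server and client. A follow-up post will summarize my thoughts on reading the Netty code.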