樣例代碼來自於io.netty.example.telnet.TelnetClient
,完整樣例請參考NettyExample工程。java
客戶端和服務端比較類似,因此本篇會在必定程度上略去重複的部分,以減小篇幅。git
public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new TelnetClientInitializer()); // Start the connection attempt. Channel ch = b.connect(host, port).sync().channel(); // Read commands from the stdin. ChannelFuture lastWriteFuture = null; BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); for (;;) { String line = in.readLine(); if (line == null) { break; } // Sends the received line to the server. lastWriteFuture = ch.writeAndFlush(line + "\r\n"); // If user typed the 'bye' command, wait until the server closes // the connection. if ("bye".equals(line.toLowerCase())) { ch.closeFuture().sync(); break; } } // Wait until all messages are flushed before closing the channel. if (lastWriteFuture != null) { lastWriteFuture.sync(); } } finally { group.shutdownGracefully(); } }
Bootstrap b = new Bootstrap(); //tag0 b.group(group) //tag1 .channel(NioSocketChannel.class) //tag2 .handler(new TelnetClientInitializer());//tag3
tag0代碼主要初始化了父類的 options和attrs 屬性;代碼略。github
tag1設置了group屬性promise
@SuppressWarnings("unchecked") public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
tag2設置了channelFactory屬性服務器
public Bootstrap channel(Class<? extends Channel> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new BootstrapChannelFactory<Channel>(channelClass)); }
tag3設置了handler屬性app
public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; }less
下面開始第二段代碼分析,依次執行下面的方法。socket
Channel ch = b.connect(host, port) //tag4 .sync().channel(); //tag5 public ChannelFuture connect(String inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doConnect(remoteAddress, localAddress()); } private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//tag4.1 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//tag4.2 } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; } 分析tag4.1代碼,細心的讀者注意到,這些和服務端的代碼執行過程是同樣的。運用模板模式,子類定義獨特的實現。 final ChannelFuture AbstractBootstrap.initAndRegister() { Channel channel; try { channel = createChannel();//tag4.1.1 } catch (Throwable t) { return VoidChannel.INSTANCE.newFailedFuture(t); } try { init(channel);//tag4.1.2 } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelPromise regFuture = channel.newPromise(); channel.unsafe().register(regFuture);//tag4.1.3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
分析 tag4.1.1,裏面經過反射來實例化NioSocketChannelide
@Override Channel createChannel() { EventLoop eventLoop = group().next(); return channelFactory().newChannel(eventLoop);//tag4.1.1.1 } public NioSocketChannel(EventLoop eventLoop) { this(eventLoop, newSocket());//調用下面的newSocket()方法 } private static SocketChannel newSocket() { try { return SocketChannel.open(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(EventLoop eventLoop, SocketChannel socket) { this(null, eventLoop, socket); } protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) { super(parent, eventLoop, ch, SelectionKey.OP_READ);//調用父類方法 } protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) { super(parent, eventLoop);//調用父類方法,tag4.1.1.1 this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false);//tag4.1.1.2 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } protected AbstractChannel(Channel parent, EventLoop eventLoop) { this.parent = parent; this.eventLoop = validate(eventLoop); unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this);//tag4.1.1.1.1 }
分析tag4.1.1.1.1,裏面調用DefaultChannelPipeline構造器,和服務端的邏輯同樣,故不做分析。oop
此時系統返回到tag4.1.1.2 繼續執行ch.configureBlocking(false);
,此時完成tag4.1.1 方法執行,開始執行tag4.1.2方法
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler());//tag4.1.2.1 final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { try { if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + channel, t); } } } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
分析tag4.1.2.1,裏面將這個TelnetClientInitializer Handler加入到pipeline中,此時handler鏈是HeadHandler,TelnetClientInitializer,TailHandler,共計3個。
此時程序返回到tag4.1.3繼續執行,
@Override public final void AbstractChannel.register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise);//tag4.1.3.1 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } } } private void AbstractChannel.AbstractUnsafe.register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister();//tag4.1.3.1.1 registered = true; promise.setSuccess(); pipeline.fireChannelRegistered();//tag4.1.3.1.2 if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
tag4.1.3.1.1 代碼以下
@Override protected void AbstractNioChannel.doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this);//tag4.1.3.1.2.1 return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
tag4.1.3.1.2.1 把selector註冊到javaChannel上;而後程序繼續執行tag4.1.3.1.2代碼。
@Override public ChannelPipeline fireChannelRegistered() { head.fireChannelRegistered(); return this; } @Override public ChannelHandlerContext fireChannelRegistered() { DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED); next.invoker.invokeChannelRegistered(next); return this; } @Override @SuppressWarnings("unchecked") public final void ChannelInitializer.channelRegistered(ChannelHandlerContext ctx) throws Exception { ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { initChannel((C) ctx.channel());//tag4.1.3.1.2.1 pipeline.remove(this);//tag4.1.3.1.2.2 ctx.fireChannelRegistered();//tag4.1.3.1.2.3 success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
後面的邏輯和服務端相似,此時執行的handler是TelnetClientInitializer,並執行ChannelInitializer的channelRegistered方法,channelRegistered方法裏面接着調用了initChannel。
標記 tag4.1.3.1.2.1 代碼以下
@Override public void TelnetClientInitializer.initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Add the text line codec combination first, pipeline.addLast("framer", new DelimiterBasedFrameDecoder( 8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", DECODER); pipeline.addLast("encoder", ENCODER); // and then business logic. pipeline.addLast("handler", CLIENTHANDLER); }
在完成tag4.1.3.1.2.2的 pipeline.remove(this);
後,此時handler鏈以下:HeadHandler,DelimiterBasedFrameDecoder,StringDecoder,StringEncoder,TelnetClientHandler, TailHandler。
接着程序又開始執行下一個handler,最終找到TailHandler的channelRegistered方法。TailHandler的channelRegistered方法是空方法。
此時 tag4.1 的代碼執行結束,開始執行 tag4.2的代碼
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise);//tag4.2.1 } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } @Override public ChannelFuture AbstractChannel.connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); }
通過一番計算,找到HeadHandler,執行unsafe的方法。
@Override public void HeadHandler.connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
AbstractNioChannel.AbstractNioUnsafe的 connect方法以下:
@Override public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) {//tag4.2.1.1 fulfillConnectPromise(promise, wasActive);//tag4.2.1.2 } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() {//tag4.2.1.3 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } promise.tryFailure(t); closeIfClosed(); } }
tag4.2.1.1 代碼以下,進行了bind本地端口和connect遠程服務器的操做。
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress);//tag4.2.1.1.1 if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT);//tag4.2.1.1.2 } success = true; return connected; } finally { if (!success) { doClose(); } } }
tag4.2.1.1.2 裏面執行了connect遠程服務器的操做,,個人機器上該方法返回false(返回值詳見connect方法說明)。而後會觸發執行selectionKey().interestOps(SelectionKey.OP_CONNECT);
須要額外說明的是,此時服務器會觸發channelActivi事件。在本例的服務端代碼裏,會在客戶端鏈接時,發送消息給客戶端。不過先暫時忽略服務端和客戶端的數據交互,下文分析。
而後tag4.2.1.1 執行結束,因爲此時的返回值是false,因此不會執行tag4.2.1.2的 fulfillConnectPromise(promise, wasActive);
而後程序繼續執行tag4.2.1.3 代碼,進行鏈接超時處理:若是設置了超時時間,那麼等待指定的超時時間後,再看看是否已經鏈接上。若是連不上,則設置失敗狀態。
接着開始下一個事件循環,因爲在tag4.2.1.1.2執行了selectionKey().interestOps(SelectionKey.OP_CONNECT)
操做,會進入到下面的代碼。這裏咱們重點關注tag4.3的代碼。
private static void NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //略XXXX if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect();//tag4.3 //略XXXX } @Override public void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); assert connectPromise != null; try { boolean wasActive = isActive(); doFinishConnect();//tag4.3.1 fulfillConnectPromise(connectPromise, wasActive);//tag4.3.2 } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } // Use tryFailure() instead of setFailure() to avoid the race against cancel(). connectPromise.tryFailure(t); closeIfClosed(); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } } @Override protected void NioSocketChannel.doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } }
在執行完下面的boolean promiseSet = promise.trySuccess();
方法後,實例代碼中的 Channel ch = b.connect(host, port).sync().channel();
就執行完畢了,而後主線程就阻塞在實例代碼中的 String line = in.readLine();
這句代碼裏了。
private void AbstractNioChannel.AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive();//tag4.3.2.1 } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } } @Override public ChannelPipeline fireChannelActive() { head.fireChannelActive();//tag4.3.2.1.1 if (channel.config().isAutoRead()) { channel.read();//tag4.3.2.1.2 } return this; }
此時,繼續執行tag4.3.2.1的代碼,進而執行tag4.3.2.1.1的代碼,最終執行TailHandler.channelActive方法。因爲TailHandler類內部的方法基本都是空實現,因此再也不貼代碼了。而後再執行tag4.3.2.1.2的channel.read();
代碼,最終執行了AbstractNioChannel.doBeginRead()方法的selectionKey.interestOps(interestOps | readInterestOp);
,等同於執行了selectionKey.interestOps(SelectionKey.OP_READ);
。
此時方法返回,在NioEventLoop.run()通過了一些簡單的數據清理後,而後有機會對服務端的channelActive時發送的數據進行處理了(在tag4.2.1.1.2曾經提過)。客戶端和服務端交互過程詳見下篇。