原生的NIO類圖使用有諸多不便,Netty向用戶屏蔽了細節,在與用戶交界處作了封裝。java
ServerBootstrap是Netty服務端的啓動輔助類,它提供了一些列的方法用於設置參數,因爲參數太多,使用builder模式。linux
Netty中的Reactor線程池是EventLoopGroup,它實際上就是EventLoop數組。EventLoop的職責是處理全部註冊到本線程多路複用器Selector上的Channel。Selector的輪詢操做由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行。值得說明的是,EventLoop的職責不只僅是處理網絡I/O事件,用戶自定義的Task和定時任務Task也統一由EventLoop負責處理,這樣線程模型就實現了統一。從調度層面看,也不存在從EvenetLoop線程中再啓動其餘類型的線程用於異步執行另外的任務,這樣就避免了多線程的併發操做和鎖競爭,提高了I/O線程的處理和調度性能。git
服務端須要建立ServerSocketChannel,對原生NIO類庫進行了封裝,對應是NioServerSocketChannelgithub
對用戶而言,不須要關心服務端Channel的底層實現細節和工做原理,只須要指定具體是哪一種服務端算法
Netty的ServerBootstrap提供了channel方法用於指定服務端Channel的類型。Netty經過工廠反射建立NioServerSocketChannel對象。api
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
它本質上是一個負載處理網絡事件的職責鏈,負載管理和執行ChannelHanler。網絡事件以事件流的形式在ChannelPipeline中流轉,由ChannelPipeline根據ChannelHandler的執行策略調度ChannelHandler的執行。典型的網絡事件以下:數組
(1)鏈路註冊;promise
(2)鏈路激活;安全
(3)鏈路斷開;服務器
(4)接收到請求消息;
(5)請求消息接收並處理完畢;
(6)發送應答消息;
(7)鏈路發生異常;
(8)發生用戶自定義事件。
ChannelHandler是Netty提供給用戶定製和擴展的關鍵接口。利用ChannelHandler用戶能夠完成大多數的功能定製,例如消息編解碼、心跳、安全認證、TSL/SSL認證、流量控制和流量整形等。Netty同時也提供了大量的系統ChannelHandler供用戶使用,比較實用的系統ChannelHandler總結以下:
(1)系統編解碼框架——ByteToMessageCodec;
(2)通用基於長度的半包解碼器——LengthFieldBasedFrameDecoder;
(3)碼流日誌打印Handler——LoggingHandler;
(4)SSL安全認證Handler——SslHandler;
(5)鏈路空閒檢測Handler——IdleStateHandler;
(6)流量整形Handler——ChannelTrafficShapingHandler;
(7)Base64編解碼——Base64Decoder和Base64Encoder。
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoServerHandler()); } });
在綁定監聽端口以前系統會作一系列的初始化和檢測工做,完成以後,會啓動監聽端口,並將ServerSocketChannel註冊到Selector上監聽客戶端鏈接,相關代碼以下。
public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
由Reactor線程NioEventLoop負責調度和執行Selector輪詢操做,選擇準備就緒的Channel集合,相關代碼以下
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { //此處代碼省略... int selectedKeys = selector.select(timeoutMillis); selectCnt ++; //此處代碼省略...
步驟9:執行Netty系統ChannelHandler和用戶添加定製的ChannelHandler。
ChannelPipeline根據網絡事件的類型,調度並執行ChannelHandler,相關代碼以下。
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
一般會建立兩個EventLoopGroup,也能夠只建立一個並共享。
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup實際就是Reactor線程池,負責調度和執行具體的任務:
client接入
網絡讀寫事件處理
用戶自定義任務
定時任務
經過ServerBootstrap的group方法將2個EventLoopGroup實例傳入:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //1. 調用父類的group方法傳入parentGroup super.group(parentGroup); //2. 設置childGroup if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
若是隻傳一個參數,則2個線程池會被重用。
public ServerBootstrap group(EventLoopGroup group) { return group(group, group); }
根據傳入的channel class建立對應的服務端Channel,調用的是ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
Netty使用一個LinkedHashMap來保存
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
主要參數是TCP的backlog參數,底層C對應的接口爲:
int listen(int fd, int backlog);
backlog指定了內核爲此套接字接口排隊的最大鏈接個數,內核要爲套接字維護2個隊列:未鏈接隊列和已鏈接隊列,根據TCP三路握手的三個分節分隔這2個隊列。
服務器處於listen狀態時,收到客戶端syn分節(connect)時在未完成隊列中建立一個新的條目,而後用三路握手的第二個分節即服務器的syn響應客戶端,此條目在第三個分節到達前(客戶端對服務器syn的ack)一直保留在未完成隊列中,若是三路握手完成,該條目將從未完成鏈接隊列搬到已完成隊列尾部。
當進程調用accept時,從已完成隊列中的頭部取出一個條目給進程,當已完成隊列爲空時進程將進入睡眠,直到有條目在已完成隊列中才喚醒。backlog則用來規定2個隊列總和的最大值,大多數實現值爲5,可是在高併發場景中顯然不夠,好比Lighttpd中此值達到128*8。須要設置此值更大緣由是未完成鏈接隊列可能由於客戶端syn的到達以及等待握手第三個分節的到達延時而增大。Netty默認是100,用戶能夠調整。
本質區別就是:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
看紅色標註的initAndRegister方法:
調用 channelFactory.newChannel()常見NioServerSocketChannel
而後調用init(channel)方法,由具體子類實現,這裏實現的子類是ServerBootrap.
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
下面關注init方法,主要完成了如下功能:
(1). 設置Socket參數和NioServerSocketChannel的附加屬性,代碼以下:
final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } }
(2).將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中,將用於服務端註冊的Handler ServerBootstrapAcceptor添加到ChannelPipeline中,代碼以下:
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. // In this case the initChannel(...) method will only be called after this method returns. Because // of this we need to ensure we add our handler in a delayed fashion so all the users handler are // placed in front of the ServerBootstrapAcceptor. ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
到此處,Netty服務端監聽的相關資源已經初始化完畢,就剩下最後一步-
在分析註冊代碼以前,咱們先經過下圖看看目前NioServerSocketChannel的ChannelPipeline的組成:
經過Debug最終發如今AbstractChannel類中的register方法上:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
首先判斷是不是NioEventLoop自身發起的操做,若是是,則不存在併發操做,直接執行Channel註冊;若是由其它線程發起,則封裝成一個Task放入消息隊列中異步執行。此處,因爲是由ServerBootstrap所在線程執行的註冊操做,因此會將其封裝成Task投遞到NioEventLoop中執行:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
將NioServerSocketChannel註冊到NioEventLoop的Selector上,代碼以下:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
大夥兒可能會很詫異,應該註冊OP_ACCEPT(16)到多路複用器上,怎麼註冊0呢?0表示只註冊,不監放任何網絡操做。這樣作的緣由以下:
註冊成功以後,觸發ChannelRegistered事件,方法以下:
doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered();
Netty的HeadHandler不須要處理ChannelRegistered事件,因此,直接調用下一個Handler,當ChannelRegistered事件傳遞到TailHandler後結束,TailHandler也不關心ChannelRegistered事件。
ChannelRegistered事件傳遞完成後,判斷ServerSocketChannel監聽是否成功,若是成功,須要出發NioServerSocketChannel的ChannelActive事件,判斷方法即isActive().
pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } }
其中isActive()是一個多態方法,若是是服務端則,判斷監聽是否啓動;若是是客戶端,判斷TCP鏈接是否完成。ChannelActive事件在ChannelPipeline()傳遞,完成以後根據配置決定是否自動觸發Channel的讀操做。
讀方法最終到:
public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
因爲不一樣類型的Channel對讀的準備工做不一樣,所以doBeginRead也是個多態方法。
對於NIO通訊,不管是客戶端,仍是服務端,都要設置網絡監聽操做位爲本身感興趣的,對於NioServerSocketChannel感興趣的是OP_ACCEPT(16),因而修改操做位:
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
某些狀況下,當前監聽的操做類型和Channel關心的網絡事件是一致的,不須要重複註冊,因此增長了&的判斷。JDK SelectionKey有4種操做類型,分別爲:
(1) OP_READ = 1 <<0;
(2) OP_WRTE = 1 <<2;
(3) OP_CONNECT = 1 <<3;
(4) OP_ACCEPT = 1 <<4;
此時,服務器監聽啓動部分源碼已經分析結束。
負責處理網絡讀寫、鏈接和客戶端請求介入的Reactor線程就是NioEventLoop,下面分析如何處理新的客戶端接入。
當多路複用器檢測到新的Channel時候,默認執行processSelectedKeysOptimized方法.
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
因爲Channel的Attachment是NioServerSocketChannel,因此執行processSelectedKey方法。
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
繼續點入該方法debug,因爲監聽的是鏈接操做,會執行unsafe.read()方法。因爲不一樣的Channel執行不一樣的操做,因此NioUnsafe被設計爲接口。
debug發現使用的是:
其read()方法以下所示:
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } //代碼省略... }
對doReadMessages方法進行分析,發現它實際就是接受新的客戶端鏈接而且建立NioSocketChannel:
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
接收到新的鏈接以後,觸發ChannelPipeLine的ChannelRead方法,代碼以下:
int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); }
因而觸發pipeLine調用鏈,事件在ChannelPipeline中傳遞,執行ServerBootstrapAcceptor中的channelRead方法,代碼以下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); //(1) for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } //(2) for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } //(3) try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
上面方法主要分爲3個步驟:
(1) 加入childHandler到客戶端SocketChannel的ChannelPipeline中
(2) 設置SocketChannel的TCP參數
(3) 註冊SocketChannel到多路複用器
注意這裏register註冊也是註冊操做位爲0.
執行完註冊後,緊接着會觸發ChannelReadComplete事件。
Netty的Header和Tailer自己不關於這個事件,所以ChannelReadComplete是直接透傳, 執行完ChannelReadComplete後,接着執行PipeLine的read()方法,最終到HeadHandler的read()方法。read()方法在前面說過,會修改操做位,此時這裏debug發現把操做位修改OP_READ。
此時,客戶端鏈接處理完成,能夠進行網絡讀寫等I/O操做。
Bootstrap b = new Bootstrap();
Bootstrap是Socket客戶端建立工具類,經過API設置建立客戶端相關的參數,異步發起客戶端鏈接。
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
此處的NioSocketChannel相似於Java NIO提供的SocketChannel。
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HelloClientHandler()); } });
用於調度和執行網絡事件。
// 發起異步鏈接操做 ChannelFuture f = b.connect(host, port).sync();
SocketChannel執行connect()操做後有如下三種結果:
Netty客戶端建立流程很是繁瑣,這裏只針對關鍵步驟進行分析。
1. 設置I/O線程組:只須要一個線程組EventLoopGroup
public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return (B) this; }
主要TCP參數以下:
一樣使用反射建立NioSocketChannel
Bootstrap爲了簡化Handler的編排,提供了ChannelInitializer,當TCP鏈路註冊成功後,調用initChannel接口:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove // the handler. if (initChannel(ctx)) { // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not // miss an event. ctx.pipeline().fireChannelRegistered(); } else { // Called initChannel(...) before which is the expected behavior, so just forward the event. ctx.fireChannelRegistered(); } }
其中InitChannel爲抽象接口,即下面紅色標註的代碼,用戶即是在這個方法中設置ChannelHandler:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel();
//.....
建立以後,初始化,而後在註冊:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
的connect操做以下:
if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { //...
異步返回以後,須要判斷鏈接結果,若是成功,則觸發ChannelActive事件。最終會將NioSocketChannel中的selectionKey設置爲SelectionKey.OP_READ,用於監聽網絡讀操做。
NioEventLoop的Selector輪詢客戶端鏈接Channel,當服務端返回應答後,進行判斷。依舊是NioEventLoop中的processSelectedKey方法:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
下面分析finishConnect方法:
@Override public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); //判斷SocketChannel的鏈接結果,true表示成功 fulfillConnectPromise(connectPromise, wasActive); //觸發鏈路激活 } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
fulfillConnectPromise方法則觸發鏈路激活事件,並由ChannelPipeline進行傳播:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. boolean active = isActive(); // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }
跟以前相似,將網絡監聽修改成讀操做。
由Netty本身實現的客戶端超時機制,在AbstractNioChannel的connect方法中:
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
一旦超時定時器執行,則說明客戶端超時,構造異常,將異常結果設置到connectPromise中,同時關閉客戶端句柄。
若是在超時以前獲取結果,則直接刪除定時器,防止其被觸發。