Netty (6) Source Code: Server and Client Creation

The native NIO class library is inconvenient to use in many ways; Netty hides those details from the user and wraps them at the user-facing boundary.

1. The Server Creation Sequence

Step 1: Create the ServerBootstrap instance

ServerBootstrap is Netty's helper class for starting a server. It provides a series of methods for setting parameters; because there are so many parameters, it follows the builder pattern.
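
As a quick orientation before walking through the steps, a typical configuration chain looks roughly like the following sketch (EchoServerHandler is the user handler used later in this post; the port-related and backlog values are illustrative, not taken from the source):

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)                          // step 2: Reactor thread pools
     .channel(NioServerSocketChannel.class)                  // step 3: server Channel type
     .option(ChannelOption.SO_BACKLOG, 1024)                 // TCP parameter of the listening channel
     .childHandler(new ChannelInitializer<SocketChannel>() { // steps 4/5: per-connection pipeline and handlers
         @Override
         protected void initChannel(SocketChannel ch) {
             ch.pipeline().addLast(new EchoServerHandler());
         }
     });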

Step 2: Configure and bind the Reactor thread pool

Netty's Reactor thread pool is the EventLoopGroup, which is essentially an array of EventLoops. An EventLoop is responsible for handling every Channel registered with the Selector (multiplexer) bound to its thread. The Selector's polling is driven by the run method of the bound EventLoop thread, executed inside a loop. Notably, an EventLoop is not limited to network I/O events: user-defined tasks and scheduled tasks are also handled by the same EventLoop, which unifies the threading model. From a scheduling point of view, no additional threads are spawned from the EventLoop thread to run other work asynchronously, which avoids concurrent access and lock contention across threads and improves I/O processing and scheduling performance.
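
For example, a user task or a scheduled task can be submitted straight to a channel's EventLoop so that it runs on the same thread that performs the channel's I/O; a minimal sketch (here `channel` is any registered Channel, and the heartbeat message assumes a suitable encoder is already in the pipeline):

    // Run a user-defined task on the channel's I/O thread; no locking is needed for channel state.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            // safe to touch channel-related state here
        }
    });

    // Schedule a periodic task on the same EventLoop, e.g. a heartbeat every 30 seconds.
    channel.eventLoop().scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            channel.writeAndFlush("PING"); // assumes a String encoder in the pipeline
        }
    }, 0, 30, TimeUnit.SECONDS);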

Step 3: Configure and bind the server Channel

The server needs to create a ServerSocketChannel. Netty wraps the native NIO class; the corresponding class is NioServerSocketChannel.

Users do not need to care about the server Channel's underlying implementation details or how it works; they only need to specify which concrete server Channel to use.

Netty's ServerBootstrap provides the channel method for specifying the server Channel type. Netty then creates the NioServerSocketChannel object via a reflective factory.

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

Step 4: Create and initialize the ChannelPipeline when the link is established.

It is essentially a chain of responsibility for handling network events; it manages and executes the ChannelHandlers. Network events flow through the ChannelPipeline as an event stream, and the ChannelPipeline schedules the execution of ChannelHandlers according to their execution policy. Typical network events are:

(1) link registration;

(2) link activation;

(3) link disconnection;

(4) a request message is received;

(5) a request message has been received and fully processed;

(6) a response message is sent;

(7) an exception occurs on the link;

(8) a user-defined event is fired.

Step 5: After the ChannelPipeline is initialized, add and configure the ChannelHandlers.

ChannelHandler is the key interface Netty exposes for user customization and extension. With ChannelHandlers users can implement most custom functionality, such as message encoding/decoding, heartbeats, security authentication, TLS/SSL, traffic control and traffic shaping. Netty also ships a large number of system ChannelHandlers; the most useful ones are summarized below (a sketch that chains several of them follows the example after this list):

(1) the codec framework — ByteToMessageCodec;

(2) the generic length-field-based frame decoder — LengthFieldBasedFrameDecoder;

(3) the byte-stream logging handler — LoggingHandler;

(4) the SSL authentication handler — SslHandler;

(5) the link idle detection handler — IdleStateHandler;

(6) the traffic shaping handler — ChannelTrafficShapingHandler;

(7) the Base64 codec — Base64Decoder and Base64Encoder.

.childHandler(new ChannelInitializer<SocketChannel>() {  
            @Override  
            public void initChannel(SocketChannel ch)  
                throws Exception {  
                ch.pipeline().addLast(  
                    new EchoServerHandler());  
            }  
            }); 
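
Building on the snippet above, several of the built-in handlers listed earlier can be chained inside the same initializer; a sketch (the frame-length layout and idle timeouts are illustrative values, not taken from the source):

    ch.pipeline()
      .addLast(new LoggingHandler(LogLevel.DEBUG))                  // byte-stream logging
      .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)) // length-field-based frame decoding
      .addLast(new IdleStateHandler(60, 30, 0))                     // link idle detection, in seconds
      .addLast(new EchoServerHandler());                            // user business handler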

Step 6: Bind and start listening on the port.

Before binding the listening port, the system performs a series of initialization and validation steps; once they complete, the port starts listening and the ServerSocketChannel is registered with the Selector to listen for client connections. The relevant code is shown below.

   public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }
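
From the user's point of view this step is a single bind() call; a usage sketch showing both the asynchronous-listener style and the blocking style (the port is illustrative):

    ChannelFuture f = b.bind(8080);                 // asynchronous bind
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.isSuccess()) {
                System.out.println("listening on " + future.channel().localAddress());
            } else {
                future.cause().printStackTrace();
            }
        }
    });
    f.sync();                                       // or simply block until the bind completes
    f.channel().closeFuture().sync();               // block until the server channel is closed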

Step 7: Selector polling.

The Reactor thread, NioEventLoop, is responsible for scheduling and executing the Selector polling operation and selecting the set of ready Channels. The relevant code is shown below.

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
                // code omitted...
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                // code omitted...

Step 8: Once ready Channels have been selected, the Reactor thread NioEventLoop invokes the corresponding methods of the ChannelPipeline, which ultimately schedules and executes the ChannelHandlers.

Step 9: Execute Netty's system ChannelHandlers and the user-defined ChannelHandlers.

The ChannelPipeline schedules and executes the ChannelHandlers according to the type of network event. The relevant code is shown below.

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

2. Netty Server Creation Source Code Analysis

1. Create the thread groups:

Usually two EventLoopGroups are created; it is also possible to create just one and share it.

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup is essentially the Reactor thread pool (its size can be set explicitly, as sketched after this list). It is responsible for scheduling and executing:

  client connection acceptance;

  network read/write event handling;

  user-defined tasks;

  scheduled tasks.
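
The size of each group can be set explicitly when it is constructed; since the boss group only accepts connections on the single listening channel, one thread is usually enough (a sketch with illustrative sizes; when the argument is omitted, NioEventLoopGroup defaults to twice the number of CPU cores):

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // one acceptor thread for one listening port
    EventLoopGroup workerGroup = new NioEventLoopGroup(8); // I/O threads for the accepted connections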

The two EventLoopGroup instances are passed in through ServerBootstrap's group method:

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 1. Call the parent class's group method with parentGroup
        super.group(parentGroup);
        // 2. Set the childGroup
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

If only a single group is passed in, it is reused for both roles.

    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

2. Configure the server Channel used for port listening and client link acceptance:

The corresponding server Channel is created from the channel class passed in; the factory used is ReflectiveChannelFactory:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
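
As a side note, recent Netty 4.x versions also let the user bypass the reflective factory by supplying a ChannelFactory directly; a hedged sketch, equivalent in effect to channel(NioServerSocketChannel.class):

    ServerBootstrap b = new ServerBootstrap();
    b.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
        @Override
        public NioServerSocketChannel newChannel() {
            return new NioServerSocketChannel();    // direct construction, no reflection
        }
    });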

3. Next, set the server-side TCP parameters:

Netty stores them in a LinkedHashMap:

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

The main parameter is TCP's backlog; the corresponding underlying C interface is:

int listen(int fd, int backlog);

backlog specifies the maximum number of connections the kernel queues for this socket. The kernel maintains two queues for a listening socket, the incomplete-connection queue and the completed-connection queue, separated according to the three segments of the TCP three-way handshake.

When a server in the listen state receives a client SYN segment (connect), it creates a new entry in the incomplete queue and replies with the second segment of the handshake, the server's SYN. The entry stays in the incomplete queue until the third segment arrives (the client's ACK of the server's SYN). Once the three-way handshake completes, the entry is moved from the incomplete-connection queue to the tail of the completed-connection queue.

When the process calls accept, an entry is taken from the head of the completed queue and handed to it; if the completed queue is empty, the process sleeps until an entry appears. backlog caps the combined size of the two queues. Many implementations default to 5, which is clearly insufficient for high-concurrency scenarios; Lighttpd, for example, uses 128 * 8. A larger value is needed because the incomplete-connection queue can grow both from arriving client SYNs and from the delay while waiting for the third handshake segment. Netty's default is 100, and users can adjust it.
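
In user code the value is adjusted through ChannelOption.SO_BACKLOG; a sketch (1024 is an illustrative value, not a recommendation from the source):

    ServerBootstrap b = new ServerBootstrap();
    b.option(ChannelOption.SO_BACKLOG, 1024);   // backlog for the listening (parent) channel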

4. Next, Handlers can be specified separately for the bootstrap helper class and for its parent class.

The essential difference (see the sketch after this list) is:

  • the Handler configured through the parent class AbstractBootstrap (handler()) is attached to the NioServerSocketChannel itself, i.e. the listening channel shared by all clients connecting to that port;
  • the childHandler configured on ServerBootstrap is applied to each newly accepted client SocketChannel; wrapped in a ChannelInitializer it effectively acts as a factory, producing a fresh set of Handlers for every new client.
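
The difference is easiest to see in the configuration itself; a sketch (LoggingHandler on the listening channel, a per-connection initializer for the accepted children):

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     // handler(): attached to the NioServerSocketChannel's own pipeline
     .handler(new LoggingHandler(LogLevel.INFO))
     // childHandler(): applied to every accepted client SocketChannel
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) {
             ch.pipeline().addLast(new EchoServerHandler());
         }
     });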

5. The final step of server startup is binding the local port and starting the service:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

Now look at the initAndRegister method called above:

channelFactory.newChannel() is called to create the NioServerSocketChannel.

Then init(channel) is called; it is implemented by the concrete subclass, in this case ServerBootstrap.

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

Now look at the init method, which mainly does the following:

(1) Set the socket parameters and the NioServerSocketChannel's attached attributes:

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

(2) Add the Handler from AbstractBootstrap to the NioServerSocketChannel's ChannelPipeline, and add the ServerBootstrapAcceptor handler, which registers accepted connections on the server side, to the ChannelPipeline:

p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                // In this case the initChannel(...) method will only be called after this method returns. Because
                // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                // placed in front of the ServerBootstrapAcceptor.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

At this point the resources needed for the Netty server to listen have been initialized; only the final step remains.

6. Register the NioServerSocketChannel with the Reactor thread's multiplexer, then poll for client connection events.

Before analyzing the registration code, keep in mind how the NioServerSocketChannel's ChannelPipeline is composed at this point.

Debugging eventually leads to the register method in the AbstractChannel class:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

It first checks whether the operation was initiated by the NioEventLoop itself; if so, there is no concurrency and the Channel is registered directly. If it was initiated by another thread, the registration is wrapped as a Task and put on the task queue to run asynchronously. Here the registration is executed by the thread running ServerBootstrap, so it is wrapped as a Task and handed off to the NioEventLoop:

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

The NioServerSocketChannel is registered with the NioEventLoop's Selector as follows:

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

You may wonder: shouldn't OP_ACCEPT (16) be registered with the multiplexer, so why register 0? Registering 0 means the channel is only registered, without listening for any network operation. The reasons are:

  • the register method is polymorphic: it can be used by NioServerSocketChannel to listen for client connections, and also to register a SocketChannel to listen for network read or write operations;
  • the interest set can easily be changed later via SelectionKey's interestOps(int ops) method, so the registration here only needs to obtain the SelectionKey and assign it to AbstractNioChannel's selectionKey field.

After registration succeeds, the ChannelRegistered event is fired:

                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();

Netty's HeadContext does not need to handle the ChannelRegistered event, so it simply passes it to the next handler; the event stops when it reaches the TailContext, which does not care about ChannelRegistered either.

After the ChannelRegistered event has been propagated, the code checks whether the ServerSocketChannel is actually listening; if so, the NioServerSocketChannel's ChannelActive event must be fired. The check is isActive():

          pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }

isActive() is also polymorphic: on the server side it checks whether listening has started; on the client side it checks whether the TCP connection is established. The ChannelActive event is propagated through the ChannelPipeline, and afterwards, depending on the configuration, the Channel's read operation may be triggered automatically.

The read operation eventually reaches:

        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

Because different Channel types prepare for reading differently, doBeginRead is also polymorphic.

For NIO communication, both the client and the server must set the interest ops they care about; NioServerSocketChannel cares about OP_ACCEPT (16), so the ops are modified accordingly:

 @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

In some cases the currently registered interest ops already include the event the Channel cares about and re-registering is unnecessary, which is why the & check is added (see the JDK-level sketch after this list). The JDK SelectionKey defines four operation types:

(1) OP_READ = 1 << 0;

(2) OP_WRITE = 1 << 2;

(3) OP_CONNECT = 1 << 3;

(4) OP_ACCEPT = 1 << 4;
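
The four constants are bit flags, which is exactly why doBeginRead manipulates them with bitwise operators; a JDK-level sketch of the same idea (plain java.nio, not Netty code):

    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    Selector selector = Selector.open();

    SelectionKey key = ssc.register(selector, 0);      // registered, but interested in nothing yet
    int ops = key.interestOps();
    if ((ops & SelectionKey.OP_ACCEPT) == 0) {         // the same "&" guard used by doBeginRead
        key.interestOps(ops | SelectionKey.OP_ACCEPT); // now start listening for accept events
    }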

At this point, the analysis of the server listening startup source code is complete.

3. Client Connection Acceptance Source Code Analysis

The Reactor thread that handles network reads and writes, connections, and incoming client requests is NioEventLoop. This section analyzes how a new client connection is accepted.

When the multiplexer detects a ready Channel, the processSelectedKeysOptimized method is executed by default:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

Because the SelectionKey's attachment is the NioServerSocketChannel, the processSelectedKey method is executed:

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

Stepping further into this method in the debugger: since the interest here is the accept operation, unsafe.read() is executed. Different Channels perform different operations, so NioUnsafe is designed as an interface.

Debugging shows that the implementation used here is NioMessageUnsafe; its read() method is shown below:

 @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        // code omitted...
    }

Analyzing the doReadMessages method shows that it simply accepts the new client connection and creates a NioSocketChannel:

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

After the new connection is accepted, the ChannelPipeline's channelRead event is fired, as follows:

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }

This triggers the pipeline's call chain; the event propagates through the ChannelPipeline and reaches the channelRead method of ServerBootstrapAcceptor:

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler); // (1)

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }
        //(2)
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        //(3)
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

The method above consists of three main steps:

(1) add the childHandler to the client SocketChannel's ChannelPipeline;

(2) set the SocketChannel's TCP parameters;

(3) register the SocketChannel with the multiplexer.

Note that this register call also registers with the interest ops set to 0.

Right after the registration, the ChannelReadComplete event is fired.

Netty's HeadContext and TailContext themselves do not care about this event, so ChannelReadComplete passes straight through. After ChannelReadComplete, the pipeline's read() is executed, eventually reaching HeadContext's read(). As described earlier, read() modifies the interest ops; debugging here shows that they are changed to OP_READ.

At this point, handling of the client connection is complete, and network read/write and other I/O operations can proceed.

4. Netty Client Creation Flow

1. The user thread creates the Bootstrap:

Bootstrap b = new Bootstrap();

Bootstrap is the helper class for creating socket clients. Its API is used to set the parameters for client creation and to initiate the client connection asynchronously.

2. Create the Reactor thread group NioEventLoopGroup that handles the client connection and I/O reads/writes:

EventLoopGroup group = new NioEventLoopGroup();

3. Create the NioSocketChannel used for the client connection, via Bootstrap's ChannelFactory and the user-specified Channel type:

b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)

The NioSocketChannel here is analogous to the SocketChannel provided by Java NIO.

4. Create the default ChannelHandler pipeline:

b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>()
  {
       @Override
       public void initChannel(SocketChannel ch) throws Exception
       {
         ch.pipeline().addLast(new HelloClientHandler());
        }
  });

It is used to schedule and execute network events.

5. Initiate the TCP connection asynchronously:

 // 發起異步鏈接操做
 ChannelFuture f = b.connect(host, port).sync();

After SocketChannel's connect() is invoked, there are three possible outcomes:

  • the connection succeeds and it returns true;
  • the connection is not yet established: the server has not returned the ACK, so the result is undetermined and it returns false. In this case the NioSocketChannel's selectionKey must be set to OP_CONNECT to listen for the connection result;
  • the connection fails, and an I/O exception is thrown directly.

6. Register the corresponding network interest ops with the multiplexer.

7. The multiplexer polls each Channel during I/O and handles the connect result.

8. If the connection succeeds, set the Future result, publish the connection-success event, and trigger the ChannelPipeline.

9. The ChannelPipeline schedules and executes the system and user ChannelHandlers, running the business logic.
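
Putting steps 1 through 5 together, the user-side code is roughly the following sketch (HelloClientHandler, host and port are the placeholders already used in the snippets above):

    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 ch.pipeline().addLast(new HelloClientHandler());
             }
         });
        ChannelFuture f = b.connect(host, port).sync(); // asynchronous connect, then wait for the result
        f.channel().closeFuture().sync();               // block until the connection is closed
    } finally {
        group.shutdownGracefully();
    }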

5. Client Creation Source Code Analysis

The Netty client creation flow is fairly involved; only the key steps are analyzed here.

5.1 The client connection helper class Bootstrap

1. Set the I/O thread group: only a single EventLoopGroup is needed.

2. TCP parameter configuration:

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

The main TCP parameters are listed below (a configuration sketch follows the list):

  • (1) SO_TIMEOUT: controls how many milliseconds a read operation blocks; 0 disables the timer and the thread blocks indefinitely.
  • (2) SO_SNDBUF: the socket send buffer size.
  • (3) SO_RCVBUF: the socket receive buffer size.
  • (4) SO_REUSEADDR: whether the address/port may be reused.
  • (5) CONNECT_TIMEOUT_MILLIS: the client connect timeout. Native NIO does not provide this; Netty implements it with its own connect-timeout timer for detection and control.
  • (6) TCP_NODELAY: whether to disable the Nagle algorithm (true disables it).
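
All of these are configured through option(); a sketch with illustrative values:

    Bootstrap b = new Bootstrap();
    b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // Netty's own connect-timeout timer
     .option(ChannelOption.TCP_NODELAY, true)            // disable the Nagle algorithm
     .option(ChannelOption.SO_SNDBUF, 32 * 1024)
     .option(ChannelOption.SO_RCVBUF, 32 * 1024);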

3. The channel interface:

The NioSocketChannel is likewise created via reflection.

4. Set the Handler:

To simplify handler arrangement, Bootstrap provides ChannelInitializer; once the TCP link is registered, the initChannel callback is invoked:

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

Here initChannel is an abstract method (the initChannel((C) ctx.channel()) call in the code below); it is in this method that the user installs the ChannelHandlers:

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

5.2 客戶端鏈接操做

1. First the NioSocketChannel is created and initialized:

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
     //.....

After creation it is initialized and then registered:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

2. After creation, the connect operation runs asynchronously and eventually reaches HeadContext's connect method:

        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

的connect操做以下:

 if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
//...

3. doConnect has three possible outcomes:

  • the connection succeeds and it returns true;
  • the connection is not yet established: the server has not returned the ACK, so the result is undetermined and it returns false. In this case the NioSocketChannel's selectionKey must be set to OP_CONNECT to listen for the connection result;
  • the connection fails, and an I/O exception is thrown directly.

After the asynchronous call returns, the connect result must be checked; if it succeeded, the ChannelActive event is fired. Eventually the NioSocketChannel's selectionKey is set to SelectionKey.OP_READ so that network reads can be monitored.

5.3 Asynchronous connect result notification

NioEventLoop's Selector polls the connecting client Channel; when the server's answer arrives, the result is evaluated, again inside NioEventLoop's processSelectedKey method:

 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

Now analyze the finishConnect method:

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                doFinishConnect(); // check the SocketChannel's connect result
                fulfillConnectPromise(connectPromise, wasActive); // fire link activation
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

The fulfillConnectPromise method fires the link-activation event, which is then propagated by the ChannelPipeline:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
            // We still need to ensure we call fireChannelActive() in this case.
            boolean active = isActive();

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }

As before, the network interest is then switched to the read operation.

5.4 Client connect timeout mechanism

The client connect timeout is implemented by Netty itself, inside AbstractNioChannel's connect method:

 public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }

If the timeout timer fires, the client connect has timed out: an exception is constructed, the failure is set on connectPromise, and the client handle is closed.

If the result arrives before the timeout, the timer is simply cancelled so that it cannot fire.
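
From the user's side the timeout is just a channel option, and the failure surfaces on the ChannelFuture; a sketch (3000 ms is an illustrative value):

    Bootstrap b = new Bootstrap();
    b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);

    ChannelFuture f = b.connect(host, port);
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess() && future.cause() instanceof ConnectTimeoutException) {
                // the scheduled timeout task fired before the connection completed
                System.err.println("connect timed out: " + future.cause().getMessage());
            }
        }
    });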
