Netty源碼分析之NioEventLoop(二)—NioEventLoop的啓動

上篇文章中咱們對Netty中NioEventLoop建立流程與源碼進行了跟蹤分析。本篇文章中咱們接着分析NioEventLoop的啓動流程;git

Netty中會在服務端啓動和新鏈接接入時經過chooser選擇器,分別爲NioServerSocketChannel與NioSocketChannel選擇綁定一個NioEventLoop,接下來咱們就分別從這兩個方面梳理NioEventLoop的啓動源碼github

1、服務端啓動promise

首先咱們結合下圖看下Netty服務啓動過程當中,NioServerSocketChannel綁定的NioEventLoop啓動流程ide

bind()部分源碼咱們在以前服務端啓動過程當中進行過說明,咱們進一步跟蹤進入doBind0()方法中能夠看到channel.eventLoop().execute的執行,須要說明的是這裏其實啓動的NioServerSocketChannel綁定的 bossGroup,用來負責處理新鏈接接入的。oop

    /**
     * read by jsf
     * 
     * @param regFuture
     * @param channel
     * @param localAddress
     * @param promise
     */
    private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress,
            final ChannelPromise promise) {
        //該方法向 NioServerSocketChannel 的 eventLoop 提交了一個任務,當 future(其實就是 promise) 成功後執行
        //NioServerSocketChannel 的 bind 方法,並添加一個關閉監聽器。咱們主要關注 bind 方法。
        // This method is invoked before channelRegistered() is triggered. Give user
        // handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

進入NioEventLoop父類SingleThreadEventExecutor中的execute方法,改方法經過inEventLoop()會首先判斷當前的線程是不是NioEventLoop自己綁定的線程,結合inEventLoop的代碼能夠看到NioEventLoop自己線程還未初始化爲空,這裏返回false,執行啓動線程操做,同時會任務放入任務隊列中。this

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        //首先判斷當前線程是不是該EventLoop綁定的線程
        boolean inEventLoop = inEventLoop();
        //把傳入的任務加入任務對立
        addTask(task);
        if (!inEventLoop) {//若是不是同一條線程
            startThread();
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

 繼續跟蹤進入startThread()方法中spa

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }

 在 doStartThread()中主要實現瞭如下功能:線程

一、執行傳入的ThreadPerTaskExecutor的execute方法,建立一個新的線程,並與這個NioEventLoop對象綁定;rest

二、在開啓的線程中執行SingleThreadEventExecutor.this.run(),也就是NioEventLoop的run方法,開始NioEventLoop的執行操做;netty

    private void doStartThread() {
        assert thread == null;
        //線程執行器經過線程工廠建立線程
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //開啓線程,並賦值
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //執行NioEventLoop的run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                if (logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

OK到這一步,基於服務端啓動綁定端口的NioServerSocketChannel,也就是服務端Channel綁定的NioEventLoop已經啓動。

2、新鏈接接入

首先咱們結合下圖看下當有客戶端接入時,建立NioSocketChannel,而後綁定NioEventLoop並啓動的流程

服務端啓動時會在NioServerSocketChannel的任務鏈中添加ServerBootstrapAcceptor對象,這就是用來處理新新鏈接接入的

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // 服務端NioServerSocketChannel的pipeline中添加ServerBootstrapAcceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

在新鏈接接入事件觸發時,執行unsafe.read();

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //新鏈接接入
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

unsafe.read()的具體實現爲NioMessageUnsafe中的read(),在read()方法中主要實現了兩個功能:

一、建立客戶端Channel,也就是NioSocketChannel;

二、開始服務端NioServerSocketChannel的任務鏈傳遞,首先執行以前已經加入任務鏈的ServerBootstrapAcceptor中的channelRead

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //這裏建立客戶端鏈接,也就是NioSocketChannelChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在這裏開始NioServerSocketChannel的任務鏈傳遞,會首先執行ServerBootstrapAcceptor中的channelRead
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

接下來在ServerBootstrapAcceptor中的channelRead中會獲取到傳入的NioSocketChannel,針對NioSocketChannel主要會執行如下操做:

一、配置childHandler任務鏈;

二、配置childOptions;

三、爲NioSocketChannel分配NioEventLoop

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            //配置childHandler任務鏈
            child.pipeline().addLast(childHandler);


            //配置childOptions
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //爲新鏈接分配NioEventLoop,並啓動執行
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

看以看到EventLoopGroup中register具體實實現:

一、關於next(),咱們以前講過是專門用來分配NioEventLoop;

二、register()主要負責了EventLoop的綁定和啓動;

   @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            //與NioEventLoop綁定
            AbstractChannel.this.eventLoop = eventLoop;

            //首先判斷線程是否一致,當前線程是NioServerSocketChannel的線程,與當前建立NioSocketChannel的eventLoop線程不一致
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    //在這裏NioEventLoop啓動
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

上面代碼中的 eventLoop.execute咱們已經分析過,通過一系列的流程,最後會執行NioEventLoop的run方法開始輪詢感興趣的IO事件。

 以上咱們主要從服務啓動與客戶端鏈接兩個方面分析了NioEventLoop的啓動流程與源碼,其實也就對應NioServerSocketChannel與NioSocketChannel分別綁定的NioEventLoop,其中有錯誤和不足之處還請指正與海涵。

相關文章
相關標籤/搜索