Netty源碼解析 -- EventLoop實現與Accept事件處理

文本分享Netty中NioEventLoop的具體實現與Accept事件處理過程。
源碼分析基於Netty 4.1java

EventLoop的啓動

前面文章說過了,NioEventLoop能夠視爲一個線程,Netty會將register,bind等操做做爲一個任務提交給EventLoop,而EventLoop的啓動正是在提交任務的execute方法中git

public void execute(Runnable task) {
    ...

    boolean inEventLoop = inEventLoop();
    addTask(task);
    // #1
    if (!inEventLoop) {
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    // #2
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

#1 若當前線程非EventLoop執行線程,startThread會啓動一個新的線程,執行run方法。這裏就是啓動EventLoop了。
#2 若當前EventLoop執行線程阻塞等待IO事件(Selector#select方法),喚醒它執行任務github

NioEventLoop中維護了一個關注事件Key集合SelectedSelectionKeySet,它使用數組代替原(Jvm)Selector的中的HashSet,提升性能。數組默認大小爲1024,不夠用時容量*2。
另外NioEventLoop還維護了(jvm)Selector和優化後的SelectedSelectionKeySetSelector,SelectedSelectionKeySetSelector每次調用select前都清除SelectedSelectionKeySet。
NioEventLoop#構造方法 -> NioEventLoop#openSelector數組

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // #1
        unwrappedSelector = provider.openSelector();    
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    ...

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // #2
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ...

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } ...
        }
    });

    ...
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // #3
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

#1 經過(jvm)SelectorProvider打開一個(jvm)Selector
#2 構造了SelectedSelectionKeySet對象selectedKeySet,並經過反射將該對象設置到(jvm)Selector的selectedKeys,publicSelectedKeys屬性中,這樣Selector監聽到的事件就會存儲到selectedKeySet。
#3 構造了SelectedSelectionKeySetSelector對象
NioEventLoop#unwrappedSelector維護的是JVM的Selector對象,NioEventLoop#selector則是Netty的SelectedSelectionKeySetSelector對象微信

EventLoop實現

下面看一下Netty的核心方法,NioEventLoop#runapp

protected void run() {
    for (;;) {
        try {
            // #1
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // #2
                    select(wakenUp.getAndSet(false));
                    // #3
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            // #4
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            // #5
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

run方法就是事件循環機制的具體實現,經過一個for循環不斷處理任務和IO事件。
#1 若是當前存在任務,調用selector.selectNow(),這時跳出switch循環,處理事件和任務,不然返回SelectStrategy.SELECT
#2 select方法會調用(jvm)Selector.select,阻塞線程,等待事件
#3 預喚醒線程
因爲頻繁調用(jvm)Selector.wakeup會形成性能消耗,NioEventLoop維護了一個喚醒標識wakenUp。
NioEventLoop#wakeup方法中,只有wakenUp.compareAndSet(false, true)成功才調用selector.wakeup()jvm

考慮這樣的場景,select方法阻塞期間,有新任務加入,這時wakenUp會被設置爲true。
任務處理完後,從新執行到Run方法#1,#2步驟間,若是這時有任務加入,因爲wakenUp已是true,沒法調用selector.wakeup(),致使該任務大量延遲。
爲了不這種狀況,Netty作了一次預喚醒。ide

#4 ioRatio表示執行IO事件所佔百分比,默認50,
這裏根據ioRatio,使用processSelectedKeys方法處理IO事件,runAllTasks方法處理Task,注意方法參數,經過ioTime,ioRatio計算任務應該執行的時間。
#5 檢查狀態,若是Netty中止,則關閉全部Channeloop

NioEventLoop#select源碼分析

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // #1
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // #2
            ...

            // #3
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            // #4
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                ...
                selectCnt = 1;
                break;
            }

            // #5
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // #6
                ...
                rebuildSelector();
                selector = this.selector;
                
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        ...
    } catch (CancelledKeyException e) {
        ...
    }
}

select方法中使用for循環不斷阻塞線程,直達發送IO事件或加入了新任務。
#1 delayNanos方法計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),沒有任務則返回1s。
阻塞時間timeoutMillis,爲第一個延遲任務的到期時間和當前時間差,加上預留時間5毫秒
timeoutMillis小於等於0(延遲任務到期了),跳出for循環,直接返回
#2 這裏的處理邏輯在Netty5.0.0.Alpha2版本移除,這裏也不列出
#3 調用select方法,阻塞線程等待IO時間或被喚醒
#4 當前已經發生了IO事件或者加入了任務,延遲任務(被喚醒),跳出for循環,直接返回
#5 舊版本的Java NIO在Linux Epoll實現上存在bug,(jvm)Selector.select方法可能在沒有任何就緒事件的狀況下返回,致使CPU空轉,利用率飆升到100%。
因而,Netty計算select方法重複調用次數selectCnt,並在selectCnt大於SELECTOR_AUTO_REBUILD_THRESHOLD配置(默認512)時,重建selector,從而規避該問題。
這裏因爲select方法阻塞時間大於等於timeoutMillis,則認爲當前非空轉,selectCnt次數重置。
#6 selectCnt大於SELECTOR_AUTO_REBUILD_THRESHOLD,重建Selector,將原Selector中註冊的Channle和關注事件Key轉移到新的Selector,並經過selectNow方法刷新selectedKeys。
幸虧在JDK6_6u4,JDK7_b12已修復該Bug。

NioEventLoop#processSelectedKeys -> (沒有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ...

    try {
        int readyOps = k.readyOps();
        // #1
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // #2
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // #3
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

#1 在調用read方法或write方法前,須要先調用finishConnect方法,不然(jvm)Channel可能拋出NotYetConnectedException異常
#2 先處理OP_WRITE事件,可以儘早寫入數據釋放內存
#3 處理OP_READ或OP_ACCEPT事件

Accept事件處理

下面咱們來看一下AcceptGroup中如何處理ServerChannel上監聽到的accept事件。

NioEventLoop#processSelectedKey方法#3步驟 -> NioMessageUnsafe#read(Channel,UnSafe各實現類關係請查看前面文章)

public void read() {
    ...
    try {
        try {
            do {
                // #1
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // #2
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // #3
        pipeline.fireChannelReadComplete();

        ...
    } ...
}

#1 調用NioServerSocketChannel#doReadMessages,處理Accept事件。
注意,readBuf是一個List,用於接收處理結果。
#2 觸發DefaultChannelPipeline#fireChannelRead
#3 觸發DefaultChannelPipeline#fireChannelReadComplete

NioServerSocketChannel#doReadMessages

protected int doReadMessages(List<Object> buf) throws Exception {
    // #1
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // #2
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        ...
    }

    return 0;
}

#1 調用(jvm)ServerSocketChannel#accept方法,生成的(jvm)SocketChannel
#2 使用(jvm)SocketChannel構造NioSocketChannel

前面文章說過,ServerChannel註冊到AcceptGroup時,會給ServerChannel的ChannelPipeline添加一個ServerBootstrapAcceptor,用於處理accept事件。
DefaultChannelPipeline#fireChannelRead -> ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // #1
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // #2
        childGroup.register(child).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

#1 注意msg參數,就是NioServerSocketChannel#doReadMessages方法中生成的NioSocketChannel。
上一篇文章已經說過,ServerBootstrap#init方法中會將ServerBootstrap的childHandler,currentChildHandler,currentChildOptions,currentChildAttrs變量都交給ServerBootstrapAcceptor。
這裏配置NioSocketChannel的Options,Attribute,並將childHandler添加給pipeline。
(咱們通常使用ChannelInitializer做爲childHandler,添加實際處理邏輯的ChannelHandler)
#2 將NioSocketChannel註冊到ReadGroup中,註冊過程相似於NioServerSocketChannel註冊到AcceptGroup類型,調用AbstractUnsafe#register方法實現。
但有一點不一樣,調用AbstractNioChannel#doBeginRead方法註冊關注事件Key時,關注事件Key(即AbstractNioChannel#readInterestOp),是來自子類AbstractNioByteChannel#構造方法,固定爲SelectionKey.OP_READ。(關於Channel註冊的內容,請參考 -- Netty Server啓動原理解析
到這裏,(jvm)SocketChannel已經註冊到ReadGroupo維護中(jvm)Selector,關注的事件Key爲read。

若是您以爲本文不錯,歡迎關注個人微信公衆號,您的關注是我堅持的動力!

相關文章
相關標籤/搜索