Netty源碼分析(二)EventLoopGroup分析

準備將Netty的源碼過一下,一來對本身是個總結消化的過程,二來但願對那些打算看Netty源碼的人(已經熟悉Netty的Reactor模型)能有一些幫助。目前所看Netty版本是4.1.3.Final。java

1 目錄

2 EventLoopGroup和EventLoop介紹

2.1 EventLoopGroup介紹

在前面一篇文章中提到了,EventLoopGroup主要負責2個事情,這裏再重複下:linux

它主要包含2個方面的功能,註冊Channel和執行一些Runnable任務。git

EventLoopGroup介紹

功能1:先來看看註冊Channel,即將Channel註冊到Selector上,由Selector來調度Channel的相關事件,如讀、寫、Accept等事件。github

而EventLoopGroup的設計是,它包含多個EventLoop(每個EventLoop一般內部包含一個線程),在執行上述註冊過程當中是須要選擇其中的一個EventLoop來執行上述註冊行爲,這裏就出現了一個選擇策略的問題,該選擇策略接口是EventExecutorChooser,你也能夠自定義一個實現。微信

從上面能夠看到,EventLoopGroup作的工做大部分是一些整體性的工做如初始化上述多個EventLoop、EventExecutorChooser等,具體的註冊Channel仍是交給它內部的EventLoop來實現。多線程

功能2:執行一些Runnable任務異步

EventLoopGroup繼承了EventExecutorGroup,EventExecutorGroup也是EventExecutor的集合,EventExecutorGroup也是掌管着EventExecutor的初始化工做,EventExecutorGroup對於Runnable任務的執行也是選擇內部中的一個EventExecutor來作具體的執行工做。ide

netty中不少任務都是異步執行的,一旦當前線程要對某個EventLoop執行相關操做,如註冊Channel到某個EventLoop,若是當前線程和所要操做的EventLoop內部的線程不是同一個,則當前線程就僅僅向EventLoop提交一個註冊任務,對外返回一個ChannelFuture。oop

總結:EventLoopGroup含有上述2種功能,它更多的是一個集合,可是具體的功能實現仍是選擇內部的一個item元素來執行相關任務。 這裏的內部item元素一般即實現了EventLoop,又實現了EventExecutor,如NioEventLoop等ui

繼續來看看EventLoopGroup的總體類圖

輸入圖片說明

從圖中能夠看到有2路分支:

1 MultithreadEventLoopGroup:用於封裝多線程的初始化邏輯,指定線程數等,即初始化對應數量的EventLoop,每一個EventLoop分配到一個線程

MultithreadEventLoopGroup的初始化

上圖中的newChild方法,NioEventLoopGroup就採用NioEventLoop做爲實現,EpollEventLoopGroup就採用EpollEventLoop做爲實現

如NioEventLoopGroup的實現:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

2 EventLoop接口實現了EventLoopGroup接口,主要由於EventLoopGroup中的功能接口仍是要靠內部的EventLoop來完成具體的操做

2.2 EventLoop介紹

EventLoop主要工做就是註冊Channel,並負責監控管理Channel的讀寫等事件,這就涉及到不一樣的監控方式,linux下有3種方式來進行事件監聽

select、poll、epoll

目前java的Selector接口的實現以下:

  • PollSelectorImpl:實現了poll方式
  • EPollSelectorImpl:實現了epoll方式

而Netty呢則使用以下:

  • NioEventLoop:採用的是jdk Selector接口(使用PollSelectorImpl的poll方式)來實現對Channel的事件檢測

  • EpollEventLoop:沒有采用jdk Selector的接口實現EPollSelectorImpl,而是Netty本身實現的epoll方式來實現對Channel的事件檢測,因此在EpollEventLoop中就不存在jdk的Selector。

2.2.1 NioEventLoop介紹

對於NioEventLoopGroup的功能,NioEventLoop都要作實際的實現,NioEventLoop既要實現註冊功能,又要實現運行Runnable任務

對於註冊Channel:NioEventLoop將Channel註冊到NioEventLoop內部的PollSelectorImpl上,來監聽該Channel的讀寫事件

對於運行Runnable任務:NioEventLoop的父類的父類SingleThreadEventExecutor實現了運行Runnable任務,在SingleThreadEventExecutor中,有一個任務隊列還有一個分配的線程

private final Queue<Runnable> taskQueue;
private volatile Thread thread;

NioEventLoop在該線程中不只要執行Selector帶來的IO事件,還要不斷的從上述taskQueue中取出任務來執行這些非IO事件。下面咱們來詳細看下這個過程

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            ...
        }
    }
}

來詳細說下這個過程:

  • 1 計算當前是否須要執行select過程

    若是當前沒有Runnable任務,則執行select(這個select過程稍後詳細來講)。

    若是當前有Runnable任務,則要去執行處理流程,此時順便執行下selector.selectNow(),萬一有事件發生那就賺了,沒有白走此次處理流程

  • 2 根據IO任務的時間佔比設置來執行IO任務和非IO任務,即上面提到的Runnable任務

    若是ioRatio=100則每次都是執行所有的IO任務,執行所有的非IO任務 默認ioRatio=50,即一半時間用於處理IO任務,另外一半時間用於處理非IO任務。怎麼去控制非IO任務所佔用時間呢?

    這裏是每執行64個非IO任務(這裏多是每一個非IO任務比較短暫,減小一些判斷帶來的消耗)就判斷下佔用時間是否超過了上述時間限制

接下來詳細看下上述select過程

Selector selector = this.selector;
try {
    int selectCnt = 0;
    long currentTimeNanos = System.nanoTime();
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    for (;;) {
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) {
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }

        // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
        // Selector#wakeup. So we need to check task queue again before executing select operation.
        // If we don't, the task might be pended until select operation was timed out.
        // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;

        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            // - Selected something,
            // - waken up by user, or
            // - the task queue has a pending task.
            // - a scheduled task is ready for processing
            break;
        }
        if (Thread.interrupted()) {
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            selectCnt = 1;
            break;
        }

        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // timeoutMillis elapsed without anything selected.
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);

            rebuildSelector();
            selector = this.selector;

            // Select again to populate selectedKeys.
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time;
    }
} catch (CancelledKeyException e) {
	...
}
  • 1 首先計算這次select過程的截止時間

    protected long delayNanos(long currentTimeNanos) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                return SCHEDULE_PURGE_INTERVAL;
            }
    
            return scheduledTask.delayNanos(currentTimeNanos);
        }

    這裏其實就是從一個定時 任務隊列中取出定時任務,若是有則計算出離當前定時任務的下一次執行時間之差,若是沒有則按照固定的1s做爲select過程的時間

  • 2 將當前時間差轉化成ms

    若是當前時間差不足0.5ms的話,即timeoutMillis<=0,而且是第一次執行,則認爲時間過短執行執行一次selectNow

  • 3 若是有任務,則當即執行一次selectNow,跳出for循環

  • 4 而後就是普通的selector.select(timeoutMillis)

    在這段時間內若是有事件則跳出for循環,若是沒有事件則已經花費對應的時間差了,再次執行for循環,計算的timeoutMillis就會小於0,也會跳出for循環

    在上述邏輯中,基本selectCnt都是1,不會出現不少次,而這裏針對selectCnt有不少次的處理是基於一個狀況:

    selector.select(timeoutMillis)

    Selector的正常邏輯是一旦有事件就返回,沒有事件則最多等待timeoutMillis時間。 然而底層操做系統實現可能有bug,會出現:即便沒有產生事件就直接返回了,並無按照要求等待timeoutMillis時間。

    如今的解決辦法就是: 記錄上述出現的次數,一旦超過512這個閾值(可設置),就從新創建新的Selector,並將以前的Channel也所有遷移到新的Selector上

至此,NioEventLoop的主邏輯流程就介紹完了,以後就該重點介紹其中對於IO事件的處理了。而後就會引出來ChannelPipeline的處理流程

2.2.2 EpollEventLoop介紹

EpollEventLoop和NioEventLoop的主流程邏輯基本上是差很少的,不一樣之處就在於EpollEventLoop用epoll方式替換NioEventLoop中的PollSelectorImpl的poll方式。

這裏再也不詳細說明了,以後會詳細的說明Netty的epoll方式和jdk中的epoll方式的區別。

3 後續

下一篇就要詳細描述下NioEventLoop對於IO事件的處理,即ChannelPipeline的處理流程。

歡迎關注微信公衆號:乒乓狂魔

微信公衆號

相關文章
相關標籤/搜索