Netty 源碼解析(七): NioEventLoop 工做流程

1588902612(1).jpg
今天是猿燈塔「365篇原創計劃」第七篇。面試

接下來的時間燈塔君持續更新Netty系列一共九篇微信

Netty 源碼解析(一): 開始ide

Netty 源碼解析(二): Netty 的 Channeloop

Netty 源碼解析(三): Netty 的 Future 和 Promisethis

Netty 源碼解析(四): Netty 的 ChannelPipelinespa

Netty 源碼解析(五): Netty 的線程池分析線程

Netty 源碼解析(六): Channel 的 register 操做code

當前:Netty 源碼解析(七): NioEventLoop 工做流程orm

Netty 源碼解析(八): 回到 Channel 的 register 操做blog

Netty 源碼解析(九): connect 過程和 bind 過程分析

今天呢!燈塔君跟你們講:

NioEventLoop 工做流

NioEventLoop 工做流程

前面,咱們在分析線程池的實例化的時候說過,NioEventLoop 中並無啓動 Java 線程。這裏咱們來仔細分析下在 register 過程當中調用的 eventLoop.execute(runnable) 這個方法,這個代碼在父類 SingleThreadEventExecutor 中:

`@Override
public void execute(Runnable task) {

if (task == null) {
    throw new NullPointerException("task");
}
// 判斷添加任務的線程是否就是當前 EventLoop 中的線程
boolean inEventLoop = inEventLoop();`

// 添加任務到以前介紹的 taskQueue 中,
//     若是 taskQueue 滿了(默認大小 16),根據咱們以前說的,默認的策略是拋出異常
addTask(task);

if (!inEventLoop) {
    // 若是不是 NioEventLoop 內部線程提交的 task,那麼判斷下線程是否已經啓動,沒有的話,就啓動線程
    startThread();
    if (isShutdown() && removeTask(task)) {
        reject();
    }
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
}

}

原來啓動 NioEventLoop 中的線程的方法在這裏。

另外,上節咱們說的 register 操做進到了 taskQueue 中,因此它實際上是被歸類到了非 IO 操做的範疇。

下面是 startThread 的源碼,判斷線程是否已經啓動來決定是否要進行啓動操做:

`private void startThread() {

if (state == ST_NOT_STARTED) {
    if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
        try {
            doStartThread();
        } catch (Throwable cause) {
            STATE_UPDATER.set(this, ST_NOT_STARTED);
            PlatformDependent.throwException(cause);
        }
    }
}

}`

咱們按照前面的思路,根據線程沒有啓動的狀況,來看看 doStartThread() 方法:

`private void doStartThread() {

assert thread == null;
// 這裏的 executor 你們是否是有點熟悉的感受,它就是一開始咱們實例化 NioEventLoop 的時候傳進來的 ThreadPerTaskExecutor 的實例。它是每次來一個任務,建立一個線程的那種 executor。
// 一旦咱們調用它的 execute 方法,它就會建立一個新的線程,因此這裏終於會建立 Thread 實例
executor.execute(new Runnable() {
    @Override
    public void run() {
        // 看這裏,將 「executor」 中建立的這個線程設置爲 NioEventLoop 的線程!!!
        thread = Thread.currentThread();
        
        if (interrupted) {
            thread.interrupt();
        }

        boolean success = false;
        updateLastExecutionTime();
        try {
            // 執行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中實現了
            SingleThreadEventExecutor.this.run();
            success = true;
        } catch (Throwable t) {
            logger.warn("Unexpected exception from an event executor: ", t);
        } finally {
            // ... 咱們直接忽略掉這裏的代碼
        }
    }
});

}`
上面線程啓動之後,會執行 NioEventLoop 中的 run() 方法,這是一個很是重要的方法,這個方法確定是沒那麼容易結束的,必然是像 JDK 線程池的 Worker 那樣,不斷地循環獲取新的任務的。它須要不斷地作 select 操做和輪詢 taskQueue 這個隊列。

咱們先來簡單地看一下它的源碼,這裏先不作深刻地介紹:

`@Override
protected void run() {

// 代碼嵌套在 for 循環中
for (;;) {
    try {
        // selectStrategy 終於要派上用場了
        // 它有兩個值,一個是 CONTINUE 一個是 SELECT
        // 針對這塊代碼,咱們分析一下。
        // 1. 若是 taskQueue 不爲空,也就是 hasTasks() 返回 true,
        //         那麼執行一次 selectNow(),該方法不會阻塞
        // 2. 若是 hasTasks() 返回 false,那麼執行 SelectStrategy.SELECT 分支,
        //    進行 select(...),這塊是帶阻塞的
        // 這個很好理解,就是按照是否有任務在排隊來決定是否能夠進行阻塞
        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
            case SelectStrategy.CONTINUE:
                continue;
            case SelectStrategy.SELECT:
                // 若是 !hasTasks(),那麼進到這個 select 分支,這裏 select 帶阻塞的
                select(wakenUp.getAndSet(false));
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            default:
        }
        
        
        cancelledKeys = 0;
        needsToSelectAgain = false;
        // 默認地,ioRatio 的值是 50
        final int ioRatio = this.ioRatio;
        
        if (ioRatio == 100) {
            // 若是 ioRatio 設置爲 100,那麼先執行 IO 操做,而後在 finally 塊中執行 taskQueue 中的任務
            try {
                // 1. 執行 IO 操做。由於前面 select 之後,可能有些 channel 是須要處理的。
                processSelectedKeys();
            } finally {
                // 2. 執行非 IO 任務,也就是 taskQueue 中的任務
                runAllTasks();
            }
        } else {
            // 若是 ioRatio 不是 100,那麼根據 IO 操做耗時,限制非 IO 操做耗時
            final long ioStartTime = System.nanoTime();
            try {
                // 執行 IO 操做
                processSelectedKeys();
            } finally {
                // 根據 IO 操做消耗的時間,計算執行非 IO 操做(runAllTasks)能夠用多少時間.
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
    // Always handle shutdown even if the loop processing threw an exception.
    try {
        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                return;
            }
        }
    } catch (Throwable t) {
        handleLoopException(t);
    }
}

}
`

上面這段代碼是 NioEventLoop 的核心,這裏介紹兩點:

  1. 首先,會根據 hasTasks() 的結果來決定是執行 selectNow() 仍是 select(oldWakenUp),這個應該好理解。若是有任務正在等待,那麼應該使用無阻塞的 selectNow(),若是沒有任務在等待,那麼就可使用帶阻塞的 select 操做。
  2. ioRatio 控制 IO 操做所佔的時間比重:

咱們這裏先不要去關心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的細節,只要先理解它們分別作什麼事情就能夠了。

回過神來,咱們前面在 register 的時候提交了 register 任務給 NioEventLoop,這是 NioEventLoop 接收到的第一個任務,因此這裏會實例化 Thread 而且啓動,而後進入到 NioEventLoop 中的 run 方法。

固然了,實際狀況多是,Channel 實例被 register 到一個已經啓動線程的 NioEventLoop 實例中。

365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回覆【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板

相關文章
相關標籤/搜索