netty源碼分析之揭開reactor線程的面紗(一)

netty最核心的就是reactor線程,對應項目中使用普遍的NioEventLoop,那麼NioEventLoop裏面到底在幹些什麼事?netty是如何保證事件循環的高效輪詢和任務的及時執行?又是如何來優雅地fix掉jdk的nio bug?帶着這些疑問,本篇文章將庖丁解牛,帶你逐步瞭解netty reactor線程的真相[源碼基於4.1.6.Final]html

reactor 線程的啓動

NioEventLoop的run方法是reactor線程的主體,在第一次添加任務的時候被啓動java

NioEventLoop 父類 SingleThreadEventExecutor 的execute方法react

@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}
複製代碼

外部線程在往任務隊列裏面添加任務的時候執行 startThread() ,netty會判斷reactor線程有沒有被啓動,若是沒有被啓動,那就啓動線程再往任務隊列裏面添加任務微信

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
複製代碼

SingleThreadEventExecutor 在執行doStartThread的時候,會調用內部執行器executor的execute方法,將調用NioEventLoop的run方法的過程封裝成一個runnable塞到一個線程中去執行網絡

private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            ...
                SingleThreadEventExecutor.this.run();
            ...
        }
    }
}
複製代碼

該線程就是executor建立,對應netty的reactor線程實體。executor 默認是ThreadPerTaskExecutor併發

默認狀況下,ThreadPerTaskExecutor 在每次執行execute 方法的時候都會經過DefaultThreadFactory建立一個FastThreadLocalThread線程,而這個線程就是netty中的reactor線程實體ide

ThreadPerTaskExecutoroop

public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}
複製代碼

關於爲啥是 ThreadPerTaskExecutorDefaultThreadFactory的組合來new一個FastThreadLocalThread,這裏就再也不詳細描述,經過下面幾段代碼來簡單說明源碼分析

標準的netty程序會調用到NioEventLoopGroup的父類MultithreadEventExecutorGroup的以下代碼學習

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
}
複製代碼

而後經過newChild的方式傳遞給NioEventLoop

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼

關於reactor線程的建立和啓動就先講這麼多,咱們總結一下:netty的reactor線程在添加一個任務的時候被建立,該線程實體爲 FastThreadLocalThread(這玩意之後會開篇文章重點講講),最後線程執行主體爲NioEventLooprun方法。

reactor 線程的執行

那麼下面咱們就重點剖析一下 NioEventLoop 的run方法

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
複製代碼

咱們抽取出主幹,reactor線程作的事情其實很簡單,用下面一幅圖就能夠說明

reactor action

reactor線程大概作的事情分爲對三個步驟不斷循環

1.首先輪詢註冊到reactor線程對用的selector上的全部的channel的IO事件

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}
複製代碼

2.處理產生網絡IO事件的channel

processSelectedKeys();
複製代碼

3.處理任務隊列

runAllTasks(...);
複製代碼

下面對每一個步驟詳細說明

select操做

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}
複製代碼

wakenUp 表示是否應該喚醒正在阻塞的select操做,能夠看到netty在進行一次新的loop以前,都會將wakeUp 被設置成false,標誌新的一輪loop的開始,具體的select操做咱們也拆分開來看

1.定時任務截止事時間快到了,中斷本次輪詢

int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    ....
}
複製代碼

咱們能夠看到,NioEventLoop中reactor線程的select操做也是一個for循環,在for循環第一步中,若是發現當前的定時任務隊列中有任務的截止事件快到了(<=0.5ms),就跳出循環。此外,跳出以前若是發現目前爲止尚未進行過select操做(if (selectCnt == 0)),那麼就調用一次selectNow(),該方法會當即返回,不會阻塞

這裏說明一點,netty裏面定時任務隊列是按照延遲時間從小到大進行排序, delayNanos(currentTimeNanos)方法即取出第一個定時任務的延遲時間

protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }
    return scheduledTask.delayNanos(currentTimeNanos);
 }
複製代碼

關於netty的任務隊列(包括普通任務,定時任務,tail task)相關的細節後面會另起一片文章,這裏不過多展開

2.輪詢過程當中發現有任務加入,中斷本次輪詢

for (;;) {
    // 1.定時任務截至事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程當中發現有任務加入,中斷本次輪詢
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    ....
}
複製代碼

netty爲了保證任務隊列可以及時執行,在進行阻塞select操做的時候會判斷任務隊列是否爲空,若是不爲空,就執行一次非阻塞select操做,跳出循環

3.阻塞式select操做

for (;;) {
    // 1.定時任務截至事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程當中發現有任務加入,中斷本次輪詢
    ...
    // 3.阻塞式select操做
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    ....
}
複製代碼

執行到這一步,說明netty任務隊列裏面隊列爲空,而且全部定時任務延遲時間還未到(大於0.5ms),因而,在這裏進行一次阻塞select操做,截止到第一個定時任務的截止時間

這裏,咱們能夠問本身一個問題,若是第一個定時任務的延遲很是長,好比一個小時,那麼有沒有可能線程一直阻塞在select操做,固然有可能!But,只要在這段時間內,有新任務加入,該阻塞就會被釋放

外部線程調用execute方法添加任務

@Override
public void execute(Runnable task) { 
    ...
    wakeup(inEventLoop); // inEventLoop爲false
    ...
}
複製代碼

調用wakeup方法喚醒selector阻塞

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
複製代碼

能夠看到,在外部線程添加任務的時候,會調用wakeup方法來喚醒 selector.select(timeoutMillis)

阻塞select操做結束以後,netty又作了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有

  • 輪詢到IO事件 (selectedKeys != 0
  • oldWakenUp 參數爲true
  • 任務隊列裏面有任務(hasTasks
  • 第一個定時任務即將要被執行 (hasScheduledTasks()
  • 用戶主動喚醒(wakenUp.get()

4.解決jdk的nio bug

關於該bug的描述見 bugs.java.com/bugdatabase…

該bug會致使Selector一直空輪詢,最終致使cpu 100%,nio server不可用,嚴格意義上來講,netty沒有解決jdk的bug,而是經過一種方式來巧妙地避開了這個bug,具體作法以下

long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1.定時任務截止事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程當中發現有任務加入,中斷本次輪詢
    ...
    // 3.阻塞式select操做
    selector.select(timeoutMillis);
    // 4.解決jdk的nio bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time; 
    ...
 }
複製代碼

netty 會在每次進行 selector.select(timeoutMillis) 以前記錄一下開始時間currentTimeNanos,在select以後記錄一下結束時間,判斷select操做是否至少持續了timeoutMillis秒(這裏將time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改爲time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或許更好理解一些), 若是持續的時間大於等於timeoutMillis,說明就是一次有效的輪詢,重置selectCnt標誌,不然,代表該阻塞方法並無阻塞這麼長時間,可能觸發了jdk的空輪詢bug,當空輪詢的次數超過一個閥值的時候,默認是512,就開始重建selector

空輪詢閥值相關的設置代碼以下

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
    selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
複製代碼

下面咱們簡單描述一下netty 經過rebuildSelector來fix空輪詢bug的過程,rebuildSelector的操做其實很簡單:new一個新的selector,將以前註冊到老的selector上的的channel從新轉移到新的selector上。咱們抽取完主要代碼以後的骨架以下

public void rebuildSelector() {
    final Selector oldSelector = selector;
    final Selector newSelector;
    newSelector = openSelector();

    int nChannels = 0;
     try {
        for (;;) {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                     if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                         continue;
                     }
                     int interestOps = key.interestOps();
                     key.cancel();
                     SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                     if (a instanceof AbstractNioChannel) {
                         ((AbstractNioChannel) a).selectionKey = newKey;
                      }
                     nChannels ++;
                }
                break;
        }
    } catch (ConcurrentModificationException e) {
        // Probably due to concurrent modification of the key set.
        continue;
    }
    selector = newSelector;
    oldSelector.close();
}
複製代碼

首先,經過openSelector()方法建立一個新的selector,而後執行一個死循環,只要執行過程當中出現過一次併發修改selectionKeys異常,就從新開始轉移

具體的轉移步驟爲

  1. 拿到有效的key
  2. 取消該key在舊的selector上的事件註冊
  3. 將該key對應的channel註冊到新的selector上
  4. 從新綁定channel和新的key的關係

轉移完成以後,就能夠將原有的selector廢棄,後面全部的輪詢都是在新的selector進行

最後,咱們總結reactor線程select步驟作的事情:不斷地輪詢是否有IO事件發生,而且在輪詢的過程當中不斷檢查是否有定時任務和普通任務,保證了netty的任務隊列中的任務獲得有效執行,輪詢過程順帶用一個計數器避開了了jdk空輪詢的bug,過程清晰明瞭

因爲篇幅緣由,下面兩個過程將分別放到一篇文章中去講述,盡請期待

process selected keys

未完待續

run tasks

未完待續

最後,經過文章開頭一副圖,咱們再次熟悉一下netty的reactor線程作的事兒

reactor action

  1. 輪詢IO事件
  2. 處理輪詢到的事件
  3. 執行任務隊列中的任務

若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…

相關文章
相關標籤/搜索