Netty源碼分析第2章(NioEventLoop)---->第6節: 執行select操做

 

Netty源碼分析第二章: NioEventLoophtml

 

第六節: 執行select操做ide

 

分析完了selector的建立和優化的過程, 這一小節分析select相關操做oop

跟到跟到select操做的入口,NioEventLoop的run方法:源碼分析

protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1)
                    select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認是50
            final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時間
                final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2)
 processSelectedKeys(); } finally { //計算耗時
                    final long ioTime = System.nanoTime() - ioStartTime; //執行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略
 } }

代碼比較長, 其實主要分爲三部分:優化

1. 輪詢io事件ui

2. 處理輪詢到的keythis

3. 執行taskspa

這一小節, 主要剖析第一部分, 輪詢io事件線程

首先switch塊中默認會走到SelectStrategy.SELECT中, 執行select(wakenUp.getAndSet(false))方法rest

參數wakenUp.getAndSet(false)表明當前select操做是未喚醒狀態

進入到select(wakenUp.getAndSet(false))方法中:

private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; //當前系統的納秒數
        long currentTimeNanos = System.nanoTime(); //截止時間=當前時間+隊列第一個任務剩餘執行時間
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //阻塞時間(毫秒)=(截止時間-當前時間+0.5毫秒)
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //進行阻塞式的select操做
            int selectedKeys = selector.select(timeoutMillis); //輪詢次數
            selectCnt ++; //若是輪詢到一個事件(selectedKeys != 0), 或者當前select操做須要喚醒(oldWakenUp), //或者在執行select操做時已經被外部線程喚醒(wakenUp.get()), //或者任務隊列已經有任務(hasTask), 或者定時任務隊列中有任務(hasScheduledTasks())
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //省略 //記錄下當前時間
            long time = System.nanoTime(); //當前時間-開始時間>=超時時間(條件成立, 執行過一次select操做, 條件不成立, 有可能發生空輪詢)
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //表明已經進行了一次阻塞式select操做, 操做次數重置爲1
                selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //省略日誌代碼 //若是空輪詢的次數大於一個閾值(512), 解決空輪詢的bug
 rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //代碼省略
    } catch (CancelledKeyException e) { //省略
 } }

首先經過 long currentTimeNanos = System.nanoTime() 獲取系統的納秒數

繼續往下看:

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

delayNanos(currentTimeNanos)表明距定時任務中第一個任務剩餘多長時間, 這個時間+當前時間表明此次操做不能超過的時間, 由於超過以後定時任務不能嚴格按照預約時間執行, 其中定時任務隊列是已經按照執行時間有小到大排列好的隊列, 因此第一個任務則是最近須要執行的任務, selectDeadLineNanos就表明了當前操做不能超過的時間

而後就進入到了無限for循環

for循環中咱們關注:

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L

 selectDeadLineNanos - currentTimeNanos+500000L 表明截止時間-當前時間+0.5毫秒的調整時間, 除以1000000表示將計算的時間轉化爲毫秒數

最後算出的時間就是selector操做的阻塞時間, 並賦值到局部變量的timeoutMillis

 

後面有個判斷 if(imeoutMillis<0) , 表明當前時間已經超過了最後截止時間+0.5毫秒,  selectCnt == 0 表明沒有進行select操做, 知足這兩個條件, 則執行selectNow()以後, selectCnt賦值爲1以後跳出循環

 

若是沒超過截止時間, 就進行了 if(hasTasks() && wakenUp.compareAndSet(false, true)) 判斷

 

這裏咱們關注hasTasks()方法, 這裏是判斷當前NioEventLoop所綁定的taskQueue是否有任務, 若是有任務, 則執行selectNow()以後, selectCnt賦值爲1以後跳出循環(跳出循環以後去執行任務隊列中的任務)

 

hasTasks()方法能夠本身跟一下, 很是簡單

若是沒有知足上述條件, 就會執行 int selectedKeys = selector.select(timeoutMillis) 進行阻塞式輪詢, 而且自增輪詢次數, 然後會進行以下判斷:

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; }

selectedKeys != 0表明已經有輪詢到的事件, oldWakenUp表明當前select操做是否須要喚醒, wakenUp.get()說明已經被外部線程喚醒, hasTasks()表明任務隊列是否有任務, hasScheduledTasks()表明定時任務隊列是否任務, 知足條件之一, 就跳出循環

 

 long time = System.nanoTime() 記錄了當前的時間, 以後有個判斷:

 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) 這裏的意思是當前時間-阻塞時間>方法開始執行的時間, 這裏說明已經完整的執行完成了一個阻塞的select()操做, selectCnt設置成1

 

若是此條件不成立, 說明沒有完整執行select()操做, 可能觸發了一次空輪詢, 根據前一個selectCnt++這步咱們知道, 每觸發一次空輪詢selectCnt都會自增

以後會進入第二個判斷 SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 

 

其中SELECTOR_AUTO_REBUILD_THRESHOLD默認是512, 這個判斷意思就是空輪詢的次數若是超過512, 則會認爲是發生了epoll bug, 這樣會經過rebuildSelector()方法從新構建selector, 而後將從新構建的selector賦值到局部變量selector, 執行一次selectNow(), selectCnt初始化1, 跳出循環

 

rebuildSelector()方法中, netty是如何解決epoll bug:

public void rebuildSelector() { //是不是由其餘線程發起的
    if (!inEventLoop()) { //若是是其餘線程發起的, 將rebuildSelector()封裝成任務隊列, 由NioEventLoop進行調用
        execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { //從新建立一個select
        newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; for (;;) { try { //拿到舊select中全部的key
            for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { Object a = key.attachment(); //代碼省略 //獲取key註冊的事件
                    int interestOps = key.interestOps(); //將key註冊的事件取消
 key.cancel(); //註冊到從新建立的新的selector中
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a); //若是channel是NioChannel
                    if (a instanceof AbstractNioChannel) { //從新賦值
                        ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //代碼省略
 } } } catch (ConcurrentModificationException e) { continue; } break; } selector = newSelector; //代碼省略
}

首先會判斷是否是當前NioEventLoop線程執行的, 若是不是, 則將構建方法封裝成task由當前NioEventLoop執行

 final Selector oldSelector = selector 表示拿到舊的selector

而後經過 newSelector = openSelector() 建立新的selector

經過for循環遍歷全部註冊在selector中的key

 Object a = key.attachment() 是獲取channel, 第一章講過, 在註冊時, 將自身做爲屬性綁定在key

for循環體中, 經過 int interestOps = key.interestOps() 獲取其註冊的事件

key.cancel()將註冊的事件進行取消

 SelectionKey newKey = key.channel().register(newSelector, interestOps, a) 將channel以及註冊的事件註冊在新的selector

 if (a instanceof AbstractNioChannel) 判斷是否是NioChannel

若是是NioChannel, 則經過 ((AbstractNioChannel) a).selectionKey = newKey 將自身的屬性selectionKey賦值爲新返回的key

 selector = newSelector 將自身NioEventLoop屬性selector賦值爲新建立的newSelector

至此, 就是netty解決epoll bug的步驟, 其實就是建立一個新的selector, 將舊selector中註冊的channel和事件從新註冊到新的selector, 而後將自身selector屬性替換成新建立的selector

 

 

上一節: 優化selector

下一節: 處理IO事件

相關文章
相關標籤/搜索