netty最核心的就是reactor線程,對應項目中使用普遍的NioEventLoop,那麼NioEventLoop裏面到底在幹些什麼事?netty是如何保證事件循環的高效輪詢和任務的及時執行?又是如何來優雅地fix掉jdk的nio bug?帶着這些疑問,本篇文章將庖丁解牛,帶你逐步瞭解netty reactor線程的真相[源碼基於4.1.6.Final]html
NioEventLoop的run方法是reactor線程的主體,在第一次添加任務的時候被啓動java
NioEventLoop 父類 SingleThreadEventExecutor 的execute方法react
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
複製代碼
外部線程在往任務隊列裏面添加任務的時候執行 startThread()
,netty會判斷reactor線程有沒有被啓動,若是沒有被啓動,那就啓動線程再往任務隊列裏面添加任務微信
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
複製代碼
SingleThreadEventExecutor 在執行doStartThread
的時候,會調用內部執行器executor
的execute方法,將調用NioEventLoop的run方法的過程封裝成一個runnable塞到一個線程中去執行網絡
private void doStartThread() {
...
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
...
}
}
}
複製代碼
該線程就是executor
建立,對應netty的reactor線程實體。executor
默認是ThreadPerTaskExecutor
併發
默認狀況下,ThreadPerTaskExecutor
在每次執行execute
方法的時候都會經過DefaultThreadFactory
建立一個FastThreadLocalThread
線程,而這個線程就是netty中的reactor線程實體ide
ThreadPerTaskExecutoroop
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
複製代碼
關於爲啥是 ThreadPerTaskExecutor
和 DefaultThreadFactory
的組合來new一個FastThreadLocalThread
,這裏就再也不詳細描述,經過下面幾段代碼來簡單說明源碼分析
標準的netty程序會調用到
NioEventLoopGroup
的父類MultithreadEventExecutorGroup
的以下代碼學習
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
}
複製代碼
而後經過newChild的方式傳遞給NioEventLoop
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼
關於reactor線程的建立和啓動就先講這麼多,咱們總結一下:netty的reactor線程在添加一個任務的時候被建立,該線程實體爲 FastThreadLocalThread
(這玩意之後會開篇文章重點講講),最後線程執行主體爲NioEventLoop
的run
方法。
那麼下面咱們就重點剖析一下 NioEventLoop
的run方法
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
processSelectedKeys();
runAllTasks(...);
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
複製代碼
咱們抽取出主幹,reactor線程作的事情其實很簡單,用下面一幅圖就能夠說明
reactor線程大概作的事情分爲對三個步驟不斷循環
1.首先輪詢註冊到reactor線程對用的selector上的全部的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
複製代碼
2.處理產生網絡IO事件的channel
processSelectedKeys();
複製代碼
3.處理任務隊列
runAllTasks(...);
複製代碼
下面對每一個步驟詳細說明
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
複製代碼
wakenUp
表示是否應該喚醒正在阻塞的select操做,能夠看到netty在進行一次新的loop以前,都會將wakeUp
被設置成false,標誌新的一輪loop的開始,具體的select操做咱們也拆分開來看
1.定時任務截止事時間快到了,中斷本次輪詢
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
....
}
複製代碼
咱們能夠看到,NioEventLoop中reactor線程的select操做也是一個for循環,在for循環第一步中,若是發現當前的定時任務隊列中有任務的截止事件快到了(<=0.5ms),就跳出循環。此外,跳出以前若是發現目前爲止尚未進行過select操做(if (selectCnt == 0)
),那麼就調用一次selectNow()
,該方法會當即返回,不會阻塞
這裏說明一點,netty裏面定時任務隊列是按照延遲時間從小到大進行排序, delayNanos(currentTimeNanos)
方法即取出第一個定時任務的延遲時間
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
複製代碼
關於netty的任務隊列(包括普通任務,定時任務,tail task)相關的細節後面會另起一片文章,這裏不過多展開
2.輪詢過程當中發現有任務加入,中斷本次輪詢
for (;;) {
// 1.定時任務截至事時間快到了,中斷本次輪詢
...
// 2.輪詢過程當中發現有任務加入,中斷本次輪詢
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
....
}
複製代碼
netty爲了保證任務隊列可以及時執行,在進行阻塞select操做的時候會判斷任務隊列是否爲空,若是不爲空,就執行一次非阻塞select操做,跳出循環
3.阻塞式select操做
for (;;) {
// 1.定時任務截至事時間快到了,中斷本次輪詢
...
// 2.輪詢過程當中發現有任務加入,中斷本次輪詢
...
// 3.阻塞式select操做
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
....
}
複製代碼
執行到這一步,說明netty任務隊列裏面隊列爲空,而且全部定時任務延遲時間還未到(大於0.5ms),因而,在這裏進行一次阻塞select操做,截止到第一個定時任務的截止時間
這裏,咱們能夠問本身一個問題,若是第一個定時任務的延遲很是長,好比一個小時,那麼有沒有可能線程一直阻塞在select操做,固然有可能!But,只要在這段時間內,有新任務加入,該阻塞就會被釋放
外部線程調用execute方法添加任務
@Override
public void execute(Runnable task) {
...
wakeup(inEventLoop); // inEventLoop爲false
...
}
複製代碼
調用wakeup方法喚醒selector阻塞
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
複製代碼
能夠看到,在外部線程添加任務的時候,會調用wakeup方法來喚醒 selector.select(timeoutMillis)
阻塞select操做結束以後,netty又作了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有
selectedKeys != 0
)hasTasks
)hasScheduledTasks()
)wakenUp.get()
)4.解決jdk的nio bug
關於該bug的描述見 bugs.java.com/bugdatabase…
該bug會致使Selector一直空輪詢,最終致使cpu 100%,nio server不可用,嚴格意義上來講,netty沒有解決jdk的bug,而是經過一種方式來巧妙地避開了這個bug,具體作法以下
long currentTimeNanos = System.nanoTime();
for (;;) {
// 1.定時任務截止事時間快到了,中斷本次輪詢
...
// 2.輪詢過程當中發現有任務加入,中斷本次輪詢
...
// 3.阻塞式select操做
selector.select(timeoutMillis);
// 4.解決jdk的nio bug
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
...
}
複製代碼
netty 會在每次進行 selector.select(timeoutMillis)
以前記錄一下開始時間currentTimeNanos
,在select以後記錄一下結束時間,判斷select操做是否至少持續了timeoutMillis
秒(這裏將time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
改爲time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
或許更好理解一些), 若是持續的時間大於等於timeoutMillis,說明就是一次有效的輪詢,重置selectCnt
標誌,不然,代表該阻塞方法並無阻塞這麼長時間,可能觸發了jdk的空輪詢bug,當空輪詢的次數超過一個閥值的時候,默認是512,就開始重建selector
空輪詢閥值相關的設置代碼以下
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
複製代碼
下面咱們簡單描述一下netty 經過rebuildSelector
來fix空輪詢bug的過程,rebuildSelector
的操做其實很簡單:new一個新的selector,將以前註冊到老的selector上的的channel從新轉移到新的selector上。咱們抽取完主要代碼以後的骨架以下
public void rebuildSelector() {
final Selector oldSelector = selector;
final Selector newSelector;
newSelector = openSelector();
int nChannels = 0;
try {
for (;;) {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
}
break;
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
selector = newSelector;
oldSelector.close();
}
複製代碼
首先,經過openSelector()
方法建立一個新的selector,而後執行一個死循環,只要執行過程當中出現過一次併發修改selectionKeys異常,就從新開始轉移
具體的轉移步驟爲
轉移完成以後,就能夠將原有的selector廢棄,後面全部的輪詢都是在新的selector進行
最後,咱們總結reactor線程select步驟作的事情:不斷地輪詢是否有IO事件發生,而且在輪詢的過程當中不斷檢查是否有定時任務和普通任務,保證了netty的任務隊列中的任務獲得有效執行,輪詢過程順帶用一個計數器避開了了jdk空輪詢的bug,過程清晰明瞭
因爲篇幅緣由,下面兩個過程將分別放到一篇文章中去講述,盡請期待
未完待續
未完待續
最後,經過文章開頭一副圖,咱們再次熟悉一下netty的reactor線程作的事兒
若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…