上兩篇博文(netty源碼分析之揭開reactor線程的面紗(一),netty源碼分析之揭開reactor線程的面紗(二))已經描述了netty的reactor線程前兩個步驟所處理的工做,在這裏,咱們用這張圖片來回顧一下:html
簡單總結一下reactor線程三部曲java
今天,咱們要進行的是三部曲中的最後一曲【處理任務隊列】,也就是上面圖中的紫色部分。react
讀完本篇文章,你將瞭解到netty的異步task機制,定時任務的處理邏輯,這些細節能夠更好地幫助你寫出netty應用promise
咱們取三種典型的task使用場景來分析安全
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//...
}
});
複製代碼
咱們跟進execute
方法,看重點微信
@Override
public void execute(Runnable task) {
//...
addTask(task);
//...
}
複製代碼
execute
方法調用 addTask
方法多線程
protected void addTask(Runnable task) {
// ...
if (!offerTask(task)) {
reject(task);
}
}
複製代碼
而後調用offerTask
方法,若是offer失敗,那就調用reject
方法,經過默認的 RejectedExecutionHandler
直接拋出異常併發
final boolean offerTask(Runnable task) {
// ...
return taskQueue.offer(task);
}
複製代碼
跟到offerTask
方法,基本上task就落地了,netty內部使用一個taskQueue
將task保存起來,那麼這個taskQueue
又是何方神聖?框架
咱們查看 taskQueue
定義的地方和被初始化的地方異步
private final Queue<Runnable> taskQueue;
taskQueue = newTaskQueue(this.maxPendingTasks);
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
複製代碼
咱們發現 taskQueue
在NioEventLoop中默認是mpsc隊列,mpsc隊列,即多生產者單消費者隊列,netty使用mpsc,方便的將外部線程的task彙集,在reactor線程內部用單線程來串行執行,咱們能夠借鑑netty的任務執行模式來處理相似多線程數據上報,定時聚合的應用
在本節討論的任務場景中,全部代碼的執行都是在reactor線程中的,因此,全部調用 inEventLoop()
的地方都返回true,既然都是在reactor線程中執行,那麼其實這裏的mpsc隊列其實沒有發揮真正的做用,mpsc大顯身手的地方其實在第二種場景
// non reactor thread
channel.write(...)
複製代碼
上面一種狀況在push系統中比較常見,通常在業務線程裏面,根據用戶的標識,找到對應的channel引用,而後調用write類方法向該用戶推送消息,就會進入到這種場景
關於channel.write()類方法的調用鏈,後面會單獨拉出一篇文章來深刻剖析,這裏,咱們只須要知道,最終write方法串至如下方法
AbstractChannelHandlerContext.java
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
複製代碼
外部線程在調用write
的時候,executor.inEventLoop()
會返回false,直接進入到else分支,將write封裝成一個WriteTask
(這裏僅僅是write而沒有flush,所以flush
參數爲false), 而後調用 safeExecute
方法
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
// ...
executor.execute(runnable);
// ...
}
複製代碼
接下來的調用鏈就進入到第一種場景了,可是和第一種場景有個明顯的區別就是,第一種場景的調用鏈的發起線程是reactor線程,第二種場景的調用鏈的發起線程是用戶線程,用戶線程可能會有不少個,顯然多個線程併發寫taskQueue
可能出現線程同步問題,因而,這種場景下,netty的mpsc queue就有了用武之地
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);
複製代碼
第三種場景就是定時任務邏輯了,用的最多的即是如上方法:在必定時間以後執行任務
咱們跟進schedule
方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//...
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
複製代碼
經過 ScheduledFutureTask
, 將用戶自定義任務再次包裝成一個netty內部的任務
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
// ...
scheduledTaskQueue().add(task);
// ...
return task;
}
複製代碼
到了這裏,咱們有點似曾相識,在非定時任務的處理中,netty經過一個mpsc隊列將任務落地,這裏,是否也有一個相似的隊列來承載這類定時任務呢?帶着這個疑問,咱們繼續向前
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
複製代碼
果不其然,scheduledTaskQueue()
方法,會返回一個優先級隊列,而後調用 add
方法將定時任務加入到隊列中去,可是,這裏爲何要使用優先級隊列,而不須要考慮多線程的併發?
由於咱們如今討論的場景,調用鏈的發起方是reactor線程,不會存在多線程併發這些問題
可是,萬一有的用戶在reactor以外執行定時任務呢?雖然這類場景不多見,可是netty做爲一個無比健壯的高性能io框架,必需要考慮到這種狀況。
對此,netty的處理是,若是是在外部線程調用schedule,netty將添加定時任務的邏輯封裝成一個普通的task,這個task的任務是添加[添加定時任務]的任務,而不是添加定時任務,其實也就是第二種場景,這樣,對 PriorityQueue
的訪問就變成單線程,即只有reactor線程
完整的schedule方法
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
// 進入到場景二,進一步封裝任務
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
複製代碼
在閱讀源碼細節的過程當中,咱們應該多問幾個爲何?這樣會有利於看源碼的時候不至於犯困!好比這裏,爲何定時任務要保存在優先級隊列中,咱們能夠先不看源碼,來思考一下優先級對列的特性
優先級隊列按必定的順序來排列內部元素,內部元素必須是能夠比較的,聯繫到這裏每一個元素都是定時任務,那就說明定時任務是能夠比較的,那麼到底有哪些地方能夠比較?
每一個任務都有一個下一次執行的截止時間,截止時間是能夠比較的,截止時間相同的狀況下,任務添加的順序也是能夠比較的,就像這樣,閱讀源碼的過程當中,必定要多和本身對話,多問幾個爲何
帶着猜測,咱們研究與一下ScheduledFutureTask
,抽取出關鍵部分
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
private static final AtomicLong nextTaskId = new AtomicLong();
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private final long id = nextTaskId.getAndIncrement();
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
@Override
public int compareTo(Delayed o) {
//...
}
// 精簡過的代碼
@Override
public void run() {
}
複製代碼
這裏,咱們一眼就找到了compareTo
方法,cmd+u
跳轉到實現的接口,發現就是Comparable
接口
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
複製代碼
進入到方法體內部,咱們發現,兩個定時任務的比較,確實是先比較任務的截止時間,截止時間相同的狀況下,再比較id,即任務添加的順序,若是id再相同的話,就拋Error
這樣,在執行定時任務的時候,就能保證最近截止時間的任務先執行
下面,咱們再來看下netty是如何來保證各類定時任務的執行的,netty裏面的定時任務分如下三種
1.若干時間後執行一次 2.每隔一段時間執行一次 3.每次執行結束,隔必定時間再執行一次
netty使用一個 periodNanos
來區分這三種狀況,正如netty的註釋那樣
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
複製代碼
瞭解這些背景以後,咱們來看下netty是如何來處理這三種不一樣類型的定時任務的
public void run() {
if (periodNanos == 0) {
V result = task.call();
setSuccessInternal(result);
} else {
task.call();
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
scheduledTaskQueue.add(this);
}
}
}
複製代碼
if (periodNanos == 0)
對應 若干時間後執行一次
的定時任務類型,執行完了該任務就結束了。
不然,進入到else代碼塊,先執行任務,而後再區分是哪一種類型的任務,periodNanos
大於0,表示是以固定頻率執行某個任務,和任務的持續時間無關,而後,設置該任務的下一次截止時間爲本次的截止時間加上間隔時間periodNanos
,不然,就是每次任務執行完畢以後,間隔多長時間以後再次執行,截止時間爲當前時間加上間隔時間,-p
就表示加上一個正的間隔時間,最後,將當前任務對象再次加入到隊列,實現任務的定時執行
netty內部的任務添加機制瞭解地差很少以後,咱們就能夠查看reactor第三部曲是如何來調度這些任務的
首先,咱們將目光轉向最外層的外觀代碼
runAllTasks(long timeoutNanos);
複製代碼
顧名思義,這行代碼表示了儘可能在必定的時間內,將全部的任務都取出來run一遍。timeoutNanos
表示該方法最多執行這麼長時間,netty爲何要這麼作?咱們能夠想想,reactor線程若是在此停留的時間過長,那麼將積攢許多的IO事件沒法處理(見reactor線程的前面兩個步驟),最終致使大量客戶端請求阻塞,所以,默認狀況下,netty將控制內部隊列的執行時間
好,咱們繼續跟進
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
//...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
複製代碼
這段代碼即是reactor執行task的全部邏輯,能夠拆解成下面幾個步驟
按照這個步驟,咱們一步步來分析下
首先調用 fetchFromScheduledTaskQueue()
方法,將到期的定時任務轉移到mpsc queue裏面
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
複製代碼
能夠看到,netty在把任務從scheduledTaskQueue轉移到taskQueue的時候仍是很是當心的,當taskQueue沒法offer的時候,須要把從scheduledTaskQueue裏面取出來的任務從新添加回去
從scheduledTaskQueue從拉取一個定時任務的邏輯以下,傳入的參數nanoTime
爲當前時間(實際上是當前納秒減去ScheduledFutureTask
類被加載的納秒個數)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
複製代碼
能夠看到,每次 pollScheduledTask
的時候,只有在當前任務的截止時間已經到了,纔會取出來
Runnable task = pollTask();
//...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
複製代碼
這一步將取出第一個任務,用reactor線程傳入的超時時間 timeoutNanos
來計算出當前任務循環的deadline,而且使用了runTasks
,lastExecutionTime
來時刻記錄任務的狀態
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
複製代碼
這一步即是netty裏面執行全部任務的核心代碼了。 首先調用safeExecute
來確保任務安全執行,忽略任何異常
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
複製代碼
而後將已運行任務 runTasks
加一,每隔0x3F
任務,即每執行完64個任務以後,判斷當前時間是否超過本次reactor任務循環的截止時間了,若是超過,那就break掉,若是沒有超過,那就繼續執行。能夠看到,netty對性能的優化考慮地至關的周到,假設netty任務隊列裏面若是有海量小任務,若是每次都要執行完任務都要判斷一下是否到截止時間,那麼效率是比較低下的
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
複製代碼
收尾工做很簡單,調用一下 afterRunningAllTasks
方法
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
複製代碼
NioEventLoop
能夠經過父類SingleTheadEventLoop
的executeAfterEventLoopIteration
方法向tailTasks
中添加收尾任務,好比,你想統計一下一次執行一次任務循環花了多長時間就能夠調用此方法
public final void executeAfterEventLoopIteration(Runnable task) {
// ...
if (!tailTasks.offer(task)) {
reject(task);
}
//...
}
複製代碼
this.lastExecutionTime = lastExecutionTime;
簡單記錄一下任務執行的時間,搜了一下該field的引用,發現這個field並無使用過,只是每次不停地賦值,賦值,賦值...,改天再去向netty官方提個issue...
reactor線程第三曲到了這裏基本上就給你講完了,若是你讀到這以爲很輕鬆,那麼恭喜你,你對netty的task機制已經很是比較熟悉了,也恭喜一下我,把這些機制給你將清楚了。咱們最後再來一次總結,以tips的方式
若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…