netty源碼解解析(4.0)-5 線程模型-EventExecutorGroup框架

上一章講了EventExecutorGroup的總體結構和原理,這一章咱們來探究一下它的具體實現。
EventExecutorGroup和EventExecutor接口
io.netty.util.concurrent.EventExecutorGroup
java.util.concurrent.ScheduledExecutorService
EventExecutorGroup繼承了ScheduledExecutorService接口,它本身定義了以下的新方法
方法
說明
EventExecutor next()
取出一個EventExecutor, 這個方法要實現派發任務的策略。
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
優雅地關閉這個executor, 一旦這個方法被調用,isShuttingDown()方法老是老是返回true。和 shutdown方法不一樣,這個方法須要確保在關閉的平靜期(由quietPeriod參數決定)沒有新的任務被提交,若是平靜期有新任務提交,它會接受這個任務,同時停止關閉動做,等任務執行完畢後重新開始關閉流程。
Future<?> shutdownGracefully()
shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)快捷調用方式。
boolean isShuttingDown()
檢查是否已經調用了shutdownGracefully或shutdown方法。
io.netty.util.concurrent.EventExecutor implement EventExecutorGroup
EventExecutor定義的接口以下
方法
說明
boolean inEventLoop()
若是當前線程是這個Executor返回true
boolean inEventLoop(Thread thread)
若是thread是這個Executor的線程返回true
EventExecutorGroup parent()
返回持有這個Executor的EventExecutorGroup
<V> Promise<V> newPromise()
建立一個新的Promise實例
<V> ProgressivePromise<V> newProgressivePromise()
建立一個新的ProgressivePromise實例
<V> Future<V> newSucceededFuture(V result);
建立一個標記爲success的Future實例,Future#isSuccess()返回true
<V> Future<V> newFailedFuture(Throwable cause)
建立一個標記爲failed的Future實例,Future#isSuccess()返回false
抽象實現AbstractEventExecutorGroup和AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutorGroup implement EventExecutorGroup
AbstractEventExecutorGroup實現了EventExecutorGroup接口,它實現方法的形式爲:
XXX(){
next().XXX()
}
如:execute方法的實現爲
public void execute(Runnable command) {
next().execute(command);
}
這裏實現了EventExecutorGroup派發任務的方式,使用next方法取出一EventExecutor, 而後把任務提交給這個executor。其餘提交認任務的方法實submit, schedule, scheduleAtFixedRate, scheduleWithFixedDelay, invokeAll, invokeAny都和這個相似。
io.netty.util.concurrent.AbstractEventExecutor extends AbstractExecutorService implements EventExecutor
形如newXXX的方法,直接new一個JDK提供的類型的實例返回, 如:
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
sumbit方法是調用AbstractExecutorService的實現。
不支持schedule, scheduleAtFixedRate, scheduleWithFixedDelay方法,這幾個方法都會拋出UnsupportedOperationException異常。
多線程實現MultithreadEventExecutorGroup和SingleThreadEventExecutor
io.netty.util.concurrent.MultithreadEventExecutorGroup extends AbstractEventExecutorGroup
MultithreadEventExecutorGroup 主要實現了一下兩個方面的功能:
  1. EventExecutor管理: 建立, 結束SingleThreadEventExecutor,EventExecutor的數據是固定的,由傳入的參數決定。
  2. 任務派發策略: 實現了EventExecutor選擇器,next方法使選擇器選中一個Executor。
這個類的核心功能都在它的構造方法中實現, 構造方法有三個參數:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args).
nThreads: 線程數,即SingleThreadEventExecutor的數量,
threadFactory: 線程工程,傳遞給SingleThreadEventExecutor實例,SingleThreadEventExecutor使用它建立一個工做線程。
args: 傳遞給SingleThreadEventExecutor工做線程的參數。
構造方法主要乾了兩件事:
1. 建立SingleThreadEventExecutor
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(threadFactory, args);
}
它把建立的SingleThreadEventExecutor實例放在children屬性中維護。 newChild是個抽象方法,須要子類實現。
2. 建立選擇器
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
MultithreadEventExecutorGroup內部實現兩種類型的選擇器,PowerOfTwoEventExecutorChooser--chooserA, GenericEventExecutorChooser--chooserB, 當線程數是2^n時使用chooserA, 不然使用chooserB。選擇器的實現使用了一點小技巧,從本質上講,這兩種選擇器都是使用取模輪詢的方式選擇下一個executor, 不一樣的是當線程數(children的長度)爲2^n時能夠把取模運算優化成位運算,性能比位運算要好一些。下面是兩個選擇器的算法:
chooserA: children[childIndex.getAndIncrement() & children.length - 1], 當children.length == 2^n時,它等價於 children[Math.abs(childIndex.getAndIncrement() % children.length)
chooserB: children[Math.abs(childIndex.getAndIncrement() % children.length)
這裏咱們能夠得出結論, nThreads儘可能設置成2^n(2, 4, 8, 16, 32 ....), 這樣性能會好一些。
io.netty.util.concurrent.SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor
派生關係
SingleThreadEventExecutor
AbstractScheduledEventExecutor
AbstractEventExecutor
SingleThreadEventExecutor實現了一個單線程的Executor, 它使用外部傳進來的ThreadFactory實例建立一個惟一的線程,executor方法把任務放進taskQueue中,線程消費taskQueue中排隊的任務。這個executor不只要執行由executor提交的任務,還要執行由schedule方法提交定時任務和由invokeAll, invokeAny提交的批量任務。
除了任務呢排隊,這類還實現了一個重要的功能--gracefulShutdown, 優雅地關閉。
下面來詳細分析這些功能的實現。
狀態:
ST_NOT_STARTED = 1: 初始狀態,SingleThreadEventExecutor 實例被建立時處於這個狀態,這個時候只是建立了一個線程,這個線程尚未運行。
ST_STARTED = 2: 運行狀態,ST_NOT_STARTED時,提交的第一個任務會把它變成這個狀態,線程已經開發運行。
ST_SHUTTING_DOWN = 3: 正在執行關閉操做。線程主循環run方法返回或拋出異常,或調用shutdownGracefully 都會變成這個狀態。
ST_SHUTDOWN = 4: 已經關閉。調用shutdown會變成這個狀態。
ST_TERMINATED = 5: 已經結束。這個是最終狀態,ST_SHUTTING_DOWN和ST_SHUTDOWN 狀態的過程執行完畢後會變成這個狀態。
狀態斷定方法
是否處於SHUTTING_DOWN狀態
public boolean isShuttingDown() {return state >= ST_SHUTTING_DOWN;}
是否處於SHUTDOWN狀態
public boolean isShutDown() {return state >= ST_SHUT_DOWN;}
實時任務排隊:
public方法execute, 是提供給用戶提交實時任務的方法,它的調用棧以下:
execute
addTask
offerTask
taskQueue.offer
execute最終會調用taskQueue的offer方法把任務放到隊列中排隊,在此以前,若是檢測處處於SHUTDOWN狀態,就拒絕這個任務,或offer失敗也會拒絕任務。
定時任務排隊:
用戶調用schedule把定時任務到scheduledTaskQueue隊列中,這個隊列是PriorityQueue類型的實例,他是一個優先級隊列。在線程的主循環run中,會調用takeTask,taskTask會優先調用peekScheduledTask,看一看scheduledTaskQueue有沒有定時任務,若是有就嘗試把全部已經到時間的定時任務放到taskQueue中排隊。
批量任務排隊:
批量任務排隊比較簡單,只是簡單地對invokeAll或invokeAny的tasks參數中的全部任務調用一次execute。
取出任務:
takeTask的主要功能是從taskQueue中取出任務,同時它還確保到期的定時任務可以及時地進入taskQueue中排隊。這是一個比較重要的方法,咱們來詳細分析它的實現:
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 先看看優先級隊列中是否存在定時任務
if (scheduledTask == null) {
// 若是沒有定時任務,直接從taskQueue中取出一個任務返回
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
// 運行到這裏表示有定時任務
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
// 沒有到期的定時任務,
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
return null;
}
}
if (task == null) {
// 有到期的定時任務,把全部優先級隊列中到期的定時任務放入taskQueue中排隊
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) { // 在有定時任務但taskQueue爲空的時候,for循環會一直空轉,直到有定時任務到期纔會跳出
return task;
}
}
}
優雅地關閉:
優雅地關閉是這個類的重要的功能,所謂優雅是指在正在關閉以前要確保已經在taskQueue中排隊的任務都能被執行,在關閉過程當中,若是用戶提交了一個任務,是否提交成功要有明確的反饋,若是一個任務被成功提交,就要確保他最終必定會被執行。
線程的主循環run方法返回的時候,就會觸發優雅關閉的過程。run方法返回肯由多種緣由引發:用戶主動調用了shutdown或shutdownGracefully,run方法拋出異常。執行優雅關閉的過程在confirmShutdown方法中實現,執行這個過程的前提是:
確保當前處於SHUTTINGDOWN狀態即狀態值>=ST_SHUTTING_DOWN
if (!isShuttingDown()) {
return false;
}
確保這個方法在eventLoop線程中執行
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
而後纔是優雅關閉的過程:
清除掉定時任務
cancelScheduledTasks();
若是是第一次嘗試關閉,設置gracefulShutdownStartTime我當前時間
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
把已在隊列中排隊的任務都執行掉。
if (runAllTasks() || runShutdownHooks()) {
檢查當前狀態,若是是關閉狀態:>= ST_SHUTDOWN,已經關閉完成。
if (isShutdown()) {
return true;
}
若是gracefulShutdownQuietPeriod==0表示, 關閉過程沒有安靜期,如今能夠當即結束。
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
執行到這裏,表示關閉過程還沒結束,若是當前狀態是SHUTTINGDOWN向taskQueue中添加一個WAKEUP_TASK, 喚醒在taskQueue阻塞的線程。
wakeup(true);
return false;
}
執行到這裏表示,taskQueue已是空的了,同時執行完了全部的的shutdown hook回調。若是如今已是SHUTDOWN狀態,或者這個關閉過程使用的時間已經超時,表示關閉過程已經完成了。
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
若是這個方法本次執行的時間沒有超過安靜時間(gracefulShutdownQuietPeriod, 它的值是在調用shutdownGracefully時設置), 100ms以後重新執行關閉過程。
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
return true;
業務線程的默認實現
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup
final class DefaultEventExecutor extends SingleThreadEventExecutor
DefaultEventExecutorGroup沒有對MultithreadEventExecutorGroup作任何擴展。
DefaultEventExecutor只是實現了run方法
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
這個方法的實現代表,run方法只有在如下3中狀況下跳出:
  1. 用戶主動調用shutdown。
  2. 用戶主動調用shutdownGracefully。
  3. 拋出異常。
相關文章
相關標籤/搜索