netty源碼解解析(4.0)-6 線程模型-IO線程EventLoopGroup和NIO實現(一)

接口定義
io.netty.channel.EventLoopGroup extends EventExecutorGroup
方法
說明
ChannelFuture register(Channel channel)
把一個channel註冊到一個EventLoop
ChannelFuture register(Channel channel, ChannelPromise promise);
同上
io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
方法
說明
EventLoopGroup parent()
獲得建立這個eventLoop的EventLoopGroup
EventLoopGroup定義的主要方法是register, 這個方法的語義是把channel和eventLoop綁定在一塊兒。一個channel對應一個eventLoop, 一個eventLoop會持有多個channel。
I/O線程EventLoopGroup的抽象實現
io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
兩個類主功能都是實現了EventLoopGroup定義的register方法
MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
channel.unsafe().register(this, promise);
return promise;
}
register的實現主要是爲了調用Channel.Unsafe實例的register方法。
NIO實現
io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup
io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop
NioEventLoopGroup是在MultithreadEventLoopGroup基礎上實現了對JDK NIO Selector的封裝, 它實現如下幾個功能:
  • 建立selector
  • 在selector上註冊channel感興趣的NIO事件
  • 實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程。
  • 把NIO事件轉換成對channel unsafe的調用或NioTask的調用
  • 控制線程執行I/O操做和排隊任務的用時比例
  • 處理epoll selector cpu 100%的bug
下面來具體分析這幾個功能的實現。
建立Selector
NioEventLoop#openSelector()實現了建立selector的功能,默認狀況下,使用SelectorProvider#openSelector()方法建立一個新個selector:
final Selector unwrappedSelector = provider.openSelector();
若是設置環境變量io.netty.noKeySetOptimization=true, 會建立一個selectedKeySet = new SelectedSelectionKeySet(), 而後使用java的反射機制把selector的selectedKeys和publicSelectedKeys替換成selectedKeySet,具體步驟是:
1.獲得selector的真正類型: sun.nio.ch.SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
" sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
2.替換selector是屬性unwrappedSelector
Field selectedKeysField = selectorImplClass.getDeclaredField(" selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField(" publicSelectedKeys");
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
之因此會設計一個這樣的優化選項,是由於通常狀況下調用完selector的select或selectNow方法後須要調用Selector#selectedKeys()獲得觸發NIO事件的的SelectableChannel,這樣優化以後,能夠直接從selectedKeySet中獲得已經觸發了NIO事件的SelectableChannel。
在selector上註冊channel感興趣的NIO事件
NioEventLoop提供了unwrappedSelector方法,這個方法返回了它建立好的Selector實例。這樣任何的外部類均可以把任意的SelectableChannel註冊到這selector上。在AbstractNioChannel中, doRegister方法的實現就是使用了這個方法:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
另外,它還提供了一個register方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
這個方法會把task當成SelectableChannel的附件註冊到selector上:
ch.register(selector, interestOps, task);
實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程
在NioEventLoop的run方法中實現NIO事件和EventExecutor的任務處理邏輯,這個run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定義。在上一章中,咱們看到了DefaultEventExecutor中是如何實現這個run方法的,這裏咱們將要看到這run方法的另外一個實現。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不只要及時地執行taskQueue中的任務,還要能及時地處理NIO事件,所以它會同時檢查selector中的NIO事件和和taskQueue隊列,任何一箇中有事件須要處理或有任務須要執行,它不會阻塞線程。同時它也保證了在沒有NIO事件和任務的狀況下線程不會無謂的空轉浪費CUP資源。
run主要實現以下,爲了更清晰的說明它的主要功能,我對原來的代碼進行了一些刪減。
for(;;){
try{
/ /phase1: 同時檢查NIO事件和任務
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // 在taskQueue中沒有任務的時候執行select
}
// phase2: 進入處理NIO事件,執行executor任務
try{
// 處理NIO事件
processSelectedKeys();
}finally{
// 處理taskQueu中的任務
runAllTasks();
}
}catch(Throwable t){
handleLoopException(t);
}
}
run方法有兩個階段構成:
phase1: 檢查NIO事件或executor任務,若是有任何的NIO事件或executor任務進入phase2。
這樣階段的主要工做在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。
selectStrategy.calculateStrategy實現
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
這行代碼的含義是: 若是hasTasks() == true, 調用如下selector#selectNow, 而後進入phase2。 不然調用select。這裏使用了strategy模式,默認的strategy實現是io.netty.channe.DefaultSelectStrategy implements SelectStrategy
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
DefaultSelectStrategy實現了SelectStrategy接口,這接口定義了兩個常量:
int SELECT = -1;
int CONTINUE = -2;
運行時selectSuppler參數傳入的是selectNowSupplier, 它的實現以下:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
這裏的get方法調用了selectNow, selectNow調用的是Selector#selectNew方法,這個方法的返回值是>=0。
hashTasks的傳入的參數是hasTask()的返回值: return !taskQueue.isEmpty();
代碼讀到這裏就會發現,使用默認的的SelectStrategy實現,calculateStrategy在hasTasks()==true時返回值>=0, hasTasks() == false時返回值是SelectStrategy.SELECT,不會返回SelectStrategy.CONTINUE。
select實現
select的執行邏輯是:
1. 計算超select方法的結束時間selectDeadLineNanos
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2. 進入循環,檢查超時--超時跳出循環。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
3. 若是在select執行過程當中有executor任務提交或能夠當前的wakeUp由false變成true, 跳出循環
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
4. 調用selector#select等待NIO事件。
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
5. 若是知足這些條件的任何一個,跳出循環: 有NIO事件、wakeUp的新舊值都是true、taskQueue中有任務、有定時任務到期。
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
6. 若是線程被中斷,跳出循環。
if (Thread.interrupted()) {
break;
}
7. 若是selector.select超時,沒有檢查到任何NIO事件, 會在下次循環開始時跳出循環。 若是每次超時,跳到第2步繼續下一次循環。
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
}
currentTimeNanos = time;
select 最遲會在當前時間>= selectDeadLineNanos時返回,這個時間是最近一個到期的定時任務執行的時間,換言之若是沒有任何的NIO事件或executor任務,select會在定時任務到期時返回。若是沒有定時任務,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select會在檢查到任何NIO事件或executor任務時返回,爲了保證這點,在selector.select(timeoutMillis)先後都會調用hasTasks檢查executor任務,爲了能在調用executet提交任務時喚醒selector.select,NioEventLoop覆蓋了SingleThreadEventExecutor的wake方法:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
這個方法會及時的喚醒selector.select, 保證新提交的任務能夠獲得及時的執行。
phase2: 進入處理NIO事件,執行executor任務
這個階段是先調用processSelectedKeys()處理NIO事件,而後掉用 runAllTasks()處理全部已經到期的定時任務和已經在排隊的任務。這個階段還實現了NIO事件和executor任務的用時比例管理,這個特性稍後會詳細分析。
相關文章
相關標籤/搜索