NioEventLoop啓動觸發條件:java
1.服務端綁定本地端口bootstrap
2.新鏈接接入經過chooser綁定一個NioEventLoop數組
服務端綁定本地端口promise
綁定本地端口,使用下面方法;安全
ChannelFuture future = bootstrap.bind(host, port).sync();
最終會調用doBind0()方法:異步
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { public void run() { if(regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
這個時候就會調用channel對應NioEventLoop的execute方法,會判斷是否在當前的eventloop對應的thread中,若是在,直接向任務隊列中添加綁定端口的任務,若是不在,首先要start當前eventLoop對應的thread,再將任務放到任務隊列中。這裏的excute(task)方法,並非讓線程直接執行它,而是將它放到線程的任務隊列中,等待線程去執行它。ide
public void execute(Runnable task) { if(task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); if(inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if(this.isShutdown() && this.removeTask(task)) { reject(); } } if(!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } }
這裏會調用startThread去啓動一個線程,首先會根據狀態判斷線程是否建立成功,不然使用CAS去建立線程,並調用一個doStartThread去建立一個FastThreadLocalThread,而且這個函數會將一個NioEventLoop與一個thread進行綁定。函數
private void startThread() { if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { this.doStartThread(); } }
NioEventLoop線程執行邏輯oop
NioEventLoop對應線程的run方法,run()方法裏面是一個死循環,主要的邏輯是首先採用select檢查是否有IO事件,若是有IO事件,就採用processSelectedKey()對IO事件進行處理,最後調用runAllTasks()處理任務隊列中的任務。fetch
protected void run() { while(true) { boolean oldWakenUp = this.wakenUp.getAndSet(false); try { if(this.hasTasks()) { this.selectNow(); } else { this.select(oldWakenUp); if(this.wakenUp.get()) { this.selector.wakeup(); } } this.cancelledKeys = 0; this.needsToSelectAgain = false; int t = this.ioRatio; if(t == 100) { this.processSelectedKeys(); this.runAllTasks(); } else { long e = System.nanoTime(); this.processSelectedKeys(); long ioTime = System.nanoTime() - e; this.runAllTasks(ioTime * (long)(100 - t) / (long)t); } if(this.isShuttingDown()) { this.closeAll(); if(this.confirmShutdown()) { return; } } } catch (Throwable var8) { logger.warn("Unexpected exception in the selector loop.", var8); try { Thread.sleep(1000L); } catch (InterruptedException var7) { ; } } } }
這段代碼中的ioRadio是控制執行IO事件和執行任務隊列中的任務的一個事件比,默認是50,表明執行IO事件處理和執行任務隊列的任務事件比是1:1。
1)使用select檢測IO事件
經過Selector的select()方法能夠選擇已經準備就緒的通道 (這些通道包含你感興趣的的事件)。好比你對讀就緒的通道感興趣,那麼select()方法就會返回讀事件已經就緒的那些通道。Java中的Selector幾個重載的select()方法:
select()方法返回的int值表示有多少通道已經就緒,是自上次調用select()方法後有多少通道變成就緒狀態。以前在select()調用時進入就緒的通道不會在本次調用中被記入,而在前一次select()調用進入就緒但如今已經不在處於就緒的通道也不會被記入。例如:首次調用select()方法,若是有一個通道變成就緒狀態,返回了1,若再次調用select()方法,若是另外一個通道就緒了,它會再次返回1。若是對第一個就緒的channel沒有作任何操做,如今就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道就緒了。
一旦調用select()方法,而且返回值不爲0時,則能夠經過調用Selector的selectedKeys()方法來訪問已選擇鍵集合 。以下:
Set selectedKeys=selector.selectedKeys();
Netty中首先判斷任務隊列是否爲空,若是爲空的話,就採用select(ltimeout)有超時設置的阻塞方法,若是不爲空的話,就調用非阻塞的selectNow()方法,由於即便沒有IO事件處理,也能夠對任務隊列中的任務進行處理。Netty中NioEventLoop的select和selectNow方法其實底層仍是依靠selector的select方法。
void selectNow() throws IOException { try { this.selector.selectNow(); } finally { if(this.wakenUp.get()) { this.selector.wakeup(); } } } private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int e = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos); while(true) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if(timeoutMillis <= 0L) { if(e == 0) { selector.selectNow(); e = 1; } break; } int selectedKeys = selector.select(timeoutMillis); ++e; if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) { break; } if(Thread.interrupted()) { if(logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } e = 1; break; } long time = System.nanoTime(); if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { e = 1; } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e)); this.rebuildSelector(); selector = this.selector; selector.selectNow(); e = 1; break; } currentTimeNanos = time; } if(e > 3 && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1)); } } catch (CancelledKeyException var13) { if(logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13); } } }
能夠看到調用selectNow方法是直接調用java nio的select.selectNow方法,而Netty的select方法中有一個參數oldWakeUp記錄當前操做是不是喚醒狀態(不太清楚這個喚醒狀態的做用),每次進行select操做以前,會將其標誌位false,表示要進行select操做,並且是未喚醒狀態。
Netty中的select方法首先是根據當前時間時間去計算截止時間,這裏使用到了超時隊列(超時隊列的做用也不太清楚),而後根據截止時間去計算超時時間,若是超時時間小於0,就執行selectNow操做,並退出這次select操做,不然執行帶有超時時間的select方法,若是返回的selectKey不等於0,也就是有channel在select上註冊了,或者該select操做被喚醒了(?),或者任務隊列中有了任務,定時任務隊列中有了任務,都會break出來。
接下來的代碼邏輯是避免JDK空輪詢的,當JDK發生了空輪訓,select會直接返回,這時並無IO事件到達,也沒有超過超時時間,這樣會致使線程進入死循環,CPU利用率飆升至100%,JDK到如今也並無解決這個問題。
而Netty是經過記錄空輪詢的次數,若是這個次數達到了一個上限,上限默認是512,那麼就新建一個selector,將註冊在老selector上的channel註冊到新的selector上,而且關閉老的selector,將新的selector替代老的selector。Netty經過rebuildSelector方法重建selector。
public void rebuildSelector() { if(!this.inEventLoop()) { this.execute(new Runnable() { public void run() { NioEventLoop.this.rebuildSelector(); } }); } else { Selector oldSelector = this.selector; if(oldSelector != null) { Selector newSelector; try { newSelector = this.openSelector(); } catch (Exception var9) { logger.warn("Failed to create a new Selector.", var9); return; } int nChannels = 0; label69: while(true) { try { Iterator t = oldSelector.keys().iterator(); while(true) { if(!t.hasNext()) { break label69; } SelectionKey key = (SelectionKey)t.next(); Object a = key.attachment(); try { if(key.isValid() && key.channel().keyFor(newSelector) == null) { int e = key.interestOps(); key.cancel(); SelectionKey var14 = key.channel().register(newSelector, e, a); if(a instanceof AbstractNioChannel) { ((AbstractNioChannel)a).selectionKey = var14; } ++nChannels; } } catch (Exception var11) { logger.warn("Failed to re-register a Channel to the new Selector.", var11); if(a instanceof AbstractNioChannel) { AbstractNioChannel var13 = (AbstractNioChannel)a; var13.unsafe().close(var13.unsafe().voidPromise()); } else { NioTask task = (NioTask)a; invokeChannelUnregistered(task, key, var11); } } } } catch (ConcurrentModificationException var12) { ; } } this.selector = newSelector; try { oldSelector.close(); } catch (Throwable var10) { if(logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", var10); } } logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } } }
2)processSelectedKey()
netty中selectedKey的優化
經過調用Selector的selectedKeys()方法來訪問已選擇鍵集合,此時返回的是HashSet。可是netty是經過反射的方式,將HashSet替換成數組pssSelectedKeysOptimized去處理IO事件。
private Selector openSelector() { AbstractSelector selector; try { selector = this.provider.openSelector(); } catch (IOException var7) { throw new ChannelException("failed to open a new selector", var7); } if(DISABLE_KEYSET_OPTIMIZATION) { return selector; } else { try { SelectedSelectionKeySet t = new SelectedSelectionKeySet(); Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); if(!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, t); publicSelectedKeysField.set(selector, t); this.selectedKeys = t; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable var6) { this.selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6); } return selector; } }
首先會調用JDK的openSelector方法返回建立的selector,而後會判斷是否要對keySet進行優化,經過判斷DISABLE_KEYSET_OPTIMIZATION,是否要對keyset進行優化,默認是要對keyset進行優化的。這裏的SelectedSelectionKeySet是優化事後的keyset,底層是經過兩個數組加上兩個數組的大小進行實現的,這樣可使得add操做達到o(1)的時間複雜度(可是是HashSet的add操做時間複雜度不也是o(1))嘛,
processSelectedKey調用processSelectedKeysOptimized
該方法的流程就是遍歷數組中全部的selectedKey,一旦遍歷完,就將該引用指向爲空。獲取每個selectorKey對應的channel,而後經過調用processSelectedKey去處理該channel上感興趣的事件。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { int i = 0;
//遍歷SelectedKsys while(true) { SelectionKey k = selectedKeys[i]; if(k == null) { return; } selectedKeys[i] = null;
//獲取selectKey對應的channel Object a = k.attachment(); if(a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a)); } else { NioTask task = (NioTask)a; processSelectedKey(k, (NioTask)task); } if(this.needsToSelectAgain) { while(selectedKeys[i] != null) { selectedKeys[i] = null; ++i; } this.selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } ++i; } }
這裏處理selector上面的IO事件,底層其實都是經過channel的unsafe類進行操做的,這裏read和accept事件對應的都是channel的read方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if(!k.isValid()) { unsafe.close(unsafe.voidPromise()); } else { try { int ignored = k.readyOps();
//若是是read或者accept事件就對channel進行讀操做 if((ignored & 17) != 0 || ignored == 0) { unsafe.read(); if(!ch.isOpen()) { return; } }
//write事件 if((ignored & 4) != 0) { ch.unsafe().forceFlush(); }
//connect事件 if((ignored & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException var5) { unsafe.close(unsafe.voidPromise()); } } }
3)使用runAllTasks()執行任務隊列中的事件
定時任務隊列是一個PriorityQueue(優先級隊列),定時的任務的排序是按照任務的截止時間排序的,也是非線程安全的隊列。
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if(task == null) { throw new NullPointerException("task"); } else { if(this.inEventLoop()) { this.delayedTaskQueue.add(task); } else { this.execute(new Runnable() { public void run() { SingleThreadEventExecutor.this.delayedTaskQueue.add(task); } }); } return task; } }
runAllTask首先從定時任務隊列中拉取定時任務,將須要執行的定時任務加入到普通任務隊列中,並計算截止時間,而後循環的從普通任務隊列中拉取任務,並執行任務,這裏判斷是否到達超時時間,是每相隔64個任務,就判斷是否到達最大任務執行時間。爲啥要每隔64個任務判斷是否超時呢?由於nanoTime也是比較費時的。
protected boolean runAllTasks(long timeoutNanos) { this.fetchFromDelayedQueue(); Runnable task = this.pollTask(); if(task == null) { return false; } else { long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0L; long lastExecutionTime; while(true) { try { task.run(); } catch (Throwable var11) { logger.warn("A task raised an exception.", var11); } ++runTasks; if((runTasks & 63L) == 0L) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if(lastExecutionTime >= deadline) { break; } } task = this.pollTask(); if(task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; } }
從定時隊列中拉取任務,這裏拉取的任務是拉取截止時間不超過nanoTime的任務,將任務從定時任務隊列中刪除,將任務加入到普通任務隊列中。這個while循環執行完成以後,全部須要執行的定時任務所有都加入到普通任務隊列中。
private void fetchFromDelayedQueue() { long nanoTime = 0L; while (true) { ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek(); if (delayedTask == null) { break; } if (nanoTime == 0L) { nanoTime = ScheduledFutureTask.nanoTime(); } if (delayedTask.deadlineNanos() > nanoTime) { break; } this.delayedTaskQueue.remove(); this.taskQueue.add(delayedTask); } }
定時任務隊列是一個優先級隊列,隊列按照優先級進行排序,這裏的優先級是每一個任務的截止時間,隊列是按照截止時間的遲早對任務進行排序的。
public int compareTo(Delayed o) { if(this == o) { return 0; } else { ScheduledFutureTask that = (ScheduledFutureTask)o; long d = this.deadlineNanos() - that.deadlineNanos(); if(d < 0L) { return -1; } else if(d > 0L) { return 1; } else if(this.id < that.id) { return -1; } else if(this.id == that.id) { throw new Error(); } else { return 1; } } }
總結:
1.默認狀況下,NioEventLoopGroup會建立2*cpu個數的線程池,在調用NioEventLoop.execute(task)的時候,若是當前的NioEventLoop沒有建立本身的線程,就會建立線程。
2.Netty如何解決JDK空輪訓bug?經過計算空輪訓操做的個數,這裏的空輪訓的判斷是既沒有IO事件的到達,也沒有達到超時時間,若是空輪訓的個數超過閾值(512),就會新建一個selector,將舊selector的selectorKey註冊到新的selector上,將舊的selector關閉,用新的selector替代舊的selector。
3.Netty在全部外部線程調用NioEventLoop的操做時,若是經過InEventLoop判斷是否在NioEventLoop所屬的線程,若是不在經過startThread啓動NioEventLoop的線程,而且將任務添加到NioEventLoop的任務隊列中,全部NioEventLoop對應一個線程,其中的操做只會被一個線程所執行,實現了異步串行無鎖化。
4.當NioEventLoop第一次調用execute()方法,會新建一個FastThreadLocalThread與NioEventLoop綁定。