NioEventLoop的啓動時機是在服務端的NioServerSocketChannel中的ServerSocketChannel初始化完成,且註冊在NioEventLoop後執行的, 下一步就是去綁定端口,可是在綁定端口前,須要完成NioEventLoop的啓動工做, 由於程序運行到這個階段爲止,依然只有MainThread一條線程,下面就開始閱讀源碼看NioEventLoop如何開啓新的線程自立家門的java
總想說 NioEventLoop的總體結構,像極了這個圖git
的線程開啓之路程序的入口是AbstractBootStrap, 這個抽象的啓動輔助類, 找到它準備綁定端口的doBind0()
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // todo 此方法在觸發 channelRegistered() 以前調用, 給用戶一個機會,在 channelRegistered() 中設置pipeline // todo 這是 eventLoop啓動的邏輯 , 下面的Runable就是一個 task任務, 什麼任務的呢? 綁定端口 // todo 進入exeute() System.out.println("00000"); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // todo channel綁定端口而且添加了一個listenner channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
方法, 若是咱們直接使用鼠標點擊進去,會進入java.util.concurrent
接口, 緣由是由於,它是NioEventLoop繼承體系的超頂級接口,見上圖, 咱們進入它的實現類,SingleThreadEventExcutor
, 也就是NioEventLoop
的間接父類, 源碼以下:promise
// todo eventLoop事件循環裏面的task,會在本類SingleThreadEventExecutor裏面: execute() 執行 @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } // todo 一樣判斷當前線程是否是 eventLoop裏面的那條惟一的線程, 若是是的話, 就把當前任務放到任務隊列裏面等着當前的線程執行 // todo ,不是的話就開啓新的線程去執行這個新的任務 // todo , eventLoop一輩子只會綁定一個線程,服務器啓動時只有一條主線程,一直都是在作初始化的工做,並無任何一次start() // todo 因此走的是else, 在else中首先開啓新的線程,然後把任務添加進去 boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { // todo 開啓線程 , 進入查看 startThread(); // todo 把任務丟進隊列 addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
boolean inEventLoop = inEventLoop(); // 方法實現 @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
可是發現,主線程並非NioEventLoop惟一綁定的那個線程, 因而他就準備下面兩件事:服務器
private void doStartThread() { assert thread == null; // todo 斷言線程爲空, 而後才建立新的線程 executor.execute(new Runnable() { // todo 每次Execute 都是在使用 默認的線程工廠,建立一個線程並執行 Runable裏面的任務 @Override public void run() { // todo 獲取剛纔建立出來的線程,保存在NioEventLoop中的 thread 變量裏面, 這裏其實就是在進行那個惟一的綁定 thread = Thread.currentThread(); updateLastExecutionTime(); try { // todo 實際啓動線程, 到這裏 NioEventLoop 就啓動完成了; } }
主要作了兩件事第一波高潮來了 1. 調用了NioEventLoop的線程執行器的execute
,這個方法的源碼在下面,能夠看到,excute,其實就是在建立線程, 線程建立完成後,當即把新建立出來的線程看成是NioEventLoop
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } // todo 必須實現 Executor 裏面惟一的抽象方法, execute , 執行性 任務 @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
方法就是真正在幹活的事件循環,可是呢, 在本類中,Run()
呢? 就是NioEventLoop, 它根據本身需求,重寫了這個方法
小結: 到如今,NioEventLoop
, 他是個無限for循環, 主要完成了下面三件事
/** * todo select() 檢查是否有IO事件 * todo ProcessorSelectedKeys() 處理IO事件 * todo RunAllTask() 處理異步任務隊列 */ @Override protected void run() { for (; ; ) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // todo 輪詢IO事件, 等待事件的發生, 本方法下面的代碼是處理接受到的感性趣的事件, 進入查看本方法 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // todo 默認50 // todo 若是ioRatio==100 就調用第一個 processSelectedKeys(); 不然就調用第二個 if (ioRatio == 100) { try { // todo 處理 處理髮生的感性趣的事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 用於處理 本 eventLoop外的線程 扔到taskQueue中的任務 runAllTasks(); } } else {// todo 由於ioRatio默認是50 , 因此來else // todo 記錄下開始的時間 final long ioStartTime = System.nanoTime(); try { // todo 處理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // todo 根據處理IO事件耗時 ,控制 下面的runAllTasks執行任務不能超過 ioTime 時間 final long ioTime = System.nanoTime() - ioStartTime; // todo 這裏面有聚合任務的邏輯 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } } }
稱做: 基於deadline的任務穿插處理邏輯
下面直接貼出它的源碼:下面的代碼中我寫了一些註解了, 主要是分以下幾步走
,默認是1秒這時可會會出現空輪詢的Bug// todo 循環接受IO事件 // todo 每次進行 select() 操做時, oldWakenUp被標記爲false private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { ///todo ----------------------------------------- 以下部分代碼, 是 select()的deadLine及任務穿插處理邏輯----------------------------------------------------- // todo selectCnt這個變量記錄了 循環 select的次數 int selectCnt = 0; // todo 記錄當前時間 long currentTimeNanos = System.nanoTime(); // todo 計算出估算的截止時間, 意思是, select()操做不能超過selectDeadLineNanos這個時間, 不讓它一直耗着,外面也可能有任務等着當前線程處理 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // -------for 循環開始 ------- for (; ; ) { // todo 計算超時時間 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {// todo 若是超時了 , 而且selectCnt==0 , 就進行非阻塞的 select() , break, 跳出for循環 if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // todo 判斷任務隊列中時候還有別的任務, 若是有任務的話, 進入代碼塊, 非阻塞的select() 而且 break; 跳出循環 //todo 經過cas 把線程安全的把 wakenU設置成true表示退出select()方法, 已進入時,咱們設置oldWakenUp是false if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } ///todo ----------------------------------------- 如上部分代碼, 是 select()的deadLine及任務穿插處理邏輯----------------------------------------------------- ///todo ----------------------------------------- 以下, 是 阻塞式的select() ----------------------------------------------------- // todo 上面設置的超時時間沒到,並且任務爲空,進行阻塞式的 select() , timeoutMillis 默認1 // todo netty任務,如今能夠放心大膽的 阻塞1秒去輪詢 channel鏈接上是否發生的 selector感性的事件 int selectedKeys =; // todo 表示當前已經輪詢了SelectCnt次了 selectCnt++; // todo 阻塞完成輪詢後,立刻進一步判斷 只要知足下面的任意一條. 也將退出無限for循環, select() // todo selectedKeys != 0 表示輪詢到了事件 // todo oldWakenUp 當前的操做是否須要喚醒 // todo wakenUp.get() 可能被外部線程喚醒 // todo hasTasks() 任務隊列中又有新任務了 // todo hasScheduledTasks() 當時定時任務隊列裏面也有任務 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } ///todo ----------------------------------------- 如上, 是 阻塞式的select() ----------------------------------------------------- if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug(" returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } // todo 每次執行到這裏就說明,已經進行了一次阻塞式操做 ,而且尚未監聽到任何感興趣的事件,也沒有新的任務添加到隊列, 記錄當前的時間 long time = System.nanoTime(); // todo 若是 當前的時間 - 超時時間 >= 開始時間 把 selectCnt設置爲1 , 代表已經進行了一次阻塞式操做 // todo 每次for循環都會判斷, 當前時間 currentTimeNanos 不能超過預訂的超時時間 timeoutMillis // todo 可是,如今的狀況是, 雖然已經進行了一次 時長爲timeoutMillis時間的阻塞式select了, // todo 然而, 我執行到當前代碼的 時間 - 開始的時間 >= 超時的時間 // todo 可是 若是 當前時間- 超時時間< 開始時間, 也就是說,並無阻塞select, 而是當即返回了, 就代表這是一次空輪詢 // todo 而每次輪詢 selectCnt ++; 因而有了下面的判斷, if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && // todo selectCnt若是大於 512 表示cpu確實在空輪詢, 因而rebuild Selector selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( " returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); // todo 它的邏輯建立一個新的selectKey , 把老的Selector上面的key註冊進這個新的selector上面 , 進入查看 rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. // todo 解決了Select空輪詢的bug selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } //// -----------for 循環結束 -------------- if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug(" returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
方法,通過兩次判斷後進入了指定時長的阻塞式輪詢,而咱們常說的空輪詢bug,指的就是原本該阻塞住輪詢,可是卻直接返回了, 在這個死循環中,它的暢通執行極可能使得CPU的使用率飆升, 因而把這種狀況說是jdk的selector的空輪詢的bug
一個分支語句 if(){}else{}
, 首先他記錄下,如今執行判斷時的時間, 而後用下面的公式判斷
當前的時間t1 - 預訂的deadLine截止時間t2 >= 開始進入for循環的時間t3
咱們想, 若是說,上面的阻塞式select(t2)
沒出現任何問題,那麼 我如今來檢驗是否出現了空輪詢是時間t1 = t2+執行其餘代碼的時間, 若是是這樣, 上面的等式確定是成立的, 等式成立說沒bug, 順道把selectCnt = 1;
並無阻塞,而是之間返回了, 那麼如今的時間 t1 = 0+執行其餘代碼的時間, 這時的t1相對於上一個沒有bug的大小,明顯少了一個t2, 這時再用t1-t2 均可能是一個負數, 等式不成立,就進入了else的代碼塊, netty接着判斷,是不是真的在空輪詢, 若是說循環的次數達到了512次, netty就肯定真的出現了空輪詢, 因而nettyrebuild()
Selector ,重新開啓一個Selector, 循環老的Selector上面的上面的註冊的時間,從新註冊進新的 Selector上,用這個中替換Selector的方法,解決了空輪詢的bug
中的?ok, run()
的三部曲第一步輪詢已經完成了, 下一步就是處理輪詢出來的感興趣的IO事件,processSelectedKeys()
,下面咱們進入這個方法, 若是這個selectedKeys不爲空,就進去processSelectedKeysOptimized();
比較有趣的是,這個selectedKeys是誰? ,別忘了咱們是在NioEventLoop
這個名叫set,實爲數組的數據結構, 當時的狀況以下:
的實例 selectedKeySet
中的 selectedKeysField
字段,替換成 selectedKeySet
selectedKeys = selectedKeySet;
看到第三步沒? 也就是說,咱們如今再想獲取裝有感興趣Key的 HashSet集合,已經不可能了,取而代之的是更優秀的selectedKeySet
中取, Selector輪詢到感興趣的事件,也會直接往selectedKeys
private void processSelectedKeys() { // todo selectedKeys 就是通過優化後的keys(底層是數組) if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // todo 數組輸出空項, 從而容許在channel 關閉時對其進行垃圾回收 // See // todo 數組中當前循環對應的keys質空, 這種感興趣的事件只處理一次就行 selectedKeys.keys[i] = null; // todo 獲取出 attachment,默認狀況下就是註冊進Selector時,傳入的第三個參數 this===> NioServerSocketChannel // todo 一個Selector中可能被綁定上了成千上萬個Channel, 經過K+attachment 的手段, 精確的取出發生指定事件的channel, 進而獲取channel中的unsafe類進行下一步處理 final Object a = k.attachment(); // todo if (a instanceof AbstractNioChannel) { // todo 進入這個方法, 傳進入 感興趣的key + NioSocketChannel processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
是如何在千百條channel中,精確獲取出現指定感興趣事件的channel的?上面這個方法,就是在真真正正的處理IO事件, 看看這段代碼, 咱們發現了這樣一行代碼
final Object a = k.attachment();
, 這是在幹什麼呢?
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // todo javaChannel() -- 返回SelectableChanel 可選擇的Channel,換句話說,能夠和Selector搭配使用,他是channel體系的頂級抽象類, 實際的類型是 ServerSocketChannel // todo eventLoop().unwrappedSelector(), -- > 獲取選擇器, 如今在AbstractNioChannel中 獲取到的eventLoop是BossGroup裏面的 // todo 到目前看, 他是把ServerSocketChannel(系統建立的) 註冊進了 EventLoop的選擇器 // todo 到目前爲止, 雖然註冊上了,可是它不關心任何事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) {
這裏的 最後一個參數是 this是當前的channel , 意思是把當前的Channel當成是一個 attachment(附件) 綁定到selector上 做用以下:
ok, 如今就捋清楚了,挖坑,填坑的過程; 下面進入processSelectedKey(SelectionKey k, AbstractNioChannel ch)
執行IO任務, 源碼以下: 咱們能夠看到,具體的處理IO的任務都是用Channel的內部類unSafe()完成的, 到這裏就不往下跟進了, 後續寫新博客連載
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // todo 這個unsafe 也是可channel 也是和Channel進行惟一綁定的對象 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // todo 確保Key的合法 final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See if (eventLoop != this || eventLoop == null) { // todo 確保多線程下的安全性 return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } // todo NioServerSocketChannel和selectKey都合法的話, 就進入下面的 處理階段 try { // todo 獲取SelectedKey 的 關心的選項 int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // todo 在read() write()以前咱們須要調用 finishConnect() 方法, 不然 NIO JDK拋出異常 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise will always return without blocking // See int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps( ); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // todo 一樣是檢查 readOps是否爲零, 來檢查是否出現了 jdk 空輪詢的bug if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {; } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
上面的處理IO事件結束後,第三波高潮就來了,處理任務隊列中的任務, runAllTask(timeOutMinils)
, 他也是有生命時長限制的 deadline, 它主要完成了以下的幾步:
protected boolean runAllTasks(long timeoutNanos) { // todo 聚合任務, 會把定時任務放入普通的任務隊列中 進入查看 fetchFromScheduledTaskQueue(); // todo 從普通的隊列中拿出一個任務 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } // todo 計算截止時間, 表示任務的執行,最好別超過這個時間 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; // todo for循環執行任務 for (;;) { // todo 執行任務, 方法裏調用; safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. // todo 由於 nanoTime();的執行也是個相對耗時的操做,所以沒執行完64個任務後,檢查有沒有超時 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } // todo 拿新的任務 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } // todo 每一個任務執行結束都有個收尾的構造 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
如何聚合任務?聚合任務就是把已經到執行時間的任務從定時任務隊列中所有取出 ,放入普通任務隊列而後執行, 咱們進入上的第一個方法fetchFromScheduledTaskQueue
private boolean fetchFromScheduledTaskQueue() { // todo 拉取第一個聚合任務 long nanoTime = AbstractScheduledEventExecutor.nanoTime(); // todo 從任務丟列中取出 截止時間是 nanoTime的定時任務 , // todo 往定時隊列中添加 ScheduledFutureTask任務, 排序的基準是 ScheduledFutureTask 的compare方法,按照時間,從小到大 // todo 因而當咱們發現隊列中的第一個任務,也就是截止時間最近的任務的截止時間比咱們的 Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { // todo scheduledTask != null表示定時任務該被執行了, 因而將定時任務添加到 普通任務隊列 if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. // todo 若是添加失敗了, 把這個任務重新放入到定時任務隊列中, 再嘗試添加 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } // todo 循環,嘗試拉取定時任務 , 循環結束後,全部的任務所有會被添加到 task裏面 scheduledTask = pollScheduledTask(nanoTime); } return true; }
根據指定的截止時間,從定時任務隊列中取出任務,定時任務隊列中任務按照時間排序,時間越短的,排在前面, 時間相同,按照添加的順序排序, 如今的任務就是檢查定時任務隊列中任務,嘗試把裏面的任務挨個取出來,因而netty使用這個方法Runnable scheduledTask = pollScheduledTask(nanoTime);
循環中判斷是否存在, 這個方法實現源碼以下, 不難看出,他是在根據時間判斷
/** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}. * todo 根據給定的納秒值,返回 Runable定時任務 , 而且,每次使用都要衝洗使用是nanoTime() 來矯正時間 */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } // todo 若是定時任務的截止時間<= 咱們穿進來的時間, 就把他返回 if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } // todo 不然返回kong,表示當前全部的定時任務都沒到期, 沒有能夠執行的 return null; }
通過循環以後,到期的任務,全被添加到 taskQueue裏面了,下面就是執行TaskQueue裏面的任務
源碼以下: 實際上就行執行了 task這個Runable的Run方法
/** * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}. */ protected static void safeExecute(Runnable task) { try {; } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } }
總結一下: 到如今爲止,EventLoop已經啓動了, 一說到NioEventLoop老是想起上圖, 如今他能夠接受新的鏈接接入,輪詢,處理任務...