在Netty啓動後,Netty的線程池會起一個Selector線程處理IO事件和其餘業務事件,下面來看下Selector流程java
Selector線程是一個循環線程它一直處理IO事件和其餘業務事件。這裏須要說明Selector線程是處理IO事件和處理其餘業務共享線程,也就是說Selector線程會按用戶配置比例來處理IO接事件和其餘業務事件(如channel註冊事件),可能此次在執行IO事件下次若是有其餘任務來了就忙其餘任務去了。ide
Selector的主流程就是一個run()方法,源碼以下:oop
@Override protected void run() { for (;;) { //wakenUp設置成false,並獲取原來wakenUp狀態 boolean oldWakenUp = wakenUp.getAndSet(false); try { //判斷當前任務隊列是否有任務,若是有任務直接快速select一次以便能處理到任務 if (hasTasks()) { selectNow(); } else { //若是任務隊列沒有任務則進行阻塞select並讓出cup時間片 select(oldWakenUp); //若是wakenUp是true,則中斷select一次 if (wakenUp.get()) { selector.wakeup(); } } //處理io事件和根據佔時比例配置處理任務 cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { //處理IO事件 processSelectedKeys(); //處理任務 runAllTasks(); } else { final long ioStartTime = System.nanoTime(); //處理IO事件 processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; //處理任務 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } }
主要有如下幾個流程性能
這裏的wakenUp是控制select的中斷的標誌,若是有其餘任務加入會中斷線程的select,代碼以下fetch
@Override public void execute(Runnable task) { ... if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
因此這裏每次循環開始都會把wakenUp標誌清理掉。ui
判斷當前任務隊列是否有任務,若是有任務直接快速select一次以便能處理到任務。this
若是任務隊列沒有任務則進行阻塞select並讓出cup時間片。編碼
若是wakenUp是true,則中斷select一次。.net
這裏會有個迷惑,就是若是上次加入任務中斷了select一次這裏狀態還未清理還會再中斷一次,這樣重複中斷設計的意義是什麼?看官方說法是若是沒有這個操做,第一次select被中斷後等待任務執行過程當中的全部加入的任務都不能改變wakenUp的狀態爲ture,由於改變狀態是用cas方式wakenUp.compareAndSet(false, true),因此下次select會出現沒必要要的阻塞。所以這裏作了重複的喚醒。這樣作在沒任務加入狀況下實際上是浪費的因此官方稱這種作法inefficient。線程
這裏配置比例主要是讓用戶協調io事件和任務執行的時間,若是ioRatio配置100,會先執行io事件而後執行所有的任務;默認ioRatio配置50,會先執行io事件而後用io事件50%執行時間處理任務,處理任務的時間計算公式以下:
io事件處理時間 * (100 - ioRatio) / ioRatio
下面來看下select的實現:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { //select計數器 int selectCnt = 0; long currentTimeNanos = System.nanoTime(); //根據定時任務計算select延遲時間 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //計算select阻塞時間 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } //阻塞式select int selectedKeys = selector.select(timeoutMillis); //計時器加1 selectCnt ++; //若是發現待處理io事件或老喚醒標記true或最新喚醒標記爲true或隊列中有任務或有定時任務,跳出循環,中斷本次輪詢 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //若是有線程被中斷也,,跳出循環,中斷本次輪詢 if (Thread.interrupted()) { selectCnt = 1; break; } //若是的selet時間大於等於timeoutMillis,說明selet正常,計數器重歸於1 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; //若是selet在timeoutMillis時間內返回次數大於配置次數說明可能觸發了jdk的nio bug,則重建selector } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } } catch (CancelledKeyException e) { } }
select的操做是個循環,select會一直循環直到出現如下狀況:
select的操做主要有如下幾個流程:
根據定時任務計算select延遲時間。 這裏計算方式就取定時任務隊列裏的第一個任務(任務根據執行時間從小到大)獲取執行時間加0.5毫秒,若是沒有任務默認1秒加0.5毫秒。
阻塞式select。
若是發現待處理io事件或老喚醒標記true或最新喚醒標記爲true或隊列中有任務或有定時任務,跳出循環,中斷本次輪詢。
若是的select時間大於等於timeoutMillis,說明select正常,計數器重歸於1。
若是select在timeoutMillis時間內返回次數大於配置次數說明可能觸發了jdk的nio bug,則重建selector。 此bug會在沒有IO事件發生時select當即返回,因此會形成無心義的循環最後可能致使cpu飆到100%狀況,因此NIO採用計數器和重建selector方法解決這個bug
關於該bug的描述見 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055
重建的過程就是先建立新的selector,將老的selector中監聽key複製到新selector中,而後註冊Channel到新selector中,最後關閉老的selector。
下面看下IO處理的實現
IO處理的實現就是先獲取待處理的key,而後交給processSelectedKey方法去處理,咱們看下processSelectedKey方法的實現
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0){ unsafe.read(); if (!ch.isOpen()) { return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
系統會根據SelectionKey處理哪一種IO事件,IO事件共有4種:
好比客戶端連接後會觸發服務端SelectionKey.OP_ACCEPT的接收事件,而後由服務端的主Channel處理,處理的過程後面會介紹。
處理完IO事件就是處理隊列中的任務若是ioRatio配置成100比較簡單處理方式就是一個一個執行隊列裏的所有任務,ioRatio非100處理比較複雜咱們來下ioRatio非100的處理實現:
protected boolean runAllTasks(long timeoutNanos) { //將須要觸發的定時任務加入到任務隊列 fetchFromScheduledTaskQueue(); //獲取一個任務 Runnable task = pollTask(); if (task == null) { return false; } //計算任務超時時間 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { //執行任務 task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; //每64次任務進行一次,超時判斷,若是超時退出執行 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } //取下個任務 task = pollTask(); //若是沒有任務也退出執行 if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
執行任務的流程也是個循環直到超時或者沒任務,執行任務流程以下:
將須要觸發的定時任務加入到任務隊列。
計算任務超時時間,超時時長是根據上面說的公式計算的。
執行任務。
每執行64次任務進行一次超時判斷,若是超時退出執行。
這裏設計64次的緣由官方給的解釋是每次調用nanoTime()去判斷超時是耗費性能的,因此寫成64,同時官方也表示硬編碼成64是不太合理後面會改爲可配置。
取下個任務,若是沒有任務也退出執行。