Netty框架的主要線程就是I/O線程,線程模型設計的好壞,決定了系統的吞吐量、併發性和安全性等架構質量屬性。html
在討論Netty線程模型時候,通常首先會想到的是經典的Reactor線程模型,儘管不一樣的NIO框架對應Reactor模式的實現存在差別,但本質上仍是遵循了Reactor的基礎線程模型。java
Reactor的3中線程模型:git
見《Netty中的三種Reactor(反應堆)》github
Netty的線程模型並非一成不變的,實際經過用戶的啓動配置參數來配置。算法
在建立ServerBootstrap類實例前,先建立兩個EventLoopGroup,它們其實是兩個獨立的Reactor線程池,bossGroup負責接收客戶端的鏈接,workerGroup負責處理IO相關的讀寫操做,或者執行系統task、定時task等。數據庫
用於接收客戶端請求的線程池職責以下:編程
處理IO操做的線程池職責以下:後端
經過調整兩個EventLoopGroup的線程數、是否共享線程池等方式,Netty的Reactor線程模型能夠在單線程、多線程和主從多線程間切換,用戶能夠根據實際狀況靈活配置。 數組
爲了提升性能,Netty在不少地方採用了無鎖化設計。例如在IO線程的內部進行串行操做,避免多線程競爭致使的性能降低。儘管串行化設計看上去CPU利用率不高,併發程度不夠,可是經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的設計相比一個隊列——多個工做線程的模型性能更優。promise
它的設計原理以下
Netty的NioEventLoop讀取到消息以後,調用ChannelPipeline的fireChannelRead方法,只要用戶不主動切換線程,就一直由NioEventLoop調用用戶的Handler,期間不進行線程切換。這種串行化的處理方式避免了多線程操做致使的鎖競爭,從性能角度看是最優的。
Netty多線程編程的最佳實踐以下:
EventLoopGroup(實際上是MultithreadEventExecutorGroup) 內部維護一個類型爲 EventExecutor children 數組, 其大小是 nThreads, 這樣就構成了一個線程池,線程池大小經過
在實例化 NioEventLoopGroup 時, 若是指定線程池大小, 則 nThreads 就是指定的值, 反之是處理器核心數 * 2;MultithreadEventExecutorGroup 中會調用 newChild 抽象方法(抽象方法 newChild 是在 NioEventLoopGroup 中實現的, 它返回一個 NioEventLoop 實例)來初始化 children 數組,也是在NioEventLoopGroup中調用NioEventLoop的構造函數來建立NioEventLoop;
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0]); }
NioEventLoop相關後面再講;
NioEventLoop 繼承於 SingleThreadEventLoop, 而 SingleThreadEventLoop 又繼承於 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中對本地線程的抽象, 它內部有一個 Thread thread 屬性, 存儲了一個本地 Java 線程. 所以咱們能夠認爲, 一個 NioEventLoop 其實和一個特定的線程綁定, 而且在其生命週期內, 綁定的線程都不會再改變。
NioEventLoop 的類層次結構圖仍是比較複雜的, 不過咱們只須要關注幾個重要的點便可. 首先 NioEventLoop 的繼承鏈以下:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
從上圖能夠看到, SingleThreadEventExecutor 有一個名爲 thread 的 Thread 類型字段, 這個字段就表明了與 SingleThreadEventExecutor 關聯的本地線程。在 SingleThreadEventExecutor 構造器中, 經過 threadFactory.newThread 建立了一個新的 Java 線程. 在這個線程中所作的事情主要就是調用 SingleThreadEventExecutor.this.run() 方法, 而由於 NioEventLoop 實現了這個方法, 所以根據多態性, 其實調用的是 NioEventLoop.run() 方法。
protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this.parent = parent; this.addTaskWakesUp = addTaskWakesUp; thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 省略清理代碼 ... } } }); threadProperties = new DefaultThreadProperties(thread); taskQueue = newTaskQueue(); }
一、是做爲 IO 線程, 執行與 Channel 相關的 IO 操做, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;
二、第二個任務是做爲任務隊列執行任務, 任務能夠分爲2類:
依次看這些功能的源碼吧:
對比2.1:上面2.1中的普通任務,NioEventLoop經過隊列來保存任務。在 SingleThreadEventLoop 中, 又實現了任務隊列的功能, 經過它, 咱們能夠調用一個 NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 並由 NioEventLoop 進行調度執行。
protected Queue<Runnable> newTaskQueue() { return new LinkedBlockingQueue<Runnable>(); }
添加任務的SingleThreadEventExecutor中的execute()方法:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp) { wakeup(inEventLoop); } }
上面2.1中的普通task的執行:
protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
對比2.2:schedule(定時) 任務處理
除了經過 execute 添加普通的 Runnable 任務外, 咱們還能夠經過調用 eventLoop.scheduleXXX 之類的方法來添加一個定時任務。
EventLoop 中實現任務隊列的功能在超類 SingleThreadEventExecutor
實現的, 而 schedule 功能的實現是在 SingleThreadEventExecutor
的父類, 即 AbstractScheduledEventExecutor
中實現的。
在 AbstractScheduledEventExecutor
中, 有以 scheduledTaskQueue 字段:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一個隊列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 咱們很容易猜到, 它是對 Schedule 任務的一個抽象。咱們來看一下 AbstractScheduledEventExecutor
所實現的 schedule 方法吧:
@Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); }
這是其中一個重載的 schedule, 當一個 Runnable 傳遞進來後, 會被封裝爲一個 ScheduledFutureTask 對象, 這個對象會記錄下這個 Runnable 在什麼時候運行、已何種頻率運行等信息。
當構建了 ScheduledFutureTask 後, 會繼續調用 另外一個重載的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new OneTimeTask() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
在這個方法中, ScheduledFutureTask 對象就會被添加到 scheduledTaskQueue 中了。
當一個任務被添加到 taskQueue 後, 它是怎麼被 EventLoop 執行的呢?
讓咱們回到 NioEventLoop.run() 方法中, 在這個方法裏, 會分別調用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法咱們已經分析過了, 下面咱們來看一下 runAllTasks() 中到底有什麼名堂吧。
runAllTasks 方法有兩個重載的方法, 一個是無參數的, 另外一個有一個參數的. 首先來看一下無參數的 runAllTasks:
protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
咱們前面已經提到過, EventLoop 能夠經過調用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中, 也能夠經過調用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中。在此方法的一開始調用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經能夠執行的(即定時時間已到的 schedule 任務) 拿出來並添加到 taskQueue 中, 做爲可執行的 task 等待被調度執行。
它的源碼以下:
private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }
接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中獲取一個可執行的 task, 而後調用它的 run() 方法來運行此 task。
注意
, 由於 EventLoop 既須要執行 IO 操做, 又須要執行 task, 所以咱們在調用 EventLoop.execute 方法提交任務時, 不要提交耗時任務, 更不能提交一些會形成阻塞的任務, 否則會致使咱們的 IO 線程得不到調度, 影響整個程序的併發量.
=======================
經過 Selector.open() 打開一個 Selector.
將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set)
不斷重複:
調用 select() 方法
調用 selector.selectedKeys() 獲取 selected keys
迭代每一個 selected key:
1) 從 selected key 中獲取 對應的 Channel 和附加信息(若是有的話)
2) 判斷是哪些 IO 事件已經就緒了, 而後處理它們. 若是是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置爲 非阻塞的, 而後將這個 Channel 註冊到 Selector 中.
3) 根據須要更改 selected key 的監聽事件.
4) 將已經處理過的 key 從 selected keys 集合中刪除.
private static SocketChannel newSocket(SelectorProvider provider) { ... return provider.openSocketChannel(); }
對比2:將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set) 的操做咱們在《?》, 咱們在來回顧一下, 在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
在 AbstractUnsafe.register 方法中調用了 register0 方法:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略條件判斷和錯誤處理 AbstractChannel.this.eventLoop = eventLoop; register0(promise); }
register0 方法代碼以下:
private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } }
register0 又調用了 AbstractNioChannel.doRegister:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
在這裏 javaChannel() 返回的是一個 Java NIO SocketChannel 對象, 咱們將此 SocketChannel 註冊到前面第一步獲取的 Selector 中。
對比3:thread 的 run 循環
當 EventLoop.execute 第一次被調用時, 就會觸發 startThread() 的調用(3.二、NioEventLoop實例化過程), 進而致使了 EventLoop 所對應的 Java 線程的啓動。
run()中的selector相關的用於IO操做的,網絡的讀取操做是在run方法中去執行的,首先看有沒有未執行的任務,有的話直接執行,不然就去輪訓看是否有就緒的Channel,以下:
@Override protected void run() {
//死循環,NioEventLoop的事件循環就在這裏 for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) {//父類SingleThreadEventExecutor中定義的taskQueue是否有任務,若是有馬上執行(父類SingleThreadEventExecutor中還定義了delayedTaskQueue) selectNow(); } else {//若是沒有select則進行輪詢 select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } //當輪訓到有就緒的Channel時,就進行網絡的讀寫操做 cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
對比3-IO 事件的輪詢
分支1:selectNow() 和 select(oldWakenUp) 之間有什麼區別:
void selectNow() throws IOException { try { selector.selectNow(); } finally { // restore wakup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
selector 字段正是 Java NIO 中的多路複用器 Selector. 那麼這裏 selector.selectNow() 就很好理解了, selectNow() 方法會檢查當前是否有就緒的 IO 事件, 若是有, 則返回就緒 IO 事件的個數; 若是沒有, 則返回0. 注意, selectNow() 是當即返回的, 不會阻塞當前線程.
當 selectNow() 調用後, finally 語句塊中會檢查 wakenUp 變量是否爲 true, 當爲 true 時, 調用 selector.wakeup() 喚醒 select() 的阻塞調用。
再來看一下 else 分支的 select(oldWakenUp) 方法:
select(oldWakenUp)中輪詢時,可能爲空,也沒有wakeup操做或是最新的消息處理,則說明本次輪訓是一個空輪訓,此時會觸發jdk的epoll bug,它會致使Selector進行空輪訓,使i/o線程處於100%。爲了不這個bug。須要對selector進行統計:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); //當前時間延時1分鐘 for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; //當輪詢到新的事件或者wakenUp或者有任務時跳出輪詢的循環 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) { // Selected something, // waken up by user, or // the task queue has a pending task. break; } //不然,可能已經觸發jdk的epoll bug,經過下面各類策略退出 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = System.nanoTime(); } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } }
在 NioEventLoop.run() 方法中, 第一步是經過 select/selectNow 調用查詢當前是否有就緒的 IO 事件. 那麼當有 IO 事件就緒時, 第二步天然就是處理這些 IO 事件啦.
首先讓咱們來看一下 NioEventLoop.run 中循環的剩餘部分:
//run()方法的中間一段 //... final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } //...
上面列出的代碼中, 有兩個關鍵的調用, 第一個是 processSelectedKeys() 調用, 根據字面意思, 咱們能夠猜出這個方法確定是查詢就緒的 IO 事件, 而後處理它; 第二個調用是 runAllTasks(), 這個方法咱們也能夠一眼就看出來它的功能就是運行 taskQueue 中的任務。
這裏的代碼還有一個十分有意思的地方, 即 ioRatio. 那什麼是 ioRatio呢? 它表示的是此線程分配給 IO 操做所佔的時間比(即運行 processSelectedKeys 耗時在整個循環中所佔用的時間). 例如 ioRatio 默認是 50, 則表示 IO 操做和執行 task 的所佔用的線程執行時間比是 1 : 1. 當知道了 IO 操做耗時和它所佔用的時間比, 那麼執行 task 的時間就能夠很方便的計算出來了。
設 IO 操做耗時爲 ioTime, ioTime 佔的時間比例爲 ioRatio, 則: ioTime / ioRatio = taskTime / taskRatio taskRatio = 100 - ioRatio => taskTime = ioTime * (100 - ioRatio) / ioRatio
根據上面的公式, 當咱們設置 ioRate = 70 時, 則表示 IO 運行耗時佔比爲70%, 即假設某次循環一共耗時爲 100ms, 那麼根據公式, 咱們知道 processSelectedKeys() 方法調用所耗時大概爲70ms(即 IO 耗時), 而 runAllTasks() 耗時大概爲 30ms(即執行 task 耗時).
當 ioRatio 爲 100 時, Netty 就不考慮 IO 耗時的佔比, 而是分別調用 processSelectedKeys()、runAllTasks(); 而當 ioRatio 不爲 100時, 則執行到 else 分支, 在這個分支中, 首先記錄下 processSelectedKeys() 所執行的時間(即 IO 操做的耗時), 而後根據公式, 計算出執行 task 所佔用的時間, 而後以此爲參數, 調用 runAllTasks().
咱們這裏先分析一下 processSelectedKeys() 方法調用, runAllTasks() 咱們留到下一節再分析。
processSelectedKeys() 方法的源碼以下:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
這個方法中, 會根據 selectedKeys 字段是否爲空, 而分別調用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在調用 openSelector() 方法時, 根據 JVM 平臺的不一樣, 而有設置不一樣的值, 在我所調試這個值是不爲 null 的. 其實 processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的區別,先看processSelectedKeysPlain():
processSelectedKey 中處理了三個事件, 分別是:
OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取。
OP_WRITE, 可寫事件, 即上層能夠向 Channel 寫入數據。
OP_CONNECT, 鏈接創建事件, 即 TCP 鏈接已經創建, Channel 處於 active 狀態。
簡單的說就是對網絡位判斷,當網絡位爲寫的時候,則說明有半包消息沒有發送完成,須要繼續調用flush方法進行發送。後面的若是網絡操做位爲鏈接狀態,則須要對鏈接結果進行判斷。
對於服務端處理鏈接的請求以下:
因爲NioMessageUnsafe是AbstractNioMessageChannel的內部類,調用外部類doReadMessages()方法
下面是服務端NioServerSocketChannel中的該方法
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); }
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 for (;;) { if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; i++; } selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
上面代碼關鍵的點就兩個:迭代 selectedKeys 獲取就緒的 IO 事件, 而後爲每一個事件都調用 processSelectedKey 來處理它。
這裏正好完美對應上了咱們提到的 Selector 的使用流程中的第三步裏操做。
還有一點須要注意的是, 咱們能夠調用 selectionKey.attach(object) 給一個 selectionKey 設置一個附加的字段, 而後能夠經過 Object attachedObj = selectionKey.attachment() 獲取它. 上面代代碼正是經過了 k.attachment() 來獲取一個附加在 selectionKey 中的對象, 那麼這個對象是什麼呢? 它又是在哪裏設置的呢? 咱們再來回憶一下 SocketChannel 是如何註冊到 Selector 中的:
在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
最後的 AbstractNioChannel.doRegister 方法會調用 SocketChannel.register 方法註冊一個 SocketChannel 到指定的 Selector:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
特別注意一下 register 的第三個參數, 這個參數是設置 selectionKey 的附加對象的, 和調用 selectionKey.attach(object) 的效果同樣. 而調用 register 所傳遞的第三個參數是 this, 它其實就是一個 NioSocketChannel
的實例. 那麼這裏就很清楚了, 咱們在將 SocketChannel 註冊到 Selector 中時, 將 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當咱們獲取到附加的對象後, 咱們就調用 processSelectedKey 來處理這個 IO 事件:
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } }
protected boolean runAllTasks(long timeoutNanos) { fetchFromDelayedQueue(); //獲取SingleThreadEventExecutor中的taskQueue Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. //當循環次數爲64時,這時候會去比較上次執行時間和延時的關係,若是大於延時那麼就退出,這裏獲取naotime是每64次進行獲取一次。 //這麼作的目的是一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,因此通常每64個任務就會返回 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
一、獲取SingleThreadEventExecutor中的taskQueue,若是沒有任務就退出;
二、當循環次數爲64時,這時候會去比較上次執行時間和延時的關係,若是大於延時那麼就退出,這裏獲取naotime是每64次進行獲取一次。這麼作的目的是一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,因此通常每64個任務就會返回;