接上篇Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(一)java
在 Netty 中, 一個 EventLoop 須要負責兩個工做, 第一個是做爲 IO 線程, 負責相應的 IO 操做; 第二個是做爲任務線程, 執行 taskQueue 中的任務.
接下來咱們先從 IO 操縱方面入手, 看一下 TCP 數據是如何從 Java NIO Socket 傳遞到咱們的 handler 中的.git
Netty 是 Reactor 模型的一個實現, 而且是基於 Java NIO 的, 那麼從 Java NIO 的前生今世 之四 NIO Selector 詳解 中咱們知道, Netty 中必然有一個 Selector 線程, 用於不斷調用 Java NIO 的 Selector.select 方法, 查詢當前是否有就緒的 IO 事件. 回顧一下在 Java NIO 中所講述的 Selector 的使用流程:github
經過 Selector.open() 打開一個 Selector.segmentfault
將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set)promise
不斷重複:併發
調用 select() 方法socket
調用 selector.selectedKeys() 獲取 selected keyside
迭代每一個 selected key:oop
1) 從 selected key 中獲取 對應的 Channel 和附加信息(若是有的話)源碼分析
2) 判斷是哪些 IO 事件已經就緒了, 而後處理它們. 若是是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置爲 非阻塞的, 而後將這個 Channel 註冊到 Selector 中.
3) 根據須要更改 selected key 的監聽事件.
4) 將已經處理過的 key 從 selected keys 集合中刪除.
上面的使用流程用代碼來體現就是:
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打開服務端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開 Selector Selector selector = Selector.open(); // 服務端 Socket 監聽8080端口, 並配置爲非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 將 channel 註冊到 selector 中. // 一般咱們都是先註冊一個 OP_ACCEPT 事件, 而後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ // 註冊到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 經過調用 select 方法, 阻塞地等待 channel I/O 可操做 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 獲取 I/O 操做就緒的 SelectionKey, 經過 SelectionKey 能夠知道哪些 Channel 的哪類 I/O 操做已經就緒. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 當獲取一個 SelectionKey 後, 就要將它刪除, 表示咱們已經對這個 IO 事件進行了處理. keyIterator.remove(); if (key.isAcceptable()) { // 當 OP_ACCEPT 事件到來時, 咱們就有從 ServerSocketChannel 中獲取一個 SocketChannel, // 表明客戶端的鏈接 // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中. // 注意, 這裏咱們若是沒有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
還記得不, 上面操做的第一步 經過 Selector.open() 打開一個 Selector 咱們已經在第一章的 Channel 實例化 這一小節中已經提到了, Netty 中是經過調用 SelectorProvider.openSocketChannel() 來打開一個新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) { ... return provider.openSocketChannel(); }
第二步 將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set) 的操做咱們在第一章 channel 的註冊過程 中也分析過了, 咱們在來回顧一下, 在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
在 AbstractUnsafe.register 方法中調用了 register0 方法:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略條件判斷和錯誤處理 AbstractChannel.this.eventLoop = eventLoop; register0(promise); }
register0 方法代碼以下:
private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } }
register0 又調用了 AbstractNioChannel.doRegister:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
在這裏 javaChannel() 返回的是一個 Java NIO SocketChannel 對象, 咱們將此 SocketChannel 註冊到前面第一步獲取的 Selector 中.
那麼接下來的第三步的循環是在哪裏實現的呢? 第三步的操做就是咱們今天分析的關鍵, 下面我會一步一步向讀者展現出來.
在 EventLoop 的啓動 一小節中, 咱們已經瞭解到了, 當 EventLoop.execute 第一次被調用時, 就會觸發 startThread() 的調用, 進而致使了 EventLoop 所對應的 Java 線程的啓動. 接着咱們來更深刻一些, 來看一下此線程啓動後都會作什麼東東吧.
下面是此線程的 run() 方法, 我已經把一些異常處理和收尾工做的代碼都去掉了. 這個 run 方法能夠說是十分簡單, 主要就是調用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一個抽象方法, 它的實如今 NioEventLoop 中.
thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { ... } } });
繼續跟蹤到 NioEventLoop.run() 方法, 其源碼以下:
@Override protected void run() { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { ... } } }
啊哈, 看到了上面代碼的 for(;;) 所構成的死循環了沒? 原來 NioEventLoop 事件循環的核心就是這裏!
如今咱們把上面所提到的 Selector 使用步驟的第三步的部分也找到了.
這個 run 方法能夠說是 Netty NIO 的核心, 屬於重中之重, 把它分析明白了, 那麼對 Netty 的事件循環機制也就瞭解了大部分了. 讓咱們一氣呵成, 繼續分析下去吧!
首先, 在 run 方法中, 第一步是調用 hasTasks() 方法來判斷當前任務隊列中是否有任務:
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
這個方法很簡單, 僅僅是檢查了一下 taskQueue 是否爲空. 至於 taskQueue 是什麼呢, 其實它就是存放一系列的須要由此 EventLoop 所執行的任務列表. 關於 taskQueue, 咱們這裏暫時不表, 等到後面再來詳細分析它.
當 taskQueue 不爲空時, 就執行到了 if 分支中的 selectNow() 方法. 然而當 taskQueue 爲空時, 執行的是 select(oldWakenUp) 方法. 那麼 selectNow() 和 select(oldWakenUp) 之間有什麼區別呢? 來看一下, selectNow() 的源碼以下:
void selectNow() throws IOException { try { selector.selectNow(); } finally { // restore wakup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
首先調用了 selector.selectNow() 方法, 這裏 selector 是什麼你們還有印象不? 咱們在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端) 時對它有過介紹, 這個 selector 字段正是 Java NIO 中的多路複用器 Selector. 那麼這裏 selector.selectNow() 就很好理解了, selectNow() 方法會檢查當前是否有就緒的 IO 事件, 若是有, 則返回就緒 IO 事件的個數; 若是沒有, 則返回0. 注意, selectNow() 是當即返回的, 不會阻塞當前線程.
當 selectNow() 調用後, finally 語句塊中會檢查 wakenUp 變量是否爲 true, 當爲 true 時, 調用 selector.wakeup() 喚醒 select() 的阻塞調用.
看了 if 分支的 selectNow 方法後, 咱們再來看一下 else 分支的 select(oldWakenUp) 方法.
其實 else 分支的 select(oldWakenUp) 方法的處理邏輯比較複雜, 而咱們這裏的目的暫時不是分析這個方法調用的具體工做, 所以我這裏長話短說, 只列出咱們咱們關注的內如:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { ... int selectedKeys = selector.select(timeoutMillis); ... } catch (CancelledKeyException e) { ... } }
在這個 select 方法中, 調用了 selector.select(timeoutMillis), 而這個調用是會阻塞住當前線程的, timeoutMillis 是阻塞的超時時間.
到來這裏, 咱們能夠看到, 當 hasTasks() 爲真時, 調用的的 selectNow() 方法是不會阻塞當前線程的, 而當 hasTasks() 爲假時, 調用的 select(oldWakenUp) 是會阻塞當前線程的.
這其實也很好理解: 當 taskQueue 中沒有任務時, 那麼 Netty 能夠阻塞地等待 IO 就緒事件; 而當 taskQueue 中有任務時, 咱們天然地但願所提交的任務能夠儘快地執行, 所以 Netty 會調用非阻塞的 selectNow() 方法, 以保證 taskQueue 中的任務儘快能夠執行.
在 NioEventLoop.run() 方法中, 第一步是經過 select/selectNow 調用查詢當前是否有就緒的 IO 事件. 那麼當有 IO 事件就緒時, 第二步天然就是處理這些 IO 事件啦.
首先讓咱們來看一下 NioEventLoop.run 中循環的剩餘部分:
final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
上面列出的代碼中, 有兩個關鍵的調用, 第一個是 processSelectedKeys() 調用, 根據字面意思, 咱們能夠猜出這個方法確定是查詢就緒的 IO 事件, 而後處理它; 第二個調用是 runAllTasks(), 這個方法咱們也能夠一眼就看出來它的功能就是運行 taskQueue 中的任務.
這裏的代碼還有一個十分有意思的地方, 即 ioRatio. 那什麼是 ioRatio呢? 它表示的是此線程分配給 IO 操做所佔的時間比(即運行 processSelectedKeys 耗時在整個循環中所佔用的時間). 例如 ioRatio 默認是 50, 則表示 IO 操做和執行 task 的所佔用的線程執行時間比是 1 : 1. 當知道了 IO 操做耗時和它所佔用的時間比, 那麼執行 task 的時間就能夠很方便的計算出來了:
設 IO 操做耗時爲 ioTime, ioTime 佔的時間比例爲 ioRatio, 則: ioTime / ioRatio = taskTime / taskRatio taskRatio = 100 - ioRatio => taskTime = ioTime * (100 - ioRatio) / ioRatio
根據上面的公式, 當咱們設置 ioRate = 70 時, 則表示 IO 運行耗時佔比爲70%, 即假設某次循環一共耗時爲 100ms, 那麼根據公式, 咱們知道 processSelectedKeys() 方法調用所耗時大概爲70ms(即 IO 耗時), 而 runAllTasks() 耗時大概爲 30ms(即執行 task 耗時).
當 ioRatio 爲 100 時, Netty 就不考慮 IO 耗時的佔比, 而是分別調用 processSelectedKeys()、runAllTasks(); 而當 ioRatio 不爲 100時, 則執行到 else 分支, 在這個分支中, 首先記錄下 processSelectedKeys() 所執行的時間(即 IO 操做的耗時), 而後根據公式, 計算出執行 task 所佔用的時間, 而後以此爲參數, 調用 runAllTasks().
咱們這裏先分析一下 processSelectedKeys() 方法調用, runAllTasks() 咱們留到下一節再分析.
processSelectedKeys() 方法的源碼以下:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
這個方法中, 會根據 selectedKeys 字段是否爲空, 而分別調用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在調用 openSelector() 方法時, 根據 JVM 平臺的不一樣, 而有設置不一樣的值, 在我所調試這個值是不爲 null 的. 其實 processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的區別, 爲了簡單起見, 咱們以 processSelectedKeysOptimized 爲例分析一下源碼的工做流程吧.
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } ... } }
其實你別看它代碼挺多的, 可是關鍵的點就兩個: 迭代 selectedKeys 獲取就緒的 IO 事件, 而後爲每一個事件都調用 processSelectedKey 來處理它.
這裏正好完美對應上了咱們提到的 Selector 的使用流程中的第三步裏操做.
還有一點須要注意的是, 咱們能夠調用 selectionKey.attach(object) 給一個 selectionKey 設置一個附加的字段, 而後能夠經過 Object attachedObj = selectionKey.attachment() 獲取它. 上面代代碼正是經過了 k.attachment() 來獲取一個附加在 selectionKey 中的對象, 那麼這個對象是什麼呢? 它又是在哪裏設置的呢? 咱們再來回憶一下 SocketChannel 是如何註冊到 Selector 中的:
在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
最後的 AbstractNioChannel.doRegister 方法會調用 SocketChannel.register 方法註冊一個 SocketChannel 到指定的 Selector:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
特別注意一下 register 的第三個參數, 這個參數是設置 selectionKey 的附加對象的, 和調用 selectionKey.attach(object) 的效果同樣. 而調用 register 所傳遞的第三個參數是 this, 它其實就是一個 NioSocketChannel
的實例. 那麼這裏就很清楚了, 咱們在將 SocketChannel 註冊到 Selector 中時, 將 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當咱們獲取到附加的對象後, 咱們就調用 processSelectedKey 來處理這個 IO 事件:
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
processSelectedKey 方法源碼以下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); // 可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } // 可寫事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 鏈接創建事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
這個代碼是否是很熟悉啊? 徹底是 Java NIO 的 Selector 的那一套處理流程嘛!
processSelectedKey 中處理了三個事件, 分別是:
OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取.
OP_WRITE, 可寫事件, 即上層能夠向 Channel 寫入數據.
OP_CONNECT, 鏈接創建事件, 即 TCP 鏈接已經創建, Channel 處於 active 狀態.
下面咱們分別根據這三個事件來看一下 Netty 是怎麼處理的吧.
當就緒的 IO 事件是 OP_READ, 代碼會調用 unsafe.read() 方法, 即:
// 可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } }
unsafe 這個字段, 咱們已經和它打了太多的交道了, 在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端) 中咱們已經對它進行過濃墨重彩地分析了, 最後咱們肯定了它是一個 NioSocketChannelUnsafe 實例, 負責的是 Channel 的底層 IO 操做.
咱們能夠利用 Intellij IDEA 提供的 Go To Implementations 功能, 尋找到這個方法的實現. 最後咱們發現這個方法沒有在 NioSocketChannelUnsafe 中實現, 而是在它的父類 AbstractNioByteChannel 實現的, 它的實現源碼以下:
@Override public final void read() { ... ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); // 檢查讀取結果. ... pipeline.fireChannelRead(byteBuf); byteBuf = null; ... totalReadAmount += localReadAmount; // 檢查是不是配置了自動讀取, 若是不是, 則當即退出循環. ... } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { } }
read() 源碼比較長, 我爲了篇幅起見, 刪除了部分代碼, 只留下了主幹. 不過我建議讀者朋友們本身必定要看一下 read() 源碼, 這對理解 Netty 的 EventLoop 十分有幫助.
上面 read 方法其實概括起來, 能夠認爲作了以下工做:
分配 ByteBuf
從 SocketChannel 中讀取數據
調用 pipeline.fireChannelRead 發送一個 inbound 事件.
前面兩點沒什麼好說的, 第三點 pipeline.fireChannelRead 讀者朋友們看到了有沒有會心一笑地感受呢? 反正我看到這裏時是有的. pipeline.fireChannelRead 正好就是咱們在第二章 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二) 中分析的 inbound 事件起點. 當調用了 pipeline.fireIN_EVT() 後, 那麼就產生了一個 inbound 事件, 此事件會以 head -> customContext -> tail 的方向依次流經 ChannelPipeline 中的各個 handler.
調用了 pipeline.fireChannelRead 後, 就是 ChannelPipeline 中所須要作的工做了, 這些咱們已經在第二章中有過詳細討論, 這裏就展開了.
OP_WRITE 可寫事件代碼以下. 這裏代碼比較簡單, 沒有詳細分析的必要了.
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
最後一個事件是 OP_CONNECT, 即 TCP 鏈接已創建事件.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
OP_CONNECT 事件的處理中, 只作了兩件事情:
正如代碼中的註釋所言, 咱們須要將 OP_CONNECT 從就緒事件集中清除, 否則會一直有 OP_CONNECT 事件.
調用 unsafe.finishConnect() 通知上層鏈接已創建
unsafe.finishConnect() 調用最後會調用到 pipeline().fireChannelActive(), 產生一個 inbound 事件, 通知 pipeline 中的各個 handler TCP 通道已創建(即 ChannelInboundHandler.channelActive 方法會被調用)
到了這裏, 咱們整個 NioEventLoop 的 IO 操做部分已經瞭解完了, 接下來的一節咱們要重點分析一下 Netty 的任務隊列機制.
咱們已經提到過, 在Netty 中, 一個 NioEventLoop 一般須要肩負起兩種任務, 第一個是做爲 IO 線程, 處理 IO 操做; 第二個就是做爲任務線程, 處理 taskQueue 中的任務. 這一節的重點就是分析一下 NioEventLoop 的任務隊列機制的.
NioEventLoop 繼承於 SingleThreadEventExecutor, 而 SingleThreadEventExecutor
中有一個 Queue<Runnable> taskQueue 字段, 用於存放添加的 Task. 在 Netty 中, 每一個 Task 都使用一個實現了 Runnable 接口的實例來表示.
例如當咱們須要將一個 Runnable 添加到 taskQueue 中時, 咱們能夠進行以下操做:
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Hello, Netty!"); } });
當調用 execute 後, 其實是調用到了 SingleThreadEventExecutor.execute() 方法, 它的實現以下:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
而添加任務的 addTask 方法的源碼以下:
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (isShutdown()) { reject(); } taskQueue.add(task); }
所以實際上, taskQueue
是存放着待執行的任務的隊列.
除了經過 execute 添加普通的 Runnable 任務外, 咱們還能夠經過調用 eventLoop.scheduleXXX 之類的方法來添加一個定時任務.
EventLoop 中實現任務隊列的功能在超類 SingleThreadEventExecutor
實現的, 而 schedule 功能的實現是在 SingleThreadEventExecutor
的父類, 即 AbstractScheduledEventExecutor
中實現的.
在 AbstractScheduledEventExecutor
中, 有以 scheduledTaskQueue 字段:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一個隊列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 咱們很容易猜到, 它是對 Schedule 任務的一個抽象.
咱們來看一下 AbstractScheduledEventExecutor
所實現的 schedule 方法吧:
@Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); }
這是其中一個重載的 schedule, 當一個 Runnable 傳遞進來後, 會被封裝爲一個 ScheduledFutureTask 對象, 這個對象會記錄下這個 Runnable 在什麼時候運行、已何種頻率運行等信息.
當構建了 ScheduledFutureTask 後, 會繼續調用 另外一個重載的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new OneTimeTask() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
在這個方法中, ScheduledFutureTask 對象就會被添加到 scheduledTaskQueue 中了.
當一個任務被添加到 taskQueue 後, 它是怎麼被 EventLoop 執行的呢?
讓咱們回到 NioEventLoop.run() 方法中, 在這個方法裏, 會分別調用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法咱們已經分析過了, 下面咱們來看一下 runAllTasks() 中到底有什麼名堂吧.
runAllTasks 方法有兩個重載的方法, 一個是無參數的, 另外一個有一個參數的. 首先來看一下無參數的 runAllTasks:
protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
咱們前面已經提到過, EventLoop 能夠經過調用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中, 也能夠經過調用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中. 在此方法的一開始調用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經能夠執行的(即定時時間已到的 schedule 任務) 拿出來並添加到 taskQueue 中, 做爲可執行的 task 等待被調度執行.
它的源碼以下:
private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }
接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中獲取一個可執行的 task, 而後調用它的 run() 方法來運行此 task.
注意
, 由於 EventLoop 既須要執行 IO 操做, 又須要執行 task, 所以咱們在調用 EventLoop.execute 方法提交任務時, 不要提交耗時任務, 更不能提交一些會形成阻塞的任務, 否則會致使咱們的 IO 線程得不到調度, 影響整個程序的併發量.
本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail .com
本文標題爲: Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(二)
本文連接爲: http://www.javashuo.com/article/p-rhpytbjk-ev.html