Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)html
上文提到在啓動 NioEventLoop 線程時會執行 SingleThreadEventExecutor#doStartThread(),在這個方法中調用 SingleThreadEventExecutor.this.run(),NioEventLoop 重寫了 run() 方法。NioEventLoop#run() 代碼以下:java
@Override protected void run() { for (;;) { try { // 1. select 策略選擇 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 1.1 非阻塞的 select 策略。實際上,默認狀況下,不會返回 CONTINUE 的策略 case SelectStrategy.CONTINUE: continue; // 1.2 阻塞的 select 策略 case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // 1.3 不須要 select,目前已經有能夠執行的任務了 default: } // 2. 執行網絡 IO 事件和任務調度 cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 2.1. 處理網絡 IO 事件 processSelectedKeys(); } finally { // 2.2. 處理系統 Task 和自定義 Task runAllTasks(); } } else { // 根據 ioRatio 計算非 IO 最多執行的時間 final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // 3. 關閉線程 try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
NioEventLoop#run() 作了記下事情:git
當 taskQueue 中沒有任務時,那麼 Netty 能夠阻塞地等待 IO 就緒事件。而當 taskQueue 中有任務時,咱們天然地但願所提交的任務能夠儘快地執行 ,所以 Netty 會調用非阻塞的 selectNow() 方法,以保證 taskQueue 中的任務儘快能夠執行。github
(1) hasTasks網絡
首先,在 run 方法中,第一步是調用 hasTasks() 方法來判斷當前任務隊列中是否有任務併發
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
這個方法很簡單,僅僅是檢查了一下 taskQueue 是否爲空。至於 taskQueue 是什麼呢,其實它就是存放一系列的須要由此 EventLoop 所執行的任務列表。關於 taskQueue,咱們這裏暫時不表,等到後面再來詳細分析它。ide
(2) DefaultSelectStrategyoop
// NioEventLoop#selectNowSupplier private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; // 非阻塞的 select 策略。實際上,默認狀況下,不會返回 CONTINUE 的策略 SelectStrategy.SELECT = -1; // 阻塞的 select 策略 SelectStrategy.CONTINUE = -2; // DefaultSelectStrategy public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
顯然當 taskQueue 爲空時,執行的是 select(oldWakenUp) 方法。那麼 selectNow() 和 select(oldWakenUp) 之間有什麼區別呢? 來看一下,selectNow() 的源碼以下fetch
(3) selectNow優化
int selectNow() throws IOException { try { return selector.selectNow(); } finally { // restore wakeup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
調用 JDK 底層的 selector.selectNow()。selectNow() 方法會檢查當前是否有就緒的 IO 事件,若是有,則返回就緒 IO 事件的個數;若是沒有,則返回 0。注意,selectNow() 是當即返回的,不會阻塞當前線程。當 selectNow() 調用後,finally 語句塊中會檢查 wakenUp 變量是否爲 true,當爲 true 時,調用 selector.wakeup() 喚醒 select() 的阻塞調用。
(4) select(boolean oldWakenUp)
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectedKeys = selector.select(timeoutMillis); } catch (CancelledKeyException e) { } }
在這個 select 方法中,調用了 selector.select(timeoutMillis),而這個調用是會阻塞住當前線程的,timeoutMillis是阻塞的超時時間。到來這裏,咱們能夠看到,當 hasTasks() 爲真時,調用的的 selectNow() 方法是不會阻塞當前線程的,而當 hasTasks() 爲假時,調用的 select(oldWakenUp) 是會阻塞當前線程的。
在 NioEventLoop.run() 方法中,第一步是經過 select/selectNow 調用查詢當前是否有就緒的 IO 事件,那麼當有 IO 事件就緒時,第二步天然就是處理這些 IO 事件啦。首先讓咱們來看一下 NioEventLoop.run 中循環的剩餘部分:
final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { // 2.1. 處理網絡 IO 事件 processSelectedKeys(); } finally { // 2.2. 處理系統 Task 和自定義 Task runAllTasks(); } } else { // 根據 ioRatio 計算非 IO 最多執行的時間 final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
上面列出的代碼中,有兩個關鍵的調用:
這裏的代碼還有一個十分有意思的地方,即 ioRatio。那什麼是 ioRatio 呢?它表示的是此線程分配給 IO 操做所佔的時間比(即運行 processSelectedKeys 耗時在整個循環中所佔用的時間)。例如 ioRatio 默認是 50,則表示 IO 操做和執行 task 的所佔用的線程執行時間比是 1 : 1。當知道了 IO 操做耗時和它所佔用的時間比,那麼執行 task 的時間就能夠很方便的計算出來了。
咱們這裏先分析一下 processSelectedKeys() 方法調用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源碼以下:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
因爲默認未開啓 selectedKeys 優化功能,因此會進入 processSelectedKeysPlain 分支執。下面繼續分析 processSelectedKeysPlain 的代碼實現。
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { // NioSocketChannel 或 NioServerSocketChannel 進行 IO 讀寫相關的操做 processSelectedKey(k, (AbstractNioChannel) a); } else { // 用戶自行註冊的 Task 任務,通常狀況下不會執行 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } // 省略... } }
processSelectedKey 方法源碼以下:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 省略... try { int readyOps = k.readyOps(); // 1. OP_CONNECT 讀寫前要先處理鏈接,不然可能拋 NotYetConnectedException 異常 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 2. OP_WRITE if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 3. OP_READ 或 OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
這個代碼是否是很熟悉啊?徹底是 Java NIO 的 Selector 的那一套處理流程嘛!processSelectedKey 中處理了三個
事件,分別是:
OP_READ
可讀事件,即 Channel 中收到了新數據可供上層讀取.OP_WRITE
可寫事件,即上層能夠向 Channel 寫入數據.OP_CONNECT
鏈接創建事件,即 TCP 鏈接已經創建,Channel 處於 active 狀態.下面咱們分別根據這三個事件來看一下 Netty 是怎麼處理的吧。
當就緒的 IO 事件是 OP_READ,代碼會調用 unsafe.read() 方法。unsafe 咱們已見過屢次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中實現的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中實現。
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 1. 分配緩衝區 ByteBuf byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 2. 從 NioSocketChannel 中讀取數據 if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // 3. 調用 pipeline.fireChannelRead 發送一個 inbound 事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
上面 read 方法其實概括起來,能夠認爲作了以下工做:
OP_WRITE 可寫事件代碼以下。這裏代碼比較簡單,沒有詳細分析的必要了。
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
最後一個事件是 OP_CONNECT,即 TCP 鏈接已創建事
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 已鏈接後就須要註銷 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
OP_CONNECT 事件的處理中,只作了兩件事情:
正如代碼中的註釋所言, 咱們須要將 OP_CONNECT 從就緒事件集中清除, 否則會一直有 OP_CONNECT 事件。
調用 unsafe.finishConnect() 通知上層鏈接已創建
unsafe.finishConnect() 調用最後會調用到 pipeline().fireChannelActive(),產生一個 inbound 事件,通知 pipeline 中的各個 handler TCP 通道已創建(即 ChannelInboundHandler.channelActive 方法會被調用)
到了這裏,咱們整個 NioEventLoop 的 IO 操做部分已經瞭解完了,接下來的一節咱們要重點分析一下 Netty 的任務
隊列機制。
咱們已經提到過,在 Netty 中,一個 NioEventLoop 一般須要肩負起兩種任務,第一個是做爲 IO 線程,處理 IO 操做;第二個就是做爲任務線程,處理 taskQueue 中的任務。這一節的重點就是分析一下 NioEventLoop 的任務隊列機制
的。
// SingleThreadEventExecutor private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks); protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
所以實際上,taskQueue 是存放着待執行的任務的隊列。
除了經過 execute 添加普通的 Runnable 任務外,咱們還能夠經過調用 eventLoop.scheduleXXX 之類的方法來添加
一個定時任務。schedule 功能的實現是在 SingleThreadEventExecutor 的父類,即 AbstractScheduledEventExecutor 中實現的。
// SingleThreadEventExecutor PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一個隊列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 咱們很容易猜到,它是對 Schedule 任務的一個抽象。咱們來看一下 AbstractScheduledEventExecutor 所實現的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; }
咱們前面已經提到過,EventLoop 能夠經過調用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中,
也能夠經過調用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中。在此方法的一開
始調用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經能夠執行的(即定時時
間已到的 schedule 任務) 拿出來並添加到 taskQueue 中,做爲可執行的 task 等待被調度執行。代碼以下:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; }
接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中獲取一個可執行的 task,而後調用它
的 run() 方法來運行此 task。
注意: 由於 EventLoop 既須要執行 IO 操做,又須要執行 task,所以咱們在調用 EventLoop.execute 方法提交
任務時,不要提交耗時任務,更不能提交一些會形成阻塞的任務,否則會致使咱們的 IO 線程得不到調度,影響整
個程序的併發量。
天天用心記錄一點點。內容也許不重要,但習慣很重要!