Netty 源碼 NioEventLoop(三)執行流程

Netty 源碼 NioEventLoop(三)執行流程

Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)html

上文提到在啓動 NioEventLoop 線程時會執行 SingleThreadEventExecutor#doStartThread(),在這個方法中調用 SingleThreadEventExecutor.this.run(),NioEventLoop 重寫了 run() 方法。NioEventLoop#run() 代碼以下:java

@Override
protected void run() {
    for (;;) {
        try {
            // 1. select 策略選擇
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                // 1.1 非阻塞的 select 策略。實際上,默認狀況下,不會返回 CONTINUE 的策略
                case SelectStrategy.CONTINUE:
                    continue;
                // 1.2 阻塞的 select 策略
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                // 1.3 不須要 select,目前已經有能夠執行的任務了
                default:
            }

            // 2. 執行網絡 IO 事件和任務調度
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 2.1. 處理網絡 IO 事件
                    processSelectedKeys();
                } finally {
                    // 2.2. 處理系統 Task 和自定義 Task
                    runAllTasks();
                }
            } else {
                // 根據 ioRatio 計算非 IO 最多執行的時間 
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // 3. 關閉線程
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

NioEventLoop#run() 作了記下事情:git

  1. 根據 selectStrategy 執行不一樣的策略
  2. 執行網絡 IO 事件和任務調度
  3. 關閉線程

1、IO 輪詢策略

當 taskQueue 中沒有任務時,那麼 Netty 能夠阻塞地等待 IO 就緒事件。而當 taskQueue 中有任務時,咱們天然地但願所提交的任務能夠儘快地執行 ,所以 Netty 會調用非阻塞的 selectNow() 方法,以保證 taskQueue 中的任務儘快能夠執行。github

(1) hasTasks網絡

首先,在 run 方法中,第一步是調用 hasTasks() 方法來判斷當前任務隊列中是否有任務併發

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

這個方法很簡單,僅僅是檢查了一下 taskQueue 是否爲空。至於 taskQueue 是什麼呢,其實它就是存放一系列的須要由此 EventLoop 所執行的任務列表。關於 taskQueue,咱們這裏暫時不表,等到後面再來詳細分析它。ide

(2) DefaultSelectStrategyoop

// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

// 非阻塞的 select 策略。實際上,默認狀況下,不會返回 CONTINUE 的策略
SelectStrategy.SELECT = -1;
// 阻塞的 select 策略
SelectStrategy.CONTINUE = -2;

// DefaultSelectStrategy 
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

顯然當 taskQueue 爲空時,執行的是 select(oldWakenUp) 方法。那麼 selectNow() 和 select(oldWakenUp) 之間有什麼區別呢? 來看一下,selectNow() 的源碼以下fetch

(3) selectNow優化

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

調用 JDK 底層的 selector.selectNow()。selectNow() 方法會檢查當前是否有就緒的 IO 事件,若是有,則返回就緒 IO 事件的個數;若是沒有,則返回 0。注意,selectNow() 是當即返回的,不會阻塞當前線程。當 selectNow() 調用後,finally 語句塊中會檢查 wakenUp 變量是否爲 true,當爲 true 時,調用 selector.wakeup() 喚醒 select() 的阻塞調用。

(4) select(boolean oldWakenUp)

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectedKeys = selector.select(timeoutMillis);
    } catch (CancelledKeyException e) {
    }
}

在這個 select 方法中,調用了 selector.select(timeoutMillis),而這個調用是會阻塞住當前線程的,timeoutMillis是阻塞的超時時間。到來這裏,咱們能夠看到,當 hasTasks() 爲真時,調用的的 selectNow() 方法是不會阻塞當前線程的,而當 hasTasks() 爲假時,調用的 select(oldWakenUp) 是會阻塞當前線程的。

2、IO 事件的處理

在 NioEventLoop.run() 方法中,第一步是經過 select/selectNow 調用查詢當前是否有就緒的 IO 事件,那麼當有 IO 事件就緒時,第二步天然就是處理這些 IO 事件啦。首先讓咱們來看一下 NioEventLoop.run 中循環的剩餘部分:

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    try {
        // 2.1. 處理網絡 IO 事件
        processSelectedKeys();
    } finally {
        // 2.2. 處理系統 Task 和自定義 Task
        runAllTasks();
    }
} else {
    // 根據 ioRatio 計算非 IO 最多執行的時間 
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

上面列出的代碼中,有兩個關鍵的調用:

  • 第一個是 processSelectedKeys():處理準備就緒的 IO 事件;
  • 第二個是 runAllTasks():運行 taskQueue 中的任務。

這裏的代碼還有一個十分有意思的地方,即 ioRatio。那什麼是 ioRatio 呢?它表示的是此線程分配給 IO 操做所佔的時間比(即運行 processSelectedKeys 耗時在整個循環中所佔用的時間)。例如 ioRatio 默認是 50,則表示 IO 操做和執行 task 的所佔用的線程執行時間比是 1 : 1。當知道了 IO 操做耗時和它所佔用的時間比,那麼執行 task 的時間就能夠很方便的計算出來了。

咱們這裏先分析一下 processSelectedKeys() 方法調用,runAllTasks() 留到下面再分析。processSelectedKeys() 方法的源碼以下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

因爲默認未開啓 selectedKeys 優化功能,因此會進入 processSelectedKeysPlain 分支執。下面繼續分析 processSelectedKeysPlain 的代碼實現。

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // NioSocketChannel 或 NioServerSocketChannel 進行 IO 讀寫相關的操做
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 用戶自行註冊的 Task 任務,通常狀況下不會執行
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }
        // 省略...
    }
}

processSelectedKey 方法源碼以下:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 省略...

    try {
        int readyOps = k.readyOps();
        // 1. OP_CONNECT 讀寫前要先處理鏈接,不然可能拋 NotYetConnectedException 異常
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 2. OP_WRITE
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // 3. OP_READ 或 OP_ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

這個代碼是否是很熟悉啊?徹底是 Java NIO 的 Selector 的那一套處理流程嘛!processSelectedKey 中處理了三個
事件,分別是:

  • OP_READ 可讀事件,即 Channel 中收到了新數據可供上層讀取.
  • OP_WRITE 可寫事件,即上層能夠向 Channel 寫入數據.
  • OP_CONNECT 鏈接創建事件,即 TCP 鏈接已經創建,Channel 處於 active 狀態.

下面咱們分別根據這三個事件來看一下 Netty 是怎麼處理的吧。

2.1 OP_READ

當就緒的 IO 事件是 OP_READ,代碼會調用 unsafe.read() 方法。unsafe 咱們已見過屢次,NioSocketChannel 的 Unsafe 是在 AbstractNioByteChannel 中實現的,而 NioServerSocketChannel 的 Unsafe 是在 NioMessageUnsafe 中實現。

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1. 分配緩衝區 ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            // 2. 從 NioSocketChannel 中讀取數據
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 3. 調用 pipeline.fireChannelRead 發送一個 inbound 事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

上面 read 方法其實概括起來,能夠認爲作了以下工做:

  1. 分配 ByteBuf
  2. 從 SocketChannel 中讀取數據
  3. 調用 pipeline.fireChannelRead 發送一個 inbound 事件

2.2 OP_WRITE

OP_WRITE 可寫事件代碼以下。這裏代碼比較簡單,沒有詳細分析的必要了。

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

2.3 OP_CONNECT

最後一個事件是 OP_CONNECT,即 TCP 鏈接已創建事

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // 已鏈接後就須要註銷 OP_CONNECT 事件 See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

OP_CONNECT 事件的處理中,只作了兩件事情:

  1. 正如代碼中的註釋所言, 咱們須要將 OP_CONNECT 從就緒事件集中清除, 否則會一直有 OP_CONNECT 事件。

  2. 調用 unsafe.finishConnect() 通知上層鏈接已創建
    unsafe.finishConnect() 調用最後會調用到 pipeline().fireChannelActive(),產生一個 inbound 事件,通知 pipeline 中的各個 handler TCP 通道已創建(即 ChannelInboundHandler.channelActive 方法會被調用)

到了這裏,咱們整個 NioEventLoop 的 IO 操做部分已經瞭解完了,接下來的一節咱們要重點分析一下 Netty 的任務
隊列機制。

3、任務調度

咱們已經提到過,在 Netty 中,一個 NioEventLoop 一般須要肩負起兩種任務,第一個是做爲 IO 線程,處理 IO 操做;第二個就是做爲任務線程,處理 taskQueue 中的任務。這一節的重點就是分析一下 NioEventLoop 的任務隊列機制
的。

3.1 普通 Runnable 任務

// SingleThreadEventExecutor
private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(maxPendingTasks);
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

所以實際上,taskQueue 是存放着待執行的任務的隊列。

3.2 schedule 任務

除了經過 execute 添加普通的 Runnable 任務外,咱們還能夠經過調用 eventLoop.scheduleXXX 之類的方法來添加
一個定時任務。schedule 功能的實現是在 SingleThreadEventExecutor 的父類,即 AbstractScheduledEventExecutor 中實現的。

// SingleThreadEventExecutor
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一個隊列(Queue),其中存放的元素是 ScheduledFutureTask。而ScheduledFutureTask 咱們很容易猜到,它是對 Schedule 任務的一個抽象。咱們來看一下 AbstractScheduledEventExecutor 所實現的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

3.3 執行調度任務

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

咱們前面已經提到過,EventLoop 能夠經過調用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中,
也能夠經過調用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中。在此方法的一開
始調用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經能夠執行的(即定時時
間已到的 schedule 任務) 拿出來並添加到 taskQueue 中,做爲可執行的 task 等待被調度執行。代碼以下:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中獲取一個可執行的 task,而後調用它
的 run() 方法來運行此 task。

注意: 由於 EventLoop 既須要執行 IO 操做,又須要執行 task,所以咱們在調用 EventLoop.execute 方法提交
任務時,不要提交耗時任務,更不能提交一些會形成阻塞的任務,否則會致使咱們的 IO 線程得不到調度,影響整
個程序的併發量。


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索