netty中的EventLoop和EventLoopGroup

Netty框架的主要線程就是I/O線程,線程模型設計的好壞,決定了系統的吞吐量、併發性和安全性等架構質量屬性。html

1、Netty的線程模型

  在討論Netty線程模型時候,通常首先會想到的是經典的Reactor線程模型,儘管不一樣的NIO框架對應Reactor模式的實現存在差別,但本質上仍是遵循了Reactor的基礎線程模型。java

Reactor的3中線程模型:git

  • Reactor單線程模型
  • Reactor多線程模型
  • 主從Reactor多線程模型

見《Netty中的三種Reactor(反應堆)github

NioEventLoopGroup 與 Reactor 線程模型的對應關係      見《Netty中的三種Reactor(反應堆)

Netty的線程模型並非一成不變的,實際經過用戶的啓動配置參數來配置。算法

 在建立ServerBootstrap類實例前,先建立兩個EventLoopGroup,它們其實是兩個獨立的Reactor線程池,bossGroup負責接收客戶端的鏈接,workerGroup負責處理IO相關的讀寫操做,或者執行系統task、定時task等。數據庫

用於接收客戶端請求的線程池職責以下:編程

  1. 接收客戶端TCP鏈接,初始化Channel參數;
  2. 將鏈路狀態變動事件通知給ChannelPipeline;

處理IO操做的線程池職責以下:後端

  1. 異步讀取遠端數據,發送讀事件到ChannelPipeline;
  2. 異步發送數據到遠端,調用ChannelPipeline的發送消息接口;
  3. 執行系統調用Task;
  4. 執行定時任務Task,如空閒鏈路檢測等;

經過調整兩個EventLoopGroup的線程數、是否共享線程池等方式,Netty的Reactor線程模型能夠在單線程、多線程和主從多線程間切換,用戶能夠根據實際狀況靈活配置。   數組

爲了提升性能,Netty在不少地方採用了無鎖化設計。例如在IO線程的內部進行串行操做,避免多線程競爭致使的性能降低。儘管串行化設計看上去CPU利用率不高,併發程度不夠,可是經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的設計相比一個隊列——多個工做線程的模型性能更優。promise

它的設計原理以下

Netty的NioEventLoop讀取到消息以後,調用ChannelPipeline的fireChannelRead方法,只要用戶不主動切換線程,就一直由NioEventLoop調用用戶的Handler,期間不進行線程切換。這種串行化的處理方式避免了多線程操做致使的鎖競爭,從性能角度看是最優的。

 Netty多線程編程的最佳實踐以下: 

  1. 服務端建立兩個EventLoopGroup,用於邏輯隔離NIO acceptor和NIO IO線程;
  2. 儘可能避免在用戶Handler裏面啓動用戶線程(解碼後將POJO消息發送到後端業務線程除外);
  3. 解碼要在NIO線程調用的解碼Handler中進行,不要切換到用戶線程中完成消息的解碼;
  4. 若是業務邏輯比較簡單,沒有複雜的業務邏輯計算,沒有可能阻塞線程的操做如磁盤操做、數據庫操做、網絡操做等,能夠直接在NIO線程中進行業務邏輯操做,不用切換到用戶線程;
  5. 若是業務邏輯比較複雜,不要在NIO線程上操做,應將解碼後的POJO封裝成Task提交到業務線程池中執行,以保證NIO線程被儘快釋放,處理其餘IO操做;

2、NioEventLoopGroup

NioEventLoopGroup 類層次結構

  • 從類結構可知,NioEventLoopGroup是一個Schedule類型的線程池,線程池中的線程用數組存放,

    EventLoopGroup(實際上是MultithreadEventExecutorGroup) 內部維護一個類型爲 EventExecutor children 數組, 其大小是 nThreads, 這樣就構成了一個線程池,線程池大小經過

    在實例化 NioEventLoopGroup 時, 若是指定線程池大小, 則 nThreads 就是指定的值, 反之是處理器核心數 * 2;
  • MultithreadEventExecutorGroup 中會調用 newChild 抽象方法(抽象方法 newChild 是在 NioEventLoopGroup 中實現的, 它返回一個 NioEventLoop 實例)來初始化 children 數組,也是在NioEventLoopGroup中調用NioEventLoop的構造函數來建立NioEventLoop

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    }
  • NioEventLoop相關後面再講;

 

3、NioEventLoop

  NioEventLoop 繼承於 SingleThreadEventLoop, 而 SingleThreadEventLoop 又繼承於 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中對本地線程的抽象, 它內部有一個 Thread thread 屬性, 存儲了一個本地 Java 線程. 所以咱們能夠認爲, 一個 NioEventLoop 其實和一個特定的線程綁定, 而且在其生命週期內, 綁定的線程都不會再改變

3.一、NioEventLoop 類層次結構

NioEventLoop 的類層次結構圖仍是比較複雜的, 不過咱們只須要關注幾個重要的點便可. 首先 NioEventLoop 的繼承鏈以下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

3.二、NioEventLoop實例化過程

從上圖能夠看到, SingleThreadEventExecutor 有一個名爲 thread 的 Thread 類型字段, 這個字段就表明了與 SingleThreadEventExecutor 關聯的本地線程。在 SingleThreadEventExecutor 構造器中, 經過 threadFactory.newThread 建立了一個新的 Java 線程. 在這個線程中所作的事情主要就是調用 SingleThreadEventExecutor.this.run() 方法, 而由於 NioEventLoop 實現了這個方法, 所以根據多態性, 其實調用的是 NioEventLoop.run() 方法。

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 省略清理代碼
                ...
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

3.三、NioEventLoop的設計兩大功能:

一、是做爲 IO 線程, 執行與 Channel 相關的 IO 操做, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;

二、第二個任務是做爲任務隊列執行任務, 任務能夠分爲2類:

  2.一、普通task:經過調用NioEventLoop的execute(Runnable task)方法往任務隊列裏增長任務,Netty有不少系統task,建立他們的主要緣由是:當io線程和用戶線程同時操做網絡資源的時候,爲了防止併發操做致使的鎖競爭,將用戶線程的操做封裝成Task放入消息隊列中,由i/o線程負責執行,這樣就實現了局部無鎖化
  2.二、定時任務:執行schedule()方法

依次看這些功能的源碼吧:

對比2.1:上面2.1中的普通任務,NioEventLoop經過隊列來保存任務。在 SingleThreadEventLoop 中, 又實現了任務隊列的功能, 經過它, 咱們能夠調用一個 NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 並由 NioEventLoop 進行調度執行。

    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<Runnable>();
    }

添加任務的SingleThreadEventExecutor中的execute()方法:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }

上面2.1中的普通task的執行:

    protected boolean runAllTasks() {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                return true;
            }
        }
    }

對比2.2:schedule(定時) 任務處理

除了經過 execute 添加普通的 Runnable 任務外, 咱們還能夠經過調用 eventLoop.scheduleXXX 之類的方法來添加一個定時任務。
EventLoop 中實現任務隊列的功能在超類 SingleThreadEventExecutor 實現的, 而 schedule 功能的實現是在 SingleThreadEventExecutor 的父類, 即 AbstractScheduledEventExecutor 中實現的。
在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一個隊列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 咱們很容易猜到, 它是對 Schedule 任務的一個抽象。咱們來看一下 AbstractScheduledEventExecutor 所實現的 schedule 方法吧:

@Override
public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        throw new IllegalArgumentException(
                String.format("delay: %d (expected: >= 0)", delay));
    }
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

這是其中一個重載的 schedule, 當一個 Runnable 傳遞進來後, 會被封裝爲一個 ScheduledFutureTask 對象, 這個對象會記錄下這個 Runnable 在什麼時候運行、已何種頻率運行等信息。
當構建了 ScheduledFutureTask 後, 會繼續調用 另外一個重載的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new OneTimeTask() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在這個方法中, ScheduledFutureTask 對象就會被添加到 scheduledTaskQueue 中了。

任務的執行

當一個任務被添加到 taskQueue 後, 它是怎麼被 EventLoop 執行的呢?
讓咱們回到 NioEventLoop.run() 方法中, 在這個方法裏, 會分別調用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法咱們已經分析過了, 下面咱們來看一下 runAllTasks() 中到底有什麼名堂吧。
runAllTasks 方法有兩個重載的方法, 一個是無參數的, 另外一個有一個參數的. 首先來看一下無參數的 runAllTasks:

protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }

 

咱們前面已經提到過, EventLoop 能夠經過調用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中, 也能夠經過調用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中。在此方法的一開始調用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經能夠執行的(即定時時間已到的 schedule 任務) 拿出來並添加到 taskQueue 中, 做爲可執行的 task 等待被調度執行。
它的源碼以下:

private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }

接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中獲取一個可執行的 task, 而後調用它的 run() 方法來運行此 task。

注意, 由於 EventLoop 既須要執行 IO 操做, 又須要執行 task, 所以咱們在調用 EventLoop.execute 方法提交任務時, 不要提交耗時任務, 更不能提交一些會形成阻塞的任務, 否則會致使咱們的 IO 線程得不到調度, 影響整個程序的併發量.

=======================

 

3.四、Netty中的IO處理循環 (上面1中的做爲io線程,用於IO操做的源碼解析以下:

3.4.一、回顧下Nio的Selector 的基本使用流程見《Java NIO系列教程(六) 多路複用器Selector

  1. 經過 Selector.open() 打開一個 Selector.

  2. 將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set)

  3. 不斷重複:

    • 調用 select() 方法

    • 調用 selector.selectedKeys() 獲取 selected keys

    • 迭代每一個 selected key:

      • 1) 從 selected key 中獲取 對應的 Channel 和附加信息(若是有的話)

      • 2) 判斷是哪些 IO 事件已經就緒了, 而後處理它們. 若是是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 並將它設置爲 非阻塞的, 而後將這個 Channel 註冊到 Selector 中.

      • 3) 根據須要更改 selected key 的監聽事件.

      • 4) 將已經處理過的 key 從 selected keys 集合中刪除.

3.4.二、依次看NIO中的關鍵幾步分別在Netty中如何實現的:

對比1:經過 Selector.open() 打開一個 Selector 《?》這一小節中已經提到了, Netty 中是經過調用 SelectorProvider.openSocketChannel() 來打開一個新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

對比2:將 Channel 註冊到 Selector 中, 並設置須要監聽的事件(interest set) 的操做咱們在《?》, 咱們在來回顧一下, 在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

在 AbstractUnsafe.register 方法中調用了 register0 方法:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略條件判斷和錯誤處理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

register0 方法代碼以下:

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又調用了 AbstractNioChannel.doRegister:

@Override
protected void doRegister() throws Exception {
    // 省略錯誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

在這裏 javaChannel() 返回的是一個 Java NIO SocketChannel 對象, 咱們將此 SocketChannel 註冊到前面第一步獲取的 Selector 中。

對比3:thread 的 run 循環

當 EventLoop.execute 第一次被調用時, 就會觸發 startThread() 的調用(3.二、NioEventLoop實例化過程), 進而致使了 EventLoop 所對應的 Java 線程的啓動。

run()中的selector相關的用於IO操做的,網絡的讀取操做是在run方法中去執行的,首先看有沒有未執行的任務,有的話直接執行,不然就去輪訓看是否有就緒的Channel,以下:

@Override
    protected void run() {
//死循環,NioEventLoop的事件循環就在這裏
for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) {//父類SingleThreadEventExecutor中定義的taskQueue是否有任務,若是有馬上執行(父類SingleThreadEventExecutor中還定義了delayedTaskQueue) selectNow(); } else {//若是沒有select則進行輪詢 select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } //當輪訓到有就緒的Channel時,就進行網絡的讀寫操做 cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }

對比3-IO 事件的輪詢

分支1:selectNow() 和 select(oldWakenUp) 之間有什麼區別:

void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

selector 字段正是 Java NIO 中的多路複用器 Selector. 那麼這裏 selector.selectNow() 就很好理解了, selectNow() 方法會檢查當前是否有就緒的 IO 事件, 若是有, 則返回就緒 IO 事件的個數; 若是沒有, 則返回0. 注意, selectNow() 是當即返回的, 不會阻塞當前線程. 當 selectNow() 調用後, finally 語句塊中會檢查 wakenUp 變量是否爲 true, 當爲 true 時, 調用 selector.wakeup() 喚醒 select() 的阻塞調用。

再來看一下 else 分支的 select(oldWakenUp) 方法:
select(oldWakenUp)中輪詢時,可能爲空,也沒有wakeup操做或是最新的消息處理,則說明本次輪訓是一個空輪訓,此時會觸發jdk的epoll bug,它會致使Selector進行空輪訓,使i/o線程處於100%。爲了不這個bug。須要對selector進行統計:

1)對selector操做週期進行統計
2)每完成一次輪訓進行一次計數
3)當在某個週期內超過必定次數說明觸發了bug,此時須要進行從新創建Selector,並賦值新值,將原來的進行關閉。
調用rebuildSelector方法。
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);    //當前時間延時1分鐘
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                
                //當輪詢到新的事件或者wakenUp或者有任務時跳出輪詢的循環
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                    // Selected something,
                    // waken up by user, or
                    // the task queue has a pending task.
                    break;
                }
                
                //不然,可能已經觸發jdk的epoll bug,經過下面各類策略退出
                if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = System.nanoTime();
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }

對比3--IO 事件的處理

在 NioEventLoop.run() 方法中, 第一步是經過 select/selectNow 調用查詢當前是否有就緒的 IO 事件. 那麼當有 IO 事件就緒時, 第二步天然就是處理這些 IO 事件啦.
首先讓咱們來看一下 NioEventLoop.run 中循環的剩餘部分:

//run()方法的中間一段
//...
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
//...

上面列出的代碼中, 有兩個關鍵的調用, 第一個是 processSelectedKeys() 調用, 根據字面意思, 咱們能夠猜出這個方法確定是查詢就緒的 IO 事件, 而後處理它; 第二個調用是 runAllTasks(), 這個方法咱們也能夠一眼就看出來它的功能就是運行 taskQueue 中的任務。
這裏的代碼還有一個十分有意思的地方, 即 ioRatio. 那什麼是 ioRatio呢? 它表示的是此線程分配給 IO 操做所佔的時間比(即運行 processSelectedKeys 耗時在整個循環中所佔用的時間). 例如 ioRatio 默認是 50, 則表示 IO 操做和執行 task 的所佔用的線程執行時間比是 1 : 1. 當知道了 IO 操做耗時和它所佔用的時間比, 那麼執行 task 的時間就能夠很方便的計算出來了。

此時會去調用processSelectedKeysPlain方法,默認沒有開啓SelectedKey的優化方法。這裏執行的方法以下:
設 IO 操做耗時爲 ioTime, ioTime 佔的時間比例爲 ioRatio, 則:
    ioTime / ioRatio = taskTime / taskRatio
    taskRatio = 100 - ioRatio
    => taskTime = ioTime * (100 - ioRatio) / ioRatio

根據上面的公式, 當咱們設置 ioRate = 70 時, 則表示 IO 運行耗時佔比爲70%, 即假設某次循環一共耗時爲 100ms, 那麼根據公式, 咱們知道 processSelectedKeys() 方法調用所耗時大概爲70ms(即 IO 耗時), 而 runAllTasks() 耗時大概爲 30ms(即執行 task 耗時).
當 ioRatio 爲 100 時, Netty 就不考慮 IO 耗時的佔比, 而是分別調用 processSelectedKeys()runAllTasks(); 而當 ioRatio 不爲 100時, 則執行到 else 分支, 在這個分支中, 首先記錄下 processSelectedKeys() 所執行的時間(即 IO 操做的耗時), 而後根據公式, 計算出執行 task 所佔用的時間, 而後以此爲參數, 調用 runAllTasks().

咱們這裏先分析一下 processSelectedKeys() 方法調用, runAllTasks() 咱們留到下一節再分析。


processSelectedKeys() 方法的源碼以下:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

這個方法中, 會根據 selectedKeys 字段是否爲空, 而分別調用 processSelectedKeysOptimized 或 processSelectedKeysPlainselectedKeys 字段是在調用 openSelector() 方法時, 根據 JVM 平臺的不一樣, 而有設置不一樣的值, 在我所調試這個值是不爲 null 的. 其實 processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的區別,先看processSelectedKeysPlain():

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        //判斷selectedkey是否爲空,爲空直接返回,若是不爲空就去獲取selectedkey上的channel
        if (selectedKeys.isEmpty()) {
            return;
        }
        
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {//爲NioServerSocketChannel或是NioSocketChannel
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
這裏會去判斷selectedkey是否爲空,若是不爲空就去獲取selectedkey上的channel,獲取到(NioServerSocketChannel或是NioSocketChannel)channel後,判斷其類型,這裏netty的都是AbstractNioChannel類,就會調用processSelectedKey()方法。
 
processSelectedKey()方法

processSelectedKey 中處理了三個事件, 分別是:

  • OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取。

  • OP_WRITE, 可寫事件, 即上層能夠向 Channel 寫入數據。

  • OP_CONNECT, 鏈接創建事件, 即 TCP 鏈接已經創建, Channel 處於 active 狀態。

簡單的說就是對網絡位判斷,當網絡位爲寫的時候,則說明有半包消息沒有發送完成,須要繼續調用flush方法進行發送。後面的若是網絡操做位爲鏈接狀態,則須要對鏈接結果進行判斷。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        //看選擇鍵是否可用
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            //可用而後進行位運算來判斷當前狀態
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//若是是讀或者是鏈接操做,則調用Unsafe的read方法
                //此處的Unsafe的實現是一個多態。(多是調用NioServerSocketChannel或是NioSocketChannel的doReadBytes方法)
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }
這裏會看選擇鍵是否可用,可用而後對位進行判斷,若是是讀或者是鏈接操做,則調用Unsafe的read方法。此處的Unsafe的實現是一個多態。(多是調用NioServerSocketChannel或是NioSocketChannel的doReadBytes方法),
Unsafe接口的read()方法有的實現有:

對於服務端處理鏈接的請求以下:

 因爲NioMessageUnsafe是AbstractNioMessageChannel的內部類,調用外部類doReadMessages()方法

下面是服務端NioServerSocketChannel中的該方法

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
對於客戶端的調用以下:
NioSocketChannel中的doReadBytes方法以下:
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }
 
再回到processSelectedKeys()方法中的第二個分支 processSelectedKeysOptimized():
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                    i++;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

上面代碼關鍵的點就兩個:迭代 selectedKeys 獲取就緒的 IO 事件, 而後爲每一個事件都調用 processSelectedKey 來處理它。
這裏正好完美對應上了咱們提到的 Selector 的使用流程中的第三步裏操做。
還有一點須要注意的是, 咱們能夠調用 selectionKey.attach(object) 給一個 selectionKey 設置一個附加的字段, 而後能夠經過 Object attachedObj = selectionKey.attachment() 獲取它. 上面代代碼正是經過了 k.attachment() 來獲取一個附加在 selectionKey 中的對象, 那麼這個對象是什麼呢? 它又是在哪裏設置的呢? 咱們再來回憶一下 SocketChannel 是如何註冊到 Selector 中的:

在客戶端的 Channel 註冊過程當中, 會有以下調用鏈:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

最後的 AbstractNioChannel.doRegister 方法會調用 SocketChannel.register 方法註冊一個 SocketChannel 到指定的 Selector:

@Override
protected void doRegister() throws Exception {
    // 省略錯誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

特別注意一下 register 的第三個參數, 這個參數是設置 selectionKey 的附加對象的, 和調用 selectionKey.attach(object) 的效果同樣. 而調用 register 所傳遞的第三個參數是 this, 它其實就是一個 NioSocketChannel 的實例. 那麼這裏就很清楚了, 咱們在將 SocketChannel 註冊到 Selector 中時, 將 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當咱們獲取到附加的對象後, 咱們就調用 processSelectedKey 來處理這個 IO 事件:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}
 
netty中特有的:非io的系統task和定時任務的處理
 再回到NioEventLoop的run()方法,處理完網絡的io後,Eventloop要執行一些非io的系統task和定時任務,任務的權重上面有算法介紹,代碼以下:
                final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
 
因爲要同時執行io和非io的操做,爲了充分使用cpu,會按必定的比例去進行執行,若是io的任務大於定時任務和task,則能夠將io比例調大。反之調小,默認是50%,其執行方法以下:
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromDelayedQueue();
        //獲取SingleThreadEventExecutor中的taskQueue
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            //當循環次數爲64時,這時候會去比較上次執行時間和延時的關係,若是大於延時那麼就退出,這裏獲取naotime是每64次進行獲取一次。
            //這麼作的目的是一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,因此通常每64個任務就會返回
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

一、獲取SingleThreadEventExecutor中的taskQueue,若是沒有任務就退出;
二、當循環次數爲64時,這時候會去比較上次執行時間和延時的關係,若是大於延時那麼就退出,這裏獲取naotime是每64次進行獲取一次。這麼作的目的是一來是獲取naotime比較耗時,另外也不能長時間執行task,讓io阻塞,因此通常每64個任務就會返回;

最後eventloop的run方法,會判斷是否優雅關閉,若是是優雅關閉會執行closeAll方法,以下:
    private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
相關文章
相關標籤/搜索