Netty系列(三):說說NioEventLoop

前言

原本想先寫下NioServerSocketChannel以及NioSocketChannel的註冊流程的,可是最後發現始終離不開NioEventLoop這個類,因此在這以前必須得先講解下NioEventLoop這個類究竟是用來作啥的。其實在第一篇文章裏面有說起到它的,可是沒有詳細的去講解,接下來會對它分析一波。html


設計模型

在進入正文以前,先簡單的瞭解下NioEventLoop的工做模型(服務端):
java

假設一個NioEventLoopGroup(這裏服務端會用兩個Group)裏面有4個NioEventLoop,那麼netty中的實際工做模型就如上圖所示,服務端會用默認的選擇規則從Group1中選擇出一個NioEventLoop註冊ServerChannel,並綁定一個OP_ACCEPT用於監聽客戶端發起的鏈接請求,一旦有新的鏈接進來,服務端則會從Group2中按必定的規則選出一個NioEventLoop來註冊SocketChannel,並綁定OP_READ興趣事件,這裏注意,一個NioEventLoop能夠綁定多個SocketChannel。具體的註冊流程我會在下一篇文章中寫出來。app

下面進入正題。函數


構造流程

NioEventLoop具體的構造流程你們能夠去個人Netty系列(一):NioEventLoopGroup源碼解析中去看一下,裏面說的還算蠻詳細的。下面是其調用的構造函數,我們能夠觀察到其身上會綁定一個選擇器Selector,供後期channel註冊的時候使用的,這一塊是JAVA NIO相關的知識點。
oop


內部還維護着一個executor去開啓執行線程的,以及taskQueue任務隊列和一個tailTasks尾部隊列(這個隊列裏面的任務是在每次執行taskQueue任務隊列中的任務結束後都會去調用的,很少介紹)。上面介紹的三個四個內部結構Selector,executor,taskQueue,tailTasks會在後面屢次提起。
下圖是NioEventLoop的簡單的層級結構(下圖取之於Netty in Action):
fetch


NioEventLoop.execute

這裏咱們先看一下NioEventLoop的execute方法。實際上這個方法是在其父類SingleThreadEventExecutor中。這個方法的功能就是將任務丟到taskQueue中。ui

 1    public void execute(Runnable task) {
2        if (task == null) {
3            throw new NullPointerException("task");
4        }
5
6        boolean inEventLoop = inEventLoop();
7        addTask(task);
8        if (!inEventLoop) {
9            // 開啓工做線程,實際上也就是執行NioEventLoop中的run方法,下面會介紹
10            startThread();
11            if (isShutdown()) {
12                boolean reject = false;
13                try {
14                    if (removeTask(task)) {
15                        reject = true;
16                    }
17                } catch (UnsupportedOperationException e) {
18                    // The task queue does not support removal so the best thing we can do is to just move on and
19                    // hope we will be able to pick-up the task before its completely terminated.
20                    // In worst case we will log on termination.
21                }
22                if (reject) {
23                    reject();
24                }
25            }
26        }
27
28        if (!addTaskWakesUp && wakesUpForTask(task)) {
29            wakeup(inEventLoop);
30        }
31    }
複製代碼
  1. 添加task到執行隊列中,也就是我們上文提起的taskQueue中。
  2. 判斷這個NioEventLoop中的是否已經開啓過線程。
  3. 若未開啓,則必須先啓動線程任務,也就是咱們下文會介紹的run方法。
  4. 首次初始化會在taskQueue中丟一個空任務去喚醒線程。

NioEventLoop的工做模式實際上就是開啓一個單線程跑一個死循環,而後一直輪詢taskQueue隊列是否有任務添加進來,而後就去處理任務,還有就是若是註冊在selector上的channel有興趣事件進來,也會去處理selectorKeys,這一塊下面會作介紹。this


NioEventLoop.run

如今看看NioEventLoop中的run方法spa

 1    protected void run() {
2        for (;;) {
3            try {
4                try {
5                    // 按默認配置的話要麼返回select.selectNow(),
6                    //要麼返回SelectStrategy.SELECT
7                   switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
8                    case SelectStrategy.CONTINUE:
9                        continue;
10
11                    case SelectStrategy.BUSY_WAIT:
12                    case SelectStrategy.SELECT:
13                        select(wakenUp.getAndSet(false));
14
15                        if (wakenUp.get()) {
16                            selector.wakeup();
17                        }
18                        // fall through
19                    default:
20                    }
21                } catch (IOException e) {
22                    rebuildSelector0();
23                    handleLoopException(e);
24                    continue;
25                }
26
27                cancelledKeys = 0;
28                needsToSelectAgain = false;
29                final int ioRatio = this.ioRatio;
30                if (ioRatio == 100) {
31                    try {
32                        // IO操做,根據selectedKeys去處理
33                        processSelectedKeys();
34                    } finally {
35                        // 保證執行完全部的任務
36                        runAllTasks();
37                    }
38                } else {
39                    final long ioStartTime = System.nanoTime();
40                    try {
41                        // IO操做,根據selectedKeys去處理
42                        processSelectedKeys();
43                    } finally {
44                        // 按必定的比例去處理任務,有可能遺留一部分任務下次進行處理
45                        final long ioTime = System.nanoTime() - ioStartTime;
46                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
47                    }
48                }
49            } catch (Throwable t) {
50                handleLoopException(t);
51            }
52            // Always handle shutdown even if the loop processing threw an exception.
53            try {
54                // 釋放資源,將註冊的channel所有關閉掉。
55                if (isShuttingDown()) {
56                    closeAll();
57                    if (confirmShutdown()) {
58                        return;
59                    }
60                }
61            } catch (Throwable t) { 
62                handleLoopException(t);
63            }
64        }
65    }
複製代碼
  1. 這裏有個默認的計算策略:
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT

1.有任務便會直接返回select.selectNow(),則會直接去跑任務或者是去處理selectorKeys
2.若沒任務,則會走select(wakenUp.getAndSet(false))方法,裏面會有一個timeout超時處理,selector.select(timeoutMillis),超時後也會去跑任務或者是去處理selectorKeys線程

這一塊具體細節也不少,只是說一下處理流程。

  1. 注意上面有個ioRatio == 100這個判斷條件,若是爲100,則會將任務所有處理完成;不然會與io操做按必定的比例去執行任務。

這裏的IO操做就是processSelectedKeys()方法,代碼雖然很長,可是乾的活就是根據不一樣的興趣事件幹不一樣的活,裏面有對OP_READ OP_ACCEPT OP_WRITE OP_CONNECT等等不一樣興趣事件的不一樣處理方法,這一塊應該是JAVA NIO裏面的相關知識點。感興趣的朋友能夠debug針對某個觸發事件研究一下。


runAllTasks

執行任務的代碼以下(下面是runAllTasks的代碼):

 1    protected boolean runAllTasks() {
2        assert inEventLoop();
3        boolean fetchedAll;
4        boolean ranAtLeastOne = false;
5
6        do {
7            // 這裏會從定時任務隊列中將達到執行事件的task丟到taskQueue中去
8            fetchedAll = fetchFromScheduledTaskQueue();
9            // 執行taskQueue中全部的task
10            if (runAllTasksFrom(taskQueue)) {
11                ranAtLeastOne = true;
12            }
13        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
14
15        if (ranAtLeastOne) {
16            lastExecutionTime = ScheduledFutureTask.nanoTime();
17        }
18        // 這個是執行上面所說的tailTasks中的task
19        afterRunningAllTasks();
20        return ranAtLeastOne;
21    }
複製代碼

這一塊的邏輯是:
先執行fetchFromScheduledTaskQueue方法,將到期的定時任務丟到taskQueue隊列中,這個fetchFromScheduledTaskQueue方法裏面有個小細節,當taskQueue隊列滿了以後,它就會從新塞到scheduledTaskQueue隊列中,而後再外圈循環,taskQueue隊列消費完畢,則繼續執行fetchFromScheduledTaskQueue方法,直到把全部到期的任務都丟到taskQueue隊列中執行完畢爲止。以下圖所示:

netty_runTasks.png
netty_runTasks.png

這一部分到這裏就結束了,下一篇會對NioServerSocketChannel的註冊以及服務端建立NioSocketChannel進行分析。


End

相關文章
相關標籤/搜索