原本想先寫下NioServerSocketChannel以及NioSocketChannel的註冊流程的,可是最後發現始終離不開NioEventLoop這個類,因此在這以前必須得先講解下NioEventLoop這個類究竟是用來作啥的。其實在第一篇文章裏面有說起到它的,可是沒有詳細的去講解,接下來會對它分析一波。html
在進入正文以前,先簡單的瞭解下NioEventLoop的工做模型(服務端):
java
假設一個NioEventLoopGroup(這裏服務端會用兩個Group)裏面有4個NioEventLoop,那麼netty中的實際工做模型就如上圖所示,服務端會用默認的選擇規則從Group1中選擇出一個NioEventLoop註冊ServerChannel,並綁定一個OP_ACCEPT用於監聽客戶端發起的鏈接請求,一旦有新的鏈接進來,服務端則會從Group2中按必定的規則選出一個NioEventLoop來註冊SocketChannel,並綁定OP_READ興趣事件,這裏注意,一個NioEventLoop能夠綁定多個SocketChannel。具體的註冊流程我會在下一篇文章中寫出來。app
下面進入正題。函數
NioEventLoop具體的構造流程你們能夠去個人Netty系列(一):NioEventLoopGroup源碼解析中去看一下,裏面說的還算蠻詳細的。下面是其調用的構造函數,我們能夠觀察到其身上會綁定一個選擇器Selector
,供後期channel註冊的時候使用的,這一塊是JAVA NIO相關的知識點。
oop
內部還維護着一個executor
去開啓執行線程的,以及taskQueue
任務隊列和一個tailTasks
尾部隊列(這個隊列裏面的任務是在每次執行taskQueue
任務隊列中的任務結束後都會去調用的,很少介紹)。上面介紹的三個四個內部結構Selector,executor,taskQueue,tailTasks
會在後面屢次提起。
下圖是NioEventLoop的簡單的層級結構(下圖取之於Netty in Action):
fetch
這裏咱們先看一下NioEventLoop的execute
方法。實際上這個方法是在其父類SingleThreadEventExecutor
中。這個方法的功能就是將任務丟到taskQueue
中。ui
1 public void execute(Runnable task) {
2 if (task == null) {
3 throw new NullPointerException("task");
4 }
5
6 boolean inEventLoop = inEventLoop();
7 addTask(task);
8 if (!inEventLoop) {
9 // 開啓工做線程,實際上也就是執行NioEventLoop中的run方法,下面會介紹
10 startThread();
11 if (isShutdown()) {
12 boolean reject = false;
13 try {
14 if (removeTask(task)) {
15 reject = true;
16 }
17 } catch (UnsupportedOperationException e) {
18 // The task queue does not support removal so the best thing we can do is to just move on and
19 // hope we will be able to pick-up the task before its completely terminated.
20 // In worst case we will log on termination.
21 }
22 if (reject) {
23 reject();
24 }
25 }
26 }
27
28 if (!addTaskWakesUp && wakesUpForTask(task)) {
29 wakeup(inEventLoop);
30 }
31 }
複製代碼
taskQueue
中。run
方法。taskQueue
中丟一個空任務去喚醒線程。NioEventLoop的工做模式實際上就是開啓一個單線程跑一個死循環,而後一直輪詢taskQueue
隊列是否有任務添加進來,而後就去處理任務,還有就是若是註冊在selector上的channel有興趣事件進來,也會去處理selectorKeys
,這一塊下面會作介紹。this
如今看看NioEventLoop中的run
方法spa
1 protected void run() {
2 for (;;) {
3 try {
4 try {
5 // 按默認配置的話要麼返回select.selectNow(),
6 //要麼返回SelectStrategy.SELECT
7 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
8 case SelectStrategy.CONTINUE:
9 continue;
10
11 case SelectStrategy.BUSY_WAIT:
12 case SelectStrategy.SELECT:
13 select(wakenUp.getAndSet(false));
14
15 if (wakenUp.get()) {
16 selector.wakeup();
17 }
18 // fall through
19 default:
20 }
21 } catch (IOException e) {
22 rebuildSelector0();
23 handleLoopException(e);
24 continue;
25 }
26
27 cancelledKeys = 0;
28 needsToSelectAgain = false;
29 final int ioRatio = this.ioRatio;
30 if (ioRatio == 100) {
31 try {
32 // IO操做,根據selectedKeys去處理
33 processSelectedKeys();
34 } finally {
35 // 保證執行完全部的任務
36 runAllTasks();
37 }
38 } else {
39 final long ioStartTime = System.nanoTime();
40 try {
41 // IO操做,根據selectedKeys去處理
42 processSelectedKeys();
43 } finally {
44 // 按必定的比例去處理任務,有可能遺留一部分任務下次進行處理
45 final long ioTime = System.nanoTime() - ioStartTime;
46 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
47 }
48 }
49 } catch (Throwable t) {
50 handleLoopException(t);
51 }
52 // Always handle shutdown even if the loop processing threw an exception.
53 try {
54 // 釋放資源,將註冊的channel所有關閉掉。
55 if (isShuttingDown()) {
56 closeAll();
57 if (confirmShutdown()) {
58 return;
59 }
60 }
61 } catch (Throwable t) {
62 handleLoopException(t);
63 }
64 }
65 }
複製代碼
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
1.有任務便會直接返回
select.selectNow()
,則會直接去跑任務或者是去處理selectorKeys
;
2.若沒任務,則會走select(wakenUp.getAndSet(false))
方法,裏面會有一個timeout超時處理,selector.select(timeoutMillis)
,超時後也會去跑任務或者是去處理selectorKeys
;線程
這一塊具體細節也不少,只是說一下處理流程。
ioRatio == 100
這個判斷條件,若是爲100,則會將任務所有處理完成;不然會與io操做按必定的比例去執行任務。這裏的IO操做就是
processSelectedKeys()
方法,代碼雖然很長,可是乾的活就是根據不一樣的興趣事件幹不一樣的活,裏面有對OP_READ OP_ACCEPT OP_WRITE OP_CONNECT
等等不一樣興趣事件的不一樣處理方法,這一塊應該是JAVA NIO裏面的相關知識點。感興趣的朋友能夠debug針對某個觸發事件研究一下。
執行任務的代碼以下(下面是runAllTasks
的代碼):
1 protected boolean runAllTasks() {
2 assert inEventLoop();
3 boolean fetchedAll;
4 boolean ranAtLeastOne = false;
5
6 do {
7 // 這裏會從定時任務隊列中將達到執行事件的task丟到taskQueue中去
8 fetchedAll = fetchFromScheduledTaskQueue();
9 // 執行taskQueue中全部的task
10 if (runAllTasksFrom(taskQueue)) {
11 ranAtLeastOne = true;
12 }
13 } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
14
15 if (ranAtLeastOne) {
16 lastExecutionTime = ScheduledFutureTask.nanoTime();
17 }
18 // 這個是執行上面所說的tailTasks中的task
19 afterRunningAllTasks();
20 return ranAtLeastOne;
21 }
複製代碼
這一塊的邏輯是:
先執行fetchFromScheduledTaskQueue
方法,將到期的定時任務丟到taskQueue
隊列中,這個fetchFromScheduledTaskQueue
方法裏面有個小細節,當taskQueue
隊列滿了以後,它就會從新塞到scheduledTaskQueue
隊列中,而後再外圈循環,taskQueue
隊列消費完畢,則繼續執行fetchFromScheduledTaskQueue
方法,直到把全部到期的任務都丟到taskQueue
隊列中執行完畢爲止。以下圖所示:
這一部分到這裏就結束了,下一篇會對NioServerSocketChannel的註冊以及服務端建立NioSocketChannel進行分析。