Netty源碼學習筆記之boss線程處理流程

server在啓動的時候會開啓兩個線程:bossGroup和workerGroup,這兩個線程分別是boss線程池(用於接收client請求)和worker線程池(用於處理具體的讀寫操做),這兩個線程調度器都是NioEventLoopGroup,bossGroup有一個NioEventLoop,而worker線程池有n*cup數量個NioEventLoop。那麼咱們看看在NioEventLoop中的是如何開始的:react

  NioEventLoop本質上是一個線程調度器(繼承自ScheduledExecutorService),當bind以後就開始run起一個線程:  linux

複製代碼

(代碼一)
 1     @Override
 2     protected void run() {
 3         for (;;) {
 4             boolean oldWakenUp = wakenUp.getAndSet(false);
 5             try {
 6                 if (hasTasks()) {
 7                     selectNow();
 8                 } else {
 9                     select(oldWakenUp);
10 
11                     if (wakenUp.get()) {
12                         selector.wakeup();
13                     }
14                 }
15 
16                 cancelledKeys = 0;
17                 needsToSelectAgain = false;
18                 final int ioRatio = this.ioRatio;
19                 if (ioRatio == 100) {
20                     processSelectedKeys();
21                     runAllTasks();
22                 } else {
23                     final long ioStartTime = System.nanoTime();
24 
25                     processSelectedKeys();
26 
27                     final long ioTime = System.nanoTime() - ioStartTime;
28                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
29                 }
30 
31                 if (isShuttingDown()) {
32                     closeAll();
33                     if (confirmShutdown()) {
34                         break;
35                     }
36                 }
37             } catch (Throwable t) {
38                 ...
39             }
40         }
41     }

複製代碼

 

  這個for(;;)裏面就是boss線程的核心處理流程:promise

  【代碼一主線】1,不斷地監聽selector拿到socket句柄而後建立channel。每次run的時候先拿到wakeup的值,而且set進去false(PS:wakeup是什麼鬼?一個AtomicBoolean,表明是否用戶喚醒,若是不人爲將其set成true,永遠是false)。異步

    【代碼一主線】2,若是任務隊列中已有任務,那麼selectNow(),(PS:selectNow是什麼鬼?咱們知道selector.select()是一個阻塞調用,而selectNow方法是個非阻塞方法,若是沒有到達的socket句柄則返回0),所以若隊列中已有任務的話應該當即開始執行,而不能阻塞到selector.select()上,不然則調用select()方法,繼續看select()裏面:socket

複製代碼

(代碼二)
 1     private void select(boolean oldWakenUp) throws IOException {
 2         Selector selector = this.selector;
 3         try {
 4             int selectCnt = 0;
 5             long currentTimeNanos = System.nanoTime();
 6             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
 7             for (;;) {
 8                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
 9                 if (timeoutMillis <= 0) {
10                     if (selectCnt == 0) {
11                         selector.selectNow();
12                         selectCnt = 1;
13                     }
14                     break;
15                 }
16 
17                 int selectedKeys = selector.select(timeoutMillis);
18                 selectCnt ++;
19 
20                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
21                     // 若是selectedKeys不爲空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,則break
22                     break;
23                 }
24                 if (Thread.interrupted()) {
25                     //若是線程被中斷則重置selectedKeys,同時break出本次循環,因此不會陷入一個繁忙的循環。
26                     selectCnt = 1;
27                     break;
28                 }
29 
30                 long time = System.nanoTime();
31                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
32                     // selector超時
33                     selectCnt = 1;
34                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
35                     // selector屢次過早返回,從新創建並打開Selector
36                     ...
37                 }
38 
39                 currentTimeNanos = time;
40             }
41             ...
42         } catch (CancelledKeyException e) {
43             ...
44         }
45     }

複製代碼

   咱們看到,select()方法進入一個for循環去select阻塞等待socket(這裏的selector的實如今是根據操做系統和netty的版原本定的,在最新的netty中是使用的linux的epoll模型),同時入參裏有「超時時間」,若是超過了這個時間仍然沒有socket到來則從新將selectCnt置爲1從新循環等待,直到有socket到來。若是selectedKeys不爲空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,那麼就是說該eventLoop有活幹了,先break出去去幹活,完了再打開selector從新阻塞等待。正常狀況下會等待到一個socket,break出去以後回到代碼一ide

  【代碼一主線】3,根據ioRatio來選擇任務執行策略(PS:ioRatio是什麼鬼?看了下用途應該是這樣的,這個ioRatio表明該eventLoop指望在I/O操做上花費時間的比例)。而NioEventLoop中有兩類操做,一類是I/O操做(讀寫之類),調用processSelectedKeys;一類是非I/O操做(例如register等),調用runAllTasks。若是ioRatio是100的話那麼會按照順序執行I/O操做->非I/O操做;若是不是會按照這個比例算出一個超時時間,在run任務隊列的時候若是超過了這個時間會當即返回,確保I/O操做能夠獲得及時的調用。oop

  咱們關心的是I/O操做,那麼進入processSelectedKeys()看下發生了什麼吧。學習

  

複製代碼

(代碼三)
1     private void processSelectedKeys() {
2         if (selectedKeys != null) {
3             processSelectedKeysOptimized(selectedKeys.flip());
4         } else {
5             processSelectedKeysPlain(selector.selectedKeys());
6         }
7     }

複製代碼

 

  正常狀況下會走到processSelectedKeysOptimized中:this

  

複製代碼

(代碼四)
 1   private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
 2         for (int i = 0;; i ++) {
 3             final SelectionKey k = selectedKeys[i];
 4             if (k == null) {
 5                 break;
 6             }
 7             selectedKeys[i] = null;
 8 
 9             final Object a = k.attachment();
10 
11             if (a instanceof AbstractNioChannel) {
12                 processSelectedKey(k, (AbstractNioChannel) a);
13             } else {
14                 @SuppressWarnings("unchecked")
15                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
16                 processSelectedKey(k, task);
17             }
18 
19             if (needsToSelectAgain) {
20                 for (;;) {
21                     if (selectedKeys[i] == null) {
22                         break;
23                     }
24                     selectedKeys[i] = null;
25                     i++;
26                 }
27 
28                 selectAgain();
29                 selectedKeys = this.selectedKeys.flip();
30                 i = -1;
31             }
32         }
33     }

複製代碼

  

  遍歷拿到全部的SelectionKey,而後判斷每一個SelectionKey的attachment,上篇文章中已經分析過給ServerBootstrap註冊的Channel是NioServerSocketChannel(繼承自AbstractNioChannel),所以進入processSelectedKey中:操作系統

 

複製代碼

(代碼五)
 1   private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2         final NioUnsafe unsafe = ch.unsafe();
 3         if (!k.isValid()) {
 4             unsafe.close(unsafe.voidPromise());
 5             return;
 6         }
 7 
 8         try {
 9             int readyOps = k.readyOps();
10             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
11                 unsafe.read();
12                 if (!ch.isOpen()) {
13                     return;
14                 }
15             }
16             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
17                 ch.unsafe().forceFlush();
18             }
19             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
20                 int ops = k.interestOps();
21                 ops &= ~SelectionKey.OP_CONNECT;
22                 k.interestOps(ops);
23 
24                 unsafe.finishConnect();
25             }
26         } catch (CancelledKeyException ignored) {
27             unsafe.close(unsafe.voidPromise());
28         }
29     }

複製代碼

  

  在這裏根據傳入的SelectionKey的已就緒操做類型來決定下一步的操做,若是是一個讀操做,那麼進入AbstractNioMessageChannel$NioMessageUnsafe的read實現,這裏代碼不少,咱們只貼一下核心的代碼:

 

複製代碼

(代碼六)
 1         @Override
 2         public void read() {
 3             ...
 4             final ChannelPipeline pipeline = pipeline();
 5             ...
 6             try {
 7                 int size = readBuf.size();
 8                 for (int i = 0; i < size; i ++) {
 9                     pipeline.fireChannelRead(readBuf.get(i));
10                 }
11                 ...
12                 readBuf.clear();
13                 pipeline.fireChannelReadComplete();
14             } finally {
15             }
16         }

複製代碼

  核心就是這個pipeline.fireChannelRead(readBuf.get(i));,這已經到了pipeline階段,可能有些人會誤覺得這是否是已經到了worker線程中,可是不可能啊,咱們的代碼其實在處於processSelectedKeys的邏輯裏面。實際上,不管是boss仍是worker,他們都是NioEventLoopGroup,玩法都是同樣的,只不過職責不同而已。boss也有本身的handler,上篇文章中咱們提到了netty中的reactor模式的玩法,從Doug Lea的圖中能夠看出,boss(實際上就是mainReactor)的handler其實就是這個acceptor。

  在此咱們順便學習一下netty中的handler:

  

  從用途上來講,handler分爲ChannelInboundHandler(讀)和ChannelOutboundHandler(寫),增長一層適配器產生了兩handler的Adapter,咱們使用到的類都是繼承自這兩個Adapter。咱們常常用到的SimpleChannelInboundHandler就繼承ChannelInboundHandlerAdapter,用於初始化用戶handler鏈的ChannelInitializer和boss線程綁定的ServerBootstrapAcceptor也都繼承於此。

  回到【代碼六主線】咱們從pipeline.fireChannelRead繼續追蹤下去會追到ChannelInboundHandler的channelRead的實現,而這裏的Hander就是ServerBootstrapAcceptor。

複製代碼

(代碼七)
 1         @Override
 2         @SuppressWarnings("unchecked")
 3         public void channelRead(ChannelHandlerContext ctx, Object msg) {
 4             final Channel child = (Channel) msg;
 5 
 6             child.pipeline().addLast(childHandler);
 7 
 8             for (Entry<ChannelOption<?>, Object> e: childOptions) {
 9                 try {
10                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
11                     }
12                 } catch (Throwable t) {
13                 }
14             }
15 
16             for (Entry<AttributeKey<?>, Object> e: childAttrs) {
17                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
18             }
19 
20             try {
21                 childGroup.register(child).addListener(new ChannelFutureListener() {
22                     @Override
23                     public void operationComplete(ChannelFuture future) throws Exception {
24                         if (!future.isSuccess()) {
25                             forceClose(child, future.cause());
26                         }
27                     }
28                 });
29             } catch (Throwable t) {
30                 forceClose(child, t);
31             }
32         }

複製代碼

  因爲ServerBootstrapAcceptor 很重要,咱們先看一下都有什麼內容:

複製代碼

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
}

複製代碼

  我本身的理解:

  childGroup就是subReactor(也就是worker線程);childHandler就是xxx;childOptions和childAttrs是爲channel準備的一些參數。

  回到【代碼七主線】在這裏作了3件事:

  1.爲客戶端channel的pipeline中添加childHandler,那麼這個childHandler是什麼鬼呢?回憶一下上文中的服務端啓動代碼,有bootStrap.childHandler(xxx)這樣的代碼,因此此處就是把在服務端啓動時咱們定義好的Handler鏈綁定給每一個channel。

  2.把咱們服務端初始化時的參數綁定到每一個channel中。

  3.childGroup.register(child).addListener(new ChannelFutureListener()),後面這個異步listener做用很明確,問題是這個childGroup是什麼鬼?我理解應該就是worker線程了。詳細說一下childGroup.register(child),繼續跟下去,跟到AbstractChannel$AbstractUnsafe中

複製代碼

(代碼八)
 1         @Override
 2         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3             ...
 4             AbstractChannel.this.eventLoop = eventLoop;
 5 
 6             if (eventLoop.inEventLoop()) {
 7                 register0(promise);
 8             } else {
 9                 ...
10                 } catch (Throwable t) {
11                 }
12             }
13         }

複製代碼

  繼續register0:

複製代碼

(代碼九)
 1     private void register0(ChannelPromise promise) {
 2             try {
 3                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
 4                     return;
 5                 }
 6                 boolean firstRegistration = neverRegistered;
 7                 doRegister();
 8                 neverRegistered = false;
 9                 registered = true;
10                 safeSetSuccess(promise);
11                 pipeline.fireChannelRegistered();
12                 if (firstRegistration && isActive()) {
13                     pipeline.fireChannelActive();
14                 }
15             } catch (Throwable t) {
16             }
17         }

複製代碼

  這裏核心有兩步:

  1.doRegister(),其實咱們在上篇文章中分析過,就是將channel綁定到selector上。此處有點懵逼,我猜想是綁定到worker線程的selector中,若是有大神知道請留言個人微博。

  2.pipeline.fireChannelRegistered(),繼續往下跟跟進到ChannelInboundHandler的channelRegistered方法中,而此時會調用咱們定義的ChannelInitializer,將咱們定義的handler註冊到pipeline中。

 

  至此【代碼一主線】執行完畢,咱們瀏覽了一遍boss線程的在接收socket請求期間的處理流程,過程當中是結合reactor模式去理解的,有些地方本身也有點不懂,還請各位指正。

  總結一下:

  1.boss線程就是個loop循環,打開selector -> 得到監聽到的SelectionKey -> 處理I/O請求 -> 處理非I/O請求,而咱們最關心的就是處理I/O請求(在processSelectedKeys()方法中完成)。

  2.遍歷準備就緒的SelectionKey,根據其可操做類型(read or write。。)來決定下一步的具體操做,咱們着重去了解了read邏輯。

  3.NioServerSocketChannel調用父類AbstractNioMessageChannel的unsafe類NioMessageUnsafe來處理讀取邏輯:調用pipeline處理readbuf。

  4.pipeline.fireChannelRead會調用ServerBootstrapAcceptor的channelRead:初始化客戶端channel參數,將該channel綁定到worker線程的selector中,爲channel註冊用戶定義的handler鏈。

 

  再精煉一點:

  boss線程只是接收客戶端socket並初始化客戶端channle,將channel丟給acceptor,acceptor會將這個channel註冊到worker線程中。整個loop過程都是一個非阻塞過程(所有異步化),同時boss中不會作耗時的I/O讀取,只是將channel丟給worker。所以是一個高效的loop過程。

相關文章
相關標籤/搜索