(三)Netty源碼學習筆記之boss線程處理流程

  尊重原創,轉載註明出處,原文地址:http://www.cnblogs.com/cishengchongyan/p/6160194.html html

  

  本文咱們將先從NioEventLoop開始來學習服務端的處理流程。話很少說,開始學習~~~~react

  咱們從上文中已經知道server在啓動的時候會開啓兩個線程:bossGroup和workerGroup,這兩個線程分別是boss線程池(用於接收client請求)和worker線程池(用於處理具體的讀寫操做),這兩個線程調度器都是NioEventLoopGroup,bossGroup有一個NioEventLoop,而worker線程池有n*cup數量個NioEventLoop。那麼咱們看看在NioEventLoop中的是如何開始的:linux

  NioEventLoop本質上是一個線程調度器(繼承自ScheduledExecutorService),當bind以後就開始run起一個線程:  promise

 (代碼一)
1
@Override 2 protected void run() { 3 for (;;) { 4 boolean oldWakenUp = wakenUp.getAndSet(false); 5 try { 6 if (hasTasks()) { 7 selectNow(); 8 } else { 9 select(oldWakenUp); 10 11 if (wakenUp.get()) { 12 selector.wakeup(); 13 } 14 } 15 16 cancelledKeys = 0; 17 needsToSelectAgain = false; 18 final int ioRatio = this.ioRatio; 19 if (ioRatio == 100) { 20 processSelectedKeys(); 21 runAllTasks(); 22 } else { 23 final long ioStartTime = System.nanoTime(); 24 25 processSelectedKeys(); 26 27 final long ioTime = System.nanoTime() - ioStartTime; 28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 29 } 30 31 if (isShuttingDown()) { 32 closeAll(); 33 if (confirmShutdown()) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 ... 39 } 40 } 41 }

 

  這個for(;;)裏面就是boss線程的核心處理流程:異步

  【代碼一主線】1,不斷地監聽selector拿到socket句柄而後建立channel。每次run的時候先拿到wakeup的值,而且set進去false(PS:wakeup是什麼鬼?一個AtomicBoolean,表明是否用戶喚醒,若是不人爲將其set成true,永遠是false)。socket

    【代碼一主線】2,若是任務隊列中已有任務,那麼selectNow(),(PS:selectNow是什麼鬼?咱們知道selector.select()是一個阻塞調用,而selectNow方法是個非阻塞方法,若是沒有到達的socket句柄則返回0),所以若隊列中已有任務的話應該當即開始執行,而不能阻塞到selector.select()上,不然則調用select()方法,繼續看select()裏面:ide

 (代碼二)
1
private void select(boolean oldWakenUp) throws IOException { 2 Selector selector = this.selector; 3 try { 4 int selectCnt = 0; 5 long currentTimeNanos = System.nanoTime(); 6 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 7 for (;;) { 8 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 9 if (timeoutMillis <= 0) { 10 if (selectCnt == 0) { 11 selector.selectNow(); 12 selectCnt = 1; 13 } 14 break; 15 } 16 17 int selectedKeys = selector.select(timeoutMillis); 18 selectCnt ++; 19 20 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { 21 // 若是selectedKeys不爲空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,則break 22 break; 23 } 24 if (Thread.interrupted()) { 25 //若是線程被中斷則重置selectedKeys,同時break出本次循環,因此不會陷入一個繁忙的循環。 26 selectCnt = 1; 27 break; 28 } 29 30 long time = System.nanoTime(); 31 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { 32 // selector超時 33 selectCnt = 1; 34 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { 35 // selector屢次過早返回,從新創建並打開Selector 36 ... 37 } 38 39 currentTimeNanos = time; 40 } 41 ... 42 } catch (CancelledKeyException e) { 43 ... 44 } 45 }

   咱們看到,select()方法進入一個for循環去select阻塞等待socket(這裏的selector的實如今是根據操做系統和netty的版原本定的,在最新的netty中是使用的linux的epoll模型),同時入參裏有「超時時間」,若是超過了這個時間仍然沒有socket到來則從新將selectCnt置爲1從新循環等待,直到有socket到來。若是selectedKeys不爲空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,那麼就是說該eventLoop有活幹了,先break出去去幹活,完了再打開selector從新阻塞等待。正常狀況下會等待到一個socket,break出去以後回到代碼一oop

  【代碼一主線】3,根據ioRatio來選擇任務執行策略(PS:ioRatio是什麼鬼?看了下用途應該是這樣的,這個ioRatio表明該eventLoop指望在I/O操做上花費時間的比例)。而NioEventLoop中有兩類操做,一類是I/O操做(讀寫之類),調用processSelectedKeys;一類是非I/O操做(例如register等),調用runAllTasks。若是ioRatio是100的話那麼會按照順序執行I/O操做->非I/O操做;若是不是會按照這個比例算出一個超時時間,在run任務隊列的時候若是超過了這個時間會當即返回,確保I/O操做能夠獲得及時的調用。學習

  咱們關心的是I/O操做,那麼進入processSelectedKeys()看下發生了什麼吧。this

  

(代碼三)
1
private void processSelectedKeys() { 2 if (selectedKeys != null) { 3 processSelectedKeysOptimized(selectedKeys.flip()); 4 } else { 5 processSelectedKeysPlain(selector.selectedKeys()); 6 } 7 }

 

  正常狀況下會走到processSelectedKeysOptimized中:

  

 (代碼四)
1
  private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { 2 for (int i = 0;; i ++) { 3 final SelectionKey k = selectedKeys[i]; 4 if (k == null) { 5 break; 6 } 7 selectedKeys[i] = null; 8 9 final Object a = k.attachment(); 10 11 if (a instanceof AbstractNioChannel) { 12 processSelectedKey(k, (AbstractNioChannel) a); 13 } else { 14 @SuppressWarnings("unchecked") 15 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; 16 processSelectedKey(k, task); 17 } 18 19 if (needsToSelectAgain) { 20 for (;;) { 21 if (selectedKeys[i] == null) { 22 break; 23 } 24 selectedKeys[i] = null; 25 i++; 26 } 27 28 selectAgain(); 29 selectedKeys = this.selectedKeys.flip(); 30 i = -1; 31 } 32 } 33 }

  

  遍歷拿到全部的SelectionKey,而後判斷每一個SelectionKey的attachment,上篇文章中已經分析過給ServerBootstrap註冊的Channel是NioServerSocketChannel(繼承自AbstractNioChannel),所以進入processSelectedKey中:

 

 (代碼五)
1
  private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 2 final NioUnsafe unsafe = ch.unsafe(); 3 if (!k.isValid()) { 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7 8 try { 9 int readyOps = k.readyOps(); 10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 11 unsafe.read(); 12 if (!ch.isOpen()) { 13 return; 14 } 15 } 16 if ((readyOps & SelectionKey.OP_WRITE) != 0) { 17 ch.unsafe().forceFlush(); 18 } 19 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 20 int ops = k.interestOps(); 21 ops &= ~SelectionKey.OP_CONNECT; 22 k.interestOps(ops); 23 24 unsafe.finishConnect(); 25 } 26 } catch (CancelledKeyException ignored) { 27 unsafe.close(unsafe.voidPromise()); 28 } 29 }

  

  在這裏根據傳入的SelectionKey的已就緒操做類型來決定下一步的操做,若是是一個讀操做,那麼進入AbstractNioMessageChannel$NioMessageUnsafe的read實現,這裏代碼不少,咱們只貼一下核心的代碼:

 

(代碼六)
1
@Override 2 public void read() { 3 ... 4 final ChannelPipeline pipeline = pipeline(); 5 ... 6 try { 7 int size = readBuf.size(); 8 for (int i = 0; i < size; i ++) { 9 pipeline.fireChannelRead(readBuf.get(i)); 10 } 11 ... 12 readBuf.clear(); 13 pipeline.fireChannelReadComplete(); 14 } finally { 15 } 16 }

  核心就是這個pipeline.fireChannelRead(readBuf.get(i));,這已經到了pipeline階段,可能有些人會誤覺得這是否是已經到了worker線程中,可是不可能啊,咱們的代碼其實在處於processSelectedKeys的邏輯裏面。實際上,不管是boss仍是worker,他們都是NioEventLoopGroup,玩法都是同樣的,只不過職責不同而已。boss也有本身的handler,上篇文章中咱們提到了netty中的reactor模式的玩法,從Doug Lea的圖中能夠看出,boss(實際上就是mainReactor)的handler其實就是這個acceptor。

  在此咱們順便學習一下netty中的handler:

  

  從用途上來講,handler分爲ChannelInboundHandler(讀)和ChannelOutboundHandler(寫),增長一層適配器產生了兩handler的Adapter,咱們使用到的類都是繼承自這兩個Adapter。咱們常常用到的SimpleChannelInboundHandler就繼承ChannelInboundHandlerAdapter,用於初始化用戶handler鏈的ChannelInitializer和boss線程綁定的ServerBootstrapAcceptor也都繼承於此。

  回到【代碼六主線】咱們從pipeline.fireChannelRead繼續追蹤下去會追到ChannelInboundHandler的channelRead的實現,而這裏的Hander就是ServerBootstrapAcceptor。

 (代碼七)
1
@Override 2 @SuppressWarnings("unchecked") 3 public void channelRead(ChannelHandlerContext ctx, Object msg) { 4 final Channel child = (Channel) msg; 5 6 child.pipeline().addLast(childHandler); 7 8 for (Entry<ChannelOption<?>, Object> e: childOptions) { 9 try { 10 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { 11 } 12 } catch (Throwable t) { 13 } 14 } 15 16 for (Entry<AttributeKey<?>, Object> e: childAttrs) { 17 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); 18 } 19 20 try { 21 childGroup.register(child).addListener(new ChannelFutureListener() { 22 @Override 23 public void operationComplete(ChannelFuture future) throws Exception { 24 if (!future.isSuccess()) { 25 forceClose(child, future.cause()); 26 } 27 } 28 }); 29 } catch (Throwable t) { 30 forceClose(child, t); 31 } 32 }

  因爲ServerBootstrapAcceptor 很重要,咱們先看一下都有什麼內容:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; }

  我本身的理解:

  childGroup就是subReactor(也就是worker線程);childHandler就是xxx;childOptions和childAttrs是爲channel準備的一些參數。

  回到【代碼七主線】在這裏作了3件事:

  1.爲客戶端channel的pipeline中添加childHandler,那麼這個childHandler是什麼鬼呢?回憶一下上文中的服務端啓動代碼,有bootStrap.childHandler(xxx)這樣的代碼,因此此處就是把在服務端啓動時咱們定義好的Handler鏈綁定給每一個channel。

  2.把咱們服務端初始化時的參數綁定到每一個channel中。

  3.childGroup.register(child).addListener(new ChannelFutureListener()),後面這個異步listener做用很明確,問題是這個childGroup是什麼鬼?我理解應該就是worker線程了。詳細說一下childGroup.register(child),繼續跟下去,跟到AbstractChannel$AbstractUnsafe中

(代碼八)
1
@Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 ... 4 AbstractChannel.this.eventLoop = eventLoop; 5 6 if (eventLoop.inEventLoop()) { 7 register0(promise); 8 } else { 9 ... 10 } catch (Throwable t) { 11 } 12 } 13 }

  繼續register0:

(代碼九)
1
private void register0(ChannelPromise promise) { 2 try { 3 if (!promise.setUncancellable() || !ensureOpen(promise)) { 4 return; 5 } 6 boolean firstRegistration = neverRegistered; 7 doRegister(); 8 neverRegistered = false; 9 registered = true; 10 safeSetSuccess(promise); 11 pipeline.fireChannelRegistered(); 12 if (firstRegistration && isActive()) { 13 pipeline.fireChannelActive(); 14 } 15 } catch (Throwable t) { 16 } 17 }

  這裏核心有兩步:

  1.doRegister(),其實咱們在上篇文章中分析過,就是將channel綁定到selector上。此處有點懵逼,我猜想是綁定到worker線程的selector中,若是有大神知道請留言個人微博。

  2.pipeline.fireChannelRegistered(),繼續往下跟跟進到ChannelInboundHandler的channelRegistered方法中,而此時會調用咱們定義的ChannelInitializer,將咱們定義的handler註冊到pipeline中。

 

  至此【代碼一主線】執行完畢,咱們瀏覽了一遍boss線程的在接收socket請求期間的處理流程,過程當中是結合reactor模式去理解的,有些地方本身也有點不懂,還請各位指正。

  總結一下:

  1.boss線程就是個loop循環,打開selector -> 得到監聽到的SelectionKey -> 處理I/O請求 -> 處理非I/O請求,而咱們最關心的就是處理I/O請求(在processSelectedKeys()方法中完成)。

  2.遍歷準備就緒的SelectionKey,根據其可操做類型(read or write。。)來決定下一步的具體操做,咱們着重去了解了read邏輯。

  3.NioServerSocketChannel調用父類AbstractNioMessageChannel的unsafe類NioMessageUnsafe來處理讀取邏輯:調用pipeline處理readbuf。

  4.pipeline.fireChannelRead會調用ServerBootstrapAcceptor的channelRead:初始化客戶端channel參數,將該channel綁定到worker線程的selector中,爲channel註冊用戶定義的handler鏈。

 

  再精煉一點:

  boss線程只是接收客戶端socket並初始化客戶端channle,將channel丟給acceptor,acceptor會將這個channel註冊到worker線程中。整個loop過程都是一個非阻塞過程(所有異步化),同時boss中不會作耗時的I/O讀取,只是將channel丟給worker。所以是一個高效的loop過程。

  

  下文中咱們將分析worker線程的處理流程,敬請期待。。。

相關文章
相關標籤/搜索