[netty4][netty-transpot]Channel體系分析

Channel體系分析

接口與類結構體系

-- [I]AttributeMap, ChannelOutboundInvoker, Comparable        -- [I]AttributeMap
---- [I]Channel                                               ---- [C]DefaultAttributeMap


-- [I]Channel, [C]DefaultAttributeMap
---- [AC]AbstractChannel
------ [AC]AbstractNioChannel
-------- [AC]AbstractNioMessageChannel, [AC]AbstractNioByteChannel


-- [I]Channel                                                   -- [I]Channel
---- [I]ServerChannel                                           ---- [I]DuplexChannel
------ [I](N)ServerSocketChannel                                ------ [I](N)SocketChannel


-- [I](N)ServerSocketChannel, [AC]AbstractNioMessageChannel     -- (N)SocketChannel, AbstractNioByteChannel
---- [C]NioServerSocketChannel                                  ---- [C]NioSocketChannel
AbstractChannel新增長的特性
  1. EventLoop與該Channel是否兼容
  2. 獲取本地綁定的SocketAddress
  3. 獲取遠程鏈接到的SocketAddress
  4. doxxxx接口,註冊,綁定,鏈接,關閉,去註冊,開始讀,寫等實現的接口特性約定。Epoll,kqueue上會用到。
AbstractChannel實現的邏輯
  1. 構造初始化邏輯,包括id建立,unsafe建立,pipeline建立
  2. 是否可寫,是否已經註冊,
  3. 不可寫以前還有多少byte數據要寫出
  4. 在可寫以前有多少byte數據待寫出
  5. pipeline,parent字段落地
  6. 獲取的ByteBufAllocator,EventLoop,pipeline,parent,unsafe,closeFuture等實例
  7. 本地地址,遠程地址的獲取。支持cache,不是每次都獲取
  8. bind,connect,disconnect,close,deregister,flush,read,write,writeAndFlush,newPromise,voidPromise,newSucceededFuture,newFailedFuture。都是觸發pipeline的相應方法。
  9. 重寫hashcode,直接返回id的hashCode
  10. 重寫equals,在這裏認爲==的纔算equals。
  11. 重寫compare,就是拿id進行compare
  12. 重寫toString
AbstractNioChannel實現的邏輯
  1. 落地JDK的SelectableChannel, SelectionKey實例,以及暴露對外獲取接口
  2. 落地readInterestOp字段
  3. 落地connectPromise ChannelPromise字段
  4. 落地connectTimeoutFuture ScheduledFuture字段
  5. 落地遠端地址字段SocketAddress requestedRemoteAddress
  6. 對外暴露NioUnsafe,NioEventLoop等實例獲取接口
  7. 對外暴露是否開啓,是否讀掛起等判斷接口
  8. 設置讀掛起,清理讀掛起邏輯實現
  9. EventLoop與該Channel是否兼容的邏輯實現。就是看EventLoop是否是NioEventLoop
  10. 註冊、去註冊操做實現。
  11. 開始讀操做實現
  12. 關閉操做實現
  13. newDirectBuffer邏輯實現。這個行爲不在接口約定中。
註冊操做實現

這個實現仍是蠻嚴格的:html

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

註冊這個事情看起來不復雜,可是要嚴格寫好寫對就要注意。
註冊原本只須要用:java

/**
* Registers this channel with the given selector, returning a selection
* key.
*
* <p> If this channel is currently registered with the given selector then
* the selection key representing that registration is returned.  The key's
* interest set will have been changed to <tt>ops</tt>, as if by invoking
* the {@link SelectionKey#interestOps(int) interestOps(int)} method.  If
* the <tt>att</tt> argument is not <tt>null</tt> then the key's attachment
* will have been set to that value.  A {@link CancelledKeyException} will
* be thrown if the key has already been cancelled.
*
* <p> Otherwise this channel has not yet been registered with the given
* selector, so it is registered and the resulting new key is returned.
* The key's initial interest set will be <tt>ops</tt> and its attachment
* will be <tt>att</tt>.
*
* <p> This method may be invoked at any time.  If this method is invoked
* while another invocation of this method or of the {@link
* #configureBlocking(boolean) configureBlocking} method is in progress
* then it will first block until the other operation is complete.  This
* method will then synchronize on the selector's key set and therefore may
* block if invoked concurrently with another registration or selection
* operation involving the same selector. </p>
*
* <p> If this channel is closed while this operation is in progress then
* the key returned by this method will have been cancelled and will
* therefore be invalid. </p>
*
* @param  sel
*         The selector with which this channel is to be registered
*
* @param  ops
*         The interest set for the resulting key
*
* @param  att
*         The attachment for the resulting key; may be <tt>null</tt>
*
* @throws  ClosedChannelException
*          If this channel is closed
*
* @throws  ClosedSelectorException
*          If the selector is closed
*
* @throws  IllegalBlockingModeException
*          If this channel is in blocking mode
*
* @throws  IllegalSelectorException
*          If this channel was not created by the same provider
*          as the given selector
*
* @throws  CancelledKeyException
*          If this channel is currently registered with the given selector
*          but the corresponding key has already been cancelled
*
* @throws  IllegalArgumentException
*          If a bit in the <tt>ops</tt> set does not correspond to an
*          operation that is supported by this channel, that is, if
*          {@code set & ~validOps() != 0}
*
* @return  A key representing the registration of this channel with
*          the given selector
*/
java.nio.channels.SelectableChannel.register(Selector sel, int ops, Object att)
throws ClosedChannelException;

Selector實例,用的是EventLoop的unwrappedSelector實例
int ops 此處目前註冊時送的是0(能夠送定義的4個以外的?),SelectionKey定義了4個:ios

  1. OP_READ 1
  2. OP_WRITE 4
  3. OP_CONNECT 8
  4. OP_ACCEPT 16

Object att 表示要塞給Selector實例的附件,此處送的是AbstractNioChannelgit

doRegister異常的處理:github

  1. 若是是CancelledKeyException以外的異常,直接拋出,中斷註冊動做。好比:IllegalSelectorException,IllegalBlockingModeException,ClosedChannelException,ClosedSelectorException等異常。
  2. 若是是CancelledKeyException異常,嘗試操做eventLoop().selectNow();注意這個時候用的EventLoop的selector字段。 注意:EventLoop中unwrappedSelector 與 selector兩個字段的區別。作完eventLoop().selectNow()以後再嘗試註冊,若是仍是CancelledKeyException異常則對外拋出異常。selectNow()的目的是規避有些SelectionKey處於Cancel狀態但還在緩存中,調用selectNow能夠嘗試將其remove。
    整個流程圖示以下:
+
                                   |
                        +----------v----------+
+----------------------->  SelectableChannel  |
|                       |     .register       |
|                       +----------+----------+
|                                  |
|                                  |
|                       +----------v----------+       +-----------------+
|                       |    is success       +---Y--->      return     |
|                       +---------------------+       +-----------------+
|                           N              N
|                           |              |
|        +------------------v-----+    +---v----------------+
|        | CancelledKeyException  |    | other exception    |
|        +-----------+------------+    +------+-------------+
|                    |                        |
|                    |                        |       +-----------------+
|           +--------v--------+               +------->  throw exp      |
|           |  selected?      |                       +-------^---------+
|           +-------------+---+                               |
|              N          |                                   |
|              |          +--------Y--------------------------+
|    +---------v---+
|    |  selectNow  |
+----+  selected.  |
     +-------------+

有幾個注意點sql

  1. 在註冊時如有CancelledKeyException須要作一次selectNow,而後再重試註冊,不然可能會觸發JDK bug。Selectkey被cancel了,可是仍在緩存中。
  2. 針對1的狀況,也會有作了selectNow後再重試註冊後還會有CancelledKeyException,那麼此時直接拋出異常就行了。
  3. JDK SelectionKey定義的4個操做沒有0,可是註冊時ops時爲啥送0 ?能夠參見io.netty.channel.socket.nio.AbstractNioChannel doRegister() ?? #1836,有netty做者,和寫這個代碼的commiter的解釋。大體意思就是爲了解決潛在的JDK的bug。promise

    It's an intentional stuff(有意設計) to deal with a potential(潛在的) JDK bug where it returns a selection key with readyOps set to 0 for no good reason, leading to a spin loop. So, I'd leave it as it is.
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop緩存

有個dengyuankai272同窗說到以下,感受解釋的更靠譜,做者解釋的那個0不是註冊的這個地方app

There are two way to register interested events:

Channel.register()
SelectionKey.interestOps(int)
netty use 2.

After binding channel to selector with interestOps 0, netty will invoke fireChannelActive()
->fire a read event
->HeadContext.unsafe.doBeginRead()
->SelectionKey.interestOps(int)
to set interestOps.
去除註冊操做邏輯實現

用了eventLoop().cancel(selectionKey());
跟進去邏輯以下:socket

+-------------------+
             |   SelectionKey    |
             |    .cancel        |
             +--------+----------+
                      |
                      |
             +--------v----------+
             | cancelledKey add 1|
             +--------+----------+
                      |
                      |
                      |
           +----------v-----------+
           |   cancelledKeys      |
      +----+   >= CLEANUP_INTERVAL+----+
      |    |     (256,hard-code)  |    |
      N    +----------------------+    Y
      |                                |
      |                                |
+-----v--------+         +-------------v-------+
|   return     |         |   needsToSelectAgain|
|              |         |   = true            |
+--------------+         +-----------+---------+
                                     |
                                     | // poll task,processSelectedKeysPlain,closeAll等地方觸發
                         +-----------v---------+
                         |    selector         |
                         |    .selectNow       |
                         +---------------------+
開始讀操做邏輯實現
+------------------+
         |   SelectionKey   +----N------+
         |   .isValid       |           |
         +------------------+    +------v------+
                  Y              |  return     |
                  |              +-------------+
         +--------v---------+
         |   readPending    |
         |   = true         |
         +--------+---------+
                  |
                  |
                  |
                  |
+-----------------v----------------+
| selectionKey.interestOps         |
| (interestOps | readInterestOp)   |
+--------------+-------------------+
關閉操做邏輯實現

connectPromise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectTimeoutFuture.cancel(false);
將鏈接的promise和鏈接超時的future設置成失敗和取消。

newDirectBuffer操做邏輯實現

是將非direct的bug包裝轉換成direct的buf,好比PooledHeapByteBuf就不是direct的。
分配direct buf,一是靠ByteBufAllocator.directBuffer,另外一是靠ByteBufUtil.threadLocalDirectBuffer()。
這個仍是有點意思的,還會涉及到 ReferenceCountUtil,待分析。
newDirectBuffer會用在AbstractNioByteChannel.filterOutboundMessage(Object)邏輯中。

AbstractNioMessageChannel實現的邏輯
NioMessageUnsafe.read實現的邏輯

在RestExpress項目中,這個方法的實際調用棧示例與簡單分析以下:

ServerBootstrapFactory$2.newThread(Runnable) line: 90 // 開始建立worker線程   
ThreadPerTaskExecutor.execute(Runnable) line: 33    
NioEventLoop(SingleThreadEventExecutor).doStartThread() line: 894   
NioEventLoop(SingleThreadEventExecutor).startThread() line: 865 
NioEventLoop(SingleThreadEventExecutor).execute(Runnable) line: 758 
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise) line: 483   
NioEventLoop(SingleThreadEventLoop).register(ChannelPromise) line: 80   
NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74  // return register(new DefaultChannelPromise(channel, this)); 拿Channel new了一個DefaultChannelPromise,這個Channel對象就是上面boss線程read出來message Object,也就是NioSocketChannel實例。 
NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) line: 86 
ServerBootstrap$ServerBootstrapAcceptor.channelRead(ChannelHandlerContext, Object) line: 255 // 這個是boss線程accept以後準備投遞到worker線程的地方 childGroup.register(child).addListener(new ChannelFutureListener() {...   
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362 
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348    
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340 
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408    
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362   
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348    
DefaultChannelPipeline.fireChannelRead(Object) line: 930    
AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93  // **  正在分析的這個方法    
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677 // 是accept或者read事件 或者是0 觸發boss線程上的read動做。注意是 boss線程。在boss線程上accept等同於read。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {  unsafe.read();  }
NioEventLoop.processSelectedKeysOptimized() line: 612   
NioEventLoop.processSelectedKeys() line: 529    
NioEventLoop.run() line: 491    
SingleThreadEventExecutor$5.run() line: 905 
Thread.run() line: 748

這個堆棧實際上也解釋了,boss線程從接到IO上的accept事件到交給work線程去作後續的讀取的整個過程。
更詳細的用sql查trace數據:

select * from trace_data where id>=15309 and THREAD_ID=13 and STACK_NUM>5 order by id

邏輯流程:
靠子類的doReadMessages將須要讀取的message讀取出來放到readBuf (一個List)
迭代讀取到的message挨個通知pipeline channelRead事件,pipeline.fireChannelRead
清理readBuf
通知pipeline channelReadComplete事件,pipeline.fireChannelReadComplete
若是異常則關閉 closeOnReadError 並通知pipeline fireExceptionCaught事件

子類實現
此處咱們用到的AbstractNioMessageChannel 有個子類實現是NioServerSocketChannel
doReadMessages實先的內容是: 作accept操做 並建立NioSocketChannel實例做爲read到的message,這個Channel實例看上面的的調用棧,會在NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74用到。

AbstractNioByteChannel實現的邏輯

看下trace數據:

select * from trace_data where class_name like '%AbstractNioByteChannel%'

數據:

ID      THREAD_ID   STACK_NUM   THREAD_NAME     METHOD_ID   CLASS_NAME      METHOD_NAME     LINE_NUM  
15336   13  7   boss-0  14143   io/netty/channel/nio/AbstractNioByteChannel <clinit>    45
15350   13  8   boss-0  14123   io/netty/channel/nio/AbstractNioByteChannel <init>  67
15374   13  15  boss-0  14144   io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe   <init>  97
15407   13  9   boss-0  14186   io/netty/channel/nio/AbstractNioByteChannel$1   <init>  49
15420   13  13  boss-0  14127   io/netty/channel/nio/AbstractNioByteChannel metadata    85
20790   15  5   worker-0    14147   io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe   read    186
20793   15  6   worker-0    14128   io/netty/channel/nio/AbstractNioByteChannel shouldBreakReadReady    89
26019   15  30  worker-0    14133   io/netty/channel/nio/AbstractNioByteChannel filterOutboundMessage   283
26288   15  30  worker-0    14139   io/netty/channel/nio/AbstractNioByteChannel clearOpWrite    348
26357   15  5   worker-0    14147   io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe   read    186
26360   15  6   worker-0    14128   io/netty/channel/nio/AbstractNioByteChannel shouldBreakReadReady    89
26757   15  6   worker-0    14145   io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe   closeOnRead 111
26767   15  7   worker-0    14141   io/netty/channel/nio/AbstractNioByteChannel access$000  43
26768   15  8   worker-0    14129   io/netty/channel/nio/AbstractNioByteChannel isAllowHalfClosure  93

建立是在boss線程,工做是在work線程,主要用到的方法是read。

NioByteUnsafe.read實現的邏輯

tarce 數據中的記錄:

ID      THREAD_ID   STACK_NUM   THREAD_NAME     METHOD_ID   CLASS_NAME      METHOD_NAME     LINE_NUM  
20790   15  5   worker-0    14147   io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe   read    186

從NioByteUnsafe.read到http object解碼的調用棧:

HttpRequestDecoder(HttpObjectDecoder).decode(ChannelHandlerContext, ByteBuf, List<Object>) line: 196    
HttpRequestDecoder(ByteToMessageDecoder).decodeRemovalReentryProtection(ChannelHandlerContext, ByteBuf, List<Object>) line: 502 
HttpRequestDecoder(ByteToMessageDecoder).callDecode(ChannelHandlerContext, ByteBuf, List<Object>) line: 441 
HttpRequestDecoder(ByteToMessageDecoder).channelRead(ChannelHandlerContext, Object) line: 278   
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362 
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348    
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340   
ReadTimeoutHandler(IdleStateHandler).channelRead(ChannelHandlerContext, Object) line: 286   
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362 
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348    
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340 
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408    
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362   
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348    
DefaultChannelPipeline.fireChannelRead(Object) line: 930    
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioByteChannel$NioByteUnsafe).read() line: 163  
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677 
NioEventLoop.processSelectedKeysOptimized() line: 612   
NioEventLoop.processSelectedKeys() line: 529    
NioEventLoop.run() line: 491    
SingleThreadEventExecutor$5.run() line: 905 
Thread.run() line: 748

邏輯實現:
shouldBreakReadReady檢查, 判斷是否應該中斷 讀ready的處理,針對的狀況能夠參見其代碼判斷邏輯
調用子類實現的doReadBytes方法,讀取byte的動做
流水線觸發讀事件 pipeline.fireChannelRead,fireChannelRead接受的參數是Object類型...,因此此處給的ByteBuf和上面NioMessageUnsafe read時給送Channel均可以的。
流水線觸發讀完成事件 pipeline.fireChannelReadComplete()

DefaultChannelHandlerContext 這個等到後面Handler相關分析的地方再繼續分析

AbstractNioByteChannel.filterOutboundMessage邏輯實現
  1. msg是ByteBuf類型的,若不是Direct的buf,則用newDirectBuffer wrap一下
  2. msg若是是FileRegion類型的,原樣返回
  3. 不是1和2兩種類型的,拋出UnsupportedOperationException異常
AbstractNioByteChannel.clearOpWrite邏輯實現

nio的select()的時候,只要數據通道容許寫,每次select()返回的OP_WRITE都是true。因此在nio的寫數據裏面,咱們在每次須要寫數據以前把數據放到緩衝區,而且註冊OP_WRITE,對selector進行wakeup(),這樣這一輪select()發現有OP_WRITE以後,將緩衝區數據寫入channel,清空緩衝區,而且反註冊OP_WRITE,寫數據完成。

protected final void clearOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE); // 反註冊??
        }
    }
NioServerSocketChannel分析

構造函數指定這個類感興趣的I/O事件是OP_ACCEPT

doBind實現

這個類實現了doBind具體細節
對於不一樣的JDK版本用了不一樣的bind方法
JDK7以上(含) java.nio.channels.ServerSocketChannel.bind(SocketAddress, int)
JDK7如下 java.net.ServerSocket.bind(SocketAddress, int)

@Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
doClose實現

直接調用ServerSocketChannel的close方法,沒有額外邏輯。

doReadMessages實現

用NIO的ServerSocketChannel實例accept出NIO的SocketChannel實例,再用NIO的SocketChannel實例建立了netty對應的NioSocketChannel實例,並將此實例做爲message返回到上下文處理邏輯中。(爲了整個封裝,messgae不必定指消息報文,此處就是指從ServerSocketChannel-->SocketChannel-->NioSocketChannel)
NioSocketChannel建立實例初始化時又作了很大一堆邏輯處理。

connect相關實現

NioServerSocketChannel端不支持doConnect操做,不支持doFinishConnect操做,不支持doDisconnect操做,不支持doWriteMessage操做,不支持filterOutboundMessage操做。

遺留問題

AbstractNioUnsafe的實現邏輯,諸如connect等
io.netty.channel.nio.AbstractNioMessageChannel.doWrite(ChannelOutboundBuffer)實現邏輯待分析,在rest server這個項目中並無用到這個邏輯。

ServerSocketChannel 這個類1.4就有了,可是它的bind方法在1.7才提供,詳細參見其註釋。可是JDK1.7爲啥要新增這個方法呢?爲了解決啥問題?// TODO

相關文章
相關標籤/搜索