-- [I]AttributeMap, ChannelOutboundInvoker, Comparable
-- [I]AttributeMap
---- [I]Channel
---- [C]DefaultAttributeMap
-- [I]Channel, [C]DefaultAttributeMap
---- [AC]AbstractChannel
------ [AC]AbstractNioChannel
-------- [AC]AbstractNioMessageChannel, [AC]AbstractNioByteChannel
-- [I]Channel
-- [I]Channel
---- [I]ServerChannel
---- [I]DuplexChannel
------ [I](N)ServerSocketChannel
------ [I](N)SocketChannel
-- [I](N)ServerSocketChannel, [AC]AbstractNioMessageChannel
-- (N)SocketChannel, AbstractNioByteChannel
---- [C]NioServerSocketChannel
---- [C]NioSocketChannel
This implementation is fairly rigorous:
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
Registration does not look complicated, but writing it strictly and correctly takes care.
Registration itself only needs:
/**
 * Registers this channel with the given selector, returning a selection
 * key.
 *
 * <p> If this channel is currently registered with the given selector then
 * the selection key representing that registration is returned.  The key's
 * interest set will have been changed to <tt>ops</tt>, as if by invoking
 * the {@link SelectionKey#interestOps(int) interestOps(int)} method.  If
 * the <tt>att</tt> argument is not <tt>null</tt> then the key's attachment
 * will have been set to that value.  A {@link CancelledKeyException} will
 * be thrown if the key has already been cancelled.
 *
 * <p> Otherwise this channel has not yet been registered with the given
 * selector, so it is registered and the resulting new key is returned.
 * The key's initial interest set will be <tt>ops</tt> and its attachment
 * will be <tt>att</tt>.
 *
 * <p> This method may be invoked at any time.  If this method is invoked
 * while another invocation of this method or of the {@link
 * #configureBlocking(boolean) configureBlocking} method is in progress
 * then it will first block until the other operation is complete.  This
 * method will then synchronize on the selector's key set and therefore may
 * block if invoked concurrently with another registration or selection
 * operation involving the same selector. </p>
 *
 * <p> If this channel is closed while this operation is in progress then
 * the key returned by this method will have been cancelled and will
 * therefore be invalid. </p>
 *
 * @param  sel
 *         The selector with which this channel is to be registered
 *
 * @param  ops
 *         The interest set for the resulting key
 *
 * @param  att
 *         The attachment for the resulting key; may be <tt>null</tt>
 *
 * @throws  ClosedChannelException
 *          If this channel is closed
 *
 * @throws  ClosedSelectorException
 *          If the selector is closed
 *
 * @throws  IllegalBlockingModeException
 *          If this channel is in blocking mode
 *
 * @throws  IllegalSelectorException
 *          If this channel was not created by the same provider
 *          as the given selector
 *
 * @throws  CancelledKeyException
 *          If this channel is currently registered with the given selector
 *          but the corresponding key has already been cancelled
 *
 * @throws  IllegalArgumentException
 *          If a bit in the <tt>ops</tt> set does not correspond to an
 *          operation that is supported by this channel, that is, if
 *          {@code set & ~validOps() != 0}
 *
 * @return  A key representing the registration of this channel with
 *          the given selector
 */
java.nio.channels.SelectableChannel.register(Selector sel, int ops, Object att) throws ClosedChannelException;
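Selector sel: the Selector instance used is the EventLoop's unwrappedSelector.
int ops: registration currently passes 0 here (can a value other than the four defined ones be passed?). SelectionKey defines four: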
- OP_READ 1
- OP_WRITE 4
- OP_CONNECT 8
- OP_ACCEPT 16
Object att: the attachment carried by the resulting selection key; here the AbstractNioChannel itself (this) is passed.
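On the question of whether a value outside the four constants can be passed: the Javadoc quoted above rejects ops only when it contains a bit outside validOps(), and 0 contains no bits at all. Below is a minimal sketch of that rule using only JDK classes. The class name OpsCheck and the helper isAcceptable are made up for illustration; they are not part of the JDK or of netty.

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class OpsCheck {
    /** Hypothetical helper mirroring the documented rule: ops is rejected only if it has a bit outside validOps. */
    static boolean isAcceptable(int ops, int validOps) {
        return (ops & ~validOps) == 0;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            int serverOps = server.validOps(); // a server channel only supports OP_ACCEPT
            int clientOps = client.validOps(); // OP_READ | OP_WRITE | OP_CONNECT
            System.out.println("0 on server channel: " + isAcceptable(0, serverOps));
            System.out.println("0 on socket channel: " + isAcceptable(0, clientOps));
            System.out.println("OP_ACCEPT(16) on socket channel: " + isAcceptable(16, clientOps));
        }
    }
}
```

So an interest set of 0 is always a legal argument: it simply means "registered, but not interested in anything yet".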
Exception handling in doRegister:
SelectableChannel.register
  success?
    Y: return
    N: which exception?
      other exception: throw exp
      CancelledKeyException: selected?
        Y: throw exp
        N: selectNow, mark selected, then retry SelectableChannel.register
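A few points to note:
The four operations defined by the JDK's SelectionKey do not include 0, so why is 0 passed as ops at registration? See the netty issue "io.netty.channel.socket.nio.AbstractNioChannel doRegister() ?? #1836", which has explanations from the netty author and from the committer who wrote this code. The gist is that it works around a potential JDK bug.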
It's an intentional stuff to deal with a potential JDK bug where it returns a selection key with readyOps set to 0 for no good reason, leading to a spin loop. So, I'd leave it as it is.
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
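A user named dengyuankai272 commented as follows. This explanation seems more plausible, because the 0 the author describes is readyOps, not the 0 passed at registration: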
There are two way to register interested events:
1. Channel.register()
2. SelectionKey.interestOps(int)
netty use 2. After binding channel to selector with interestOps 0, netty will invoke fireChannelActive() -> fire a read event -> HeadContext.unsafe.doBeginRead() -> SelectionKey.interestOps(int) to set interestOps.
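The two-step pattern described in that comment can be reproduced with plain JDK NIO. This is only a sketch of the idea, not netty's code: the class name RegisterThenInterestOps, the method demo and the string attachment are made up for illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterThenInterestOps {
    /** Returns {interestOps right after register, interestOps after enabling OP_ACCEPT}. */
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // register() requires non-blocking mode
            // Step 1: tie the channel to the selector without asking for any event yet.
            SelectionKey key = server.register(selector, 0, "attachment");
            int before = key.interestOps();
            // Step 2: later, switch on the event of interest through the key.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            return new int[] {before, key.interestOps()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = demo();
        System.out.println("after register: " + ops[0] + ", after interestOps: " + ops[1]);
    }
}
```

Splitting the two steps lets the channel be attached to its selector early, while the moment at which events actually start flowing stays under the caller's control.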
This calls eventLoop().cancel(selectionKey());
Following the call, the logic is:
SelectionKey.cancel
  cancelledKeys + 1
  cancelledKeys >= CLEANUP_INTERVAL (256, hard-coded)?
    N: return
    Y: needsToSelectAgain = true
       // triggered from the poll-task path, processSelectedKeysPlain, closeAll, etc.
       selector.selectNow
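The counting part of this flow can be sketched in isolation as below. This is an illustrative stand-in rather than netty's class: the name CancelCounter and its methods are hypothetical, and resetting the counter when the threshold is reached is an assumption that the flow above does not state.

```java
final class CancelCounter {
    static final int CLEANUP_INTERVAL = 256; // threshold taken from the flow above

    private int cancelledKeys;
    private boolean needsToSelectAgain;

    /** Call once after every SelectionKey.cancel(). */
    void onCancel() {
        cancelledKeys++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;         // assumption: start counting afresh
            needsToSelectAgain = true; // the event loop should run selectNow() soon
        }
    }

    boolean needsToSelectAgain() {
        return needsToSelectAgain;
    }

    /** Call after the event loop has performed its selectNow(). */
    void selectedAgain() {
        needsToSelectAgain = false;
    }
}
```

Batching the cleanup this way avoids a selectNow() for every single cancelled key while still bounding how many stale keys the selector can accumulate.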
SelectionKey.isValid?
  N: return
  Y: readPending = true
     selectionKey.interestOps(interestOps | readInterestOp)
connectPromise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectTimeoutFuture.cancel(false);
This marks the connect promise as failed and cancels the connect-timeout future.
This wraps and converts a non-direct buf into a direct buf; a PooledHeapByteBuf, for example, is not direct.
A direct buf is allocated in one of two ways: through ByteBufAllocator.directBuffer, or through ByteBufUtil.threadLocalDirectBuffer().
This part is fairly interesting and also involves ReferenceCountUtil; to be analyzed.
newDirectBuffer is used in the logic of AbstractNioByteChannel.filterOutboundMessage(Object).
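The idea of turning a heap buffer into a direct one can be illustrated with the JDK's own java.nio.ByteBuffer. This is an analogy only: it does not use netty's ByteBuf, allocator or reference counting, and the class name DirectCopy and method toDirect are made up for this sketch.

```java
import java.nio.ByteBuffer;

final class DirectCopy {
    /** Returns buf itself if it is already direct, otherwise a direct copy of its remaining bytes. */
    static ByteBuffer toDirect(ByteBuffer buf) {
        if (buf.isDirect()) {
            return buf; // nothing to convert
        }
        ByteBuffer direct = ByteBuffer.allocateDirect(buf.remaining());
        direct.put(buf.duplicate()); // duplicate() keeps the caller's position untouched
        direct.flip();               // make the copied bytes readable
        return direct;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer direct = toDirect(heap);
        System.out.println(direct.isDirect() + " " + direct.remaining());
    }
}
```

A direct buffer lives outside the Java heap, so a socket write can hand it to the operating system without an extra intermediate copy; that is the usual motivation for converting before writing.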
In the RestExpress project, a sample call stack for this method, with brief notes, looks like this:
ServerBootstrapFactory$2.newThread(Runnable) line: 90
    // the worker thread starts being created here
ThreadPerTaskExecutor.execute(Runnable) line: 33
NioEventLoop(SingleThreadEventExecutor).doStartThread() line: 894
NioEventLoop(SingleThreadEventExecutor).startThread() line: 865
NioEventLoop(SingleThreadEventExecutor).execute(Runnable) line: 758
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise) line: 483
NioEventLoop(SingleThreadEventLoop).register(ChannelPromise) line: 80
NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74
    // return register(new DefaultChannelPromise(channel, this));
    // A DefaultChannelPromise is created from the Channel. This Channel object is the message Object
    // that the boss thread read, that is, the NioSocketChannel instance.
NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) line: 86
ServerBootstrap$ServerBootstrapAcceptor.channelRead(ChannelHandlerContext, Object) line: 255
    // this is where the boss thread, after accept, hands the channel over to a worker thread
    // childGroup.register(child).addListener(new ChannelFutureListener() {...
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline.fireChannelRead(Object) line: 930
AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93
    // ** the method being analyzed
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
    // An accept event, a read event, or readyOps of 0 triggers the read on the boss thread.
    // Note that this is the boss thread; on the boss thread accept is equivalent to read.
    // if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
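This stack also explains the whole path from the boss thread receiving the accept event on the I/O to handing the channel over to a worker thread for the subsequent reads.
For more detail, query the trace data with SQL: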
select * from trace_data where id>=15309 and THREAD_ID=13 and STACK_NUM>5 order by id
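Logic flow:
The subclass's doReadMessages reads the pending messages and puts them into readBuf (a List)
Iterate over the messages read and notify the pipeline of a channelRead event for each one, via pipeline.fireChannelRead
Clear readBuf
Notify the pipeline of the channelReadComplete event, via pipeline.fireChannelReadComplete
If an exception occurred, decide whether to close (closeOnReadError) and notify the pipeline through fireExceptionCaught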
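The steps above can be sketched without any netty dependency as follows. Everything named here is hypothetical: MessageReadLoop, Source and Events are stand-ins for the channel, its subclass hook and the pipeline, and the maxMessages bound is an assumption added so that the loop always terminates. The close decision (closeOnReadError) is left out; the sketch only reports whether an error was seen.

```java
import java.util.ArrayList;
import java.util.List;

final class MessageReadLoop {
    /** Stand-in for the subclass hook: adds messages to buf and returns how many were added. */
    interface Source {
        int doReadMessages(List<Object> buf) throws Exception;
    }

    /** Stand-in for the three pipeline notifications named in the steps. */
    interface Events {
        void channelRead(Object msg);
        void channelReadComplete();
        void exceptionCaught(Throwable cause);
    }

    /** Runs one read pass; returns true if an exception was reported. */
    static boolean read(Source source, Events events, int maxMessages) {
        List<Object> readBuf = new ArrayList<>();
        Throwable error = null;
        try {
            // Step 1: let the subclass fill readBuf until it has nothing more.
            for (int i = 0; i < maxMessages; i++) {
                if (source.doReadMessages(readBuf) <= 0) {
                    break;
                }
            }
        } catch (Throwable t) {
            error = t; // remember the failure, still deliver what was read
        }
        // Step 2: one channelRead notification per message.
        for (Object msg : readBuf) {
            events.channelRead(msg);
        }
        // Step 3: clear the buffer. Step 4: signal that this pass is complete.
        readBuf.clear();
        events.channelReadComplete();
        // Step 5: report the exception, if any.
        if (error != null) {
            events.exceptionCaught(error);
            return true;
        }
        return false;
    }
}
```

Messages that were read before a failure are still delivered, and the exception is reported only after channelReadComplete, which matches the order of the steps listed above.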
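Subclass implementation
One subclass of the AbstractNioMessageChannel used here is NioServerSocketChannel.
What its doReadMessages does: it performs the accept and creates a NioSocketChannel instance as the message that was "read". As the call stack above shows, this Channel instance is then used at NioEventLoop(SingleThreadEventLoop).register(Channel) line: 74.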
Look at the trace data:
select * from trace_data where class_name like '%AbstractNioByteChannel%'
Data:
ID     THREAD_ID  STACK_NUM  THREAD_NAME  METHOD_ID  CLASS_NAME                                                 METHOD_NAME            LINE_NUM
15336  13         7          boss-0       14143      io/netty/channel/nio/AbstractNioByteChannel                <clinit>               45
15350  13         8          boss-0       14123      io/netty/channel/nio/AbstractNioByteChannel                <init>                 67
15374  13         15         boss-0       14144      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  <init>                 97
15407  13         9          boss-0       14186      io/netty/channel/nio/AbstractNioByteChannel$1              <init>                 49
15420  13         13         boss-0       14127      io/netty/channel/nio/AbstractNioByteChannel                metadata               85
20790  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read                   186
20793  15         6          worker-0     14128      io/netty/channel/nio/AbstractNioByteChannel                shouldBreakReadReady   89
26019  15         30         worker-0     14133      io/netty/channel/nio/AbstractNioByteChannel                filterOutboundMessage  283
26288  15         30         worker-0     14139      io/netty/channel/nio/AbstractNioByteChannel                clearOpWrite           348
26357  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read                   186
26360  15         6          worker-0     14128      io/netty/channel/nio/AbstractNioByteChannel                shouldBreakReadReady   89
26757  15         6          worker-0     14145      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  closeOnRead            111
26767  15         7          worker-0     14141      io/netty/channel/nio/AbstractNioByteChannel                access$000             43
26768  15         8          worker-0     14129      io/netty/channel/nio/AbstractNioByteChannel                isAllowHalfClosure     93
The instance is created on the boss thread and does its work on the worker thread; the main method used is read.
The record in the trace data:
ID     THREAD_ID  STACK_NUM  THREAD_NAME  METHOD_ID  CLASS_NAME                                                 METHOD_NAME  LINE_NUM
20790  15         5          worker-0     14147      io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe  read         186
The call stack from NioByteUnsafe.read to the HTTP object decoding:
HttpRequestDecoder(HttpObjectDecoder).decode(ChannelHandlerContext, ByteBuf, List<Object>) line: 196
HttpRequestDecoder(ByteToMessageDecoder).decodeRemovalReentryProtection(ChannelHandlerContext, ByteBuf, List<Object>) line: 502
HttpRequestDecoder(ByteToMessageDecoder).callDecode(ChannelHandlerContext, ByteBuf, List<Object>) line: 441
HttpRequestDecoder(ByteToMessageDecoder).channelRead(ChannelHandlerContext, Object) line: 278
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
ReadTimeoutHandler(IdleStateHandler).channelRead(ChannelHandlerContext, Object) line: 286
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline.fireChannelRead(Object) line: 930
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioByteChannel$NioByteUnsafe).read() line: 163
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
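Logic:
The shouldBreakReadReady check decides whether handling of the read-ready event should be interrupted; the cases it covers can be seen in its code
Call the doReadBytes method implemented by the subclass, which does the actual reading of bytes
The pipeline fires the read event, pipeline.fireChannelRead. fireChannelRead takes a parameter of type Object, so both the ByteBuf passed here and the Channel passed by NioMessageUnsafe.read above are acceptable.
The pipeline fires the read-complete event, pipeline.fireChannelReadComplete()
DefaultChannelHandlerContext will be analyzed further in the later Handler-related section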
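During an NIO select(), as long as the channel is writable, every select() reports OP_WRITE as ready. So when writing data with NIO, each time there is data to write we first put it into a buffer, register OP_WRITE, and wakeup() the selector. That round of select() then sees OP_WRITE, the buffered data is written to the channel, the buffer is cleared, and OP_WRITE is deregistered, which completes the write.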
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE); // deregister??
    }
}
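On the "deregister??" question: clearing the OP_WRITE bit in the interest set does not cancel the key; it only stops the selector from reporting writability for it. A minimal JDK-only sketch of switching the bit on and off follows. The class OpWriteToggle and its methods are made up for illustration and operate on an unconnected SocketChannel, which is enough to observe the interest set.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class OpWriteToggle {
    /** Adds OP_WRITE to the key's interest set if it is not there yet. */
    static void setOpWrite(SelectionKey key) {
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(ops | SelectionKey.OP_WRITE);
        }
    }

    /** Removes OP_WRITE from the interest set, leaving every other bit as it was. */
    static void clearOpWrite(SelectionKey key) {
        if (!key.isValid()) {
            return; // a cancelled key must not be touched
        }
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(ops & ~SelectionKey.OP_WRITE);
        }
    }

    /** Returns {interestOps with OP_WRITE set, interestOps after clearing it}. */
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            setOpWrite(key);
            int withWrite = key.interestOps();
            clearOpWrite(key);
            return new int[] {withWrite, key.interestOps()};
        }
    }
}
```

The key stays registered and keeps its OP_READ interest throughout; only the write bit changes.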
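The constructor specifies that the I/O event this class is interested in is OP_ACCEPT
This class implements the details of doBind
Different bind methods are used for different JDK versions
JDK 7 and above: java.nio.channels.ServerSocketChannel.bind(SocketAddress, int)
Below JDK 7: java.net.ServerSocket.bind(SocketAddress, int)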
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
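For reference, the JDK 7+ branch corresponds to the following plain NIO call. This is a small sketch outside netty: the class name BindWithBacklog, the loopback address and the backlog value used in main are illustrative choices, and port 0 asks the operating system for any free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindWithBacklog {
    /** Binds to an ephemeral loopback port with the given backlog and returns the chosen port. */
    static int bindAndGetPort(int backlog) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // ServerSocketChannel.bind(SocketAddress, int) is available since Java 7.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bound to port " + bindAndGetPort(128));
    }
}
```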
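This directly calls ServerSocketChannel's close method, with no extra logic.
The NIO ServerSocketChannel instance accepts an NIO SocketChannel instance, the NIO SocketChannel instance is used to create the corresponding netty NioSocketChannel instance, and that instance is returned as the message to the surrounding processing logic. (For the sake of a uniform abstraction, a message is not necessarily a protocol message; here it refers to the chain ServerSocketChannel --> SocketChannel --> NioSocketChannel.)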
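The "accepted connection as a message" idea can be sketched with JDK classes only. Everything named here is hypothetical: AcceptAsMessage and its nested Wrapped class stand in for the server channel logic and for NioSocketChannel respectively, and demo connects a local client over loopback just to have something to accept.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptAsMessage {
    /** Hypothetical wrapper playing the role of NioSocketChannel. */
    static final class Wrapped {
        final SocketChannel channel;
        Wrapped(SocketChannel channel) { this.channel = channel; }
    }

    /** Accepts at most one connection and adds it to buf as a "message"; returns 1 or 0. */
    static int doReadMessages(ServerSocketChannel server, List<Object> buf) throws IOException {
        SocketChannel accepted = server.accept(); // null in non-blocking mode when nothing is pending
        if (accepted == null) {
            return 0;
        }
        buf.add(new Wrapped(accepted));
        return 1;
    }

    /** Connects one local client and returns how many messages a single pass produced. */
    static int demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                List<Object> readBuf = new ArrayList<>();
                int count = doReadMessages(server, readBuf);
                for (Object msg : readBuf) {
                    ((Wrapped) msg).channel.close(); // release the accepted side
                }
                return count;
            }
        }
    }
}
```

Treating the accepted channel as just another Object in the read buffer is what lets the same read loop and pipeline events serve both the accepting side and the byte-reading side.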
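When a NioSocketChannel instance is created and initialized, it in turn runs a large amount of logic.
NioServerSocketChannel does not support the doConnect, doFinishConnect, doDisconnect, doWriteMessage or filterOutboundMessage operations.
The implementation logic of AbstractNioUnsafe, such as connect
The implementation of io.netty.channel.nio.AbstractNioMessageChannel.doWrite(ChannelOutboundBuffer) remains to be analyzed; this logic is not used in the rest server project.
The ServerSocketChannel class has existed since 1.4, but its bind method was only added in 1.7; see its Javadoc for details. But why did JDK 1.7 add this method? What problem does it solve? // TODO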