文本分享Netty中事件循環機制,Channel的設計,以及Server啓動過程。
源碼分析基於Netty 4.1java
Netty經過事件循環(Event Loop)機制處理IO事件,即經過一個死循環不斷獲取並處理Selector中發生的事件。
下面看一下Netty中事件循環機制相關的類。promise
EventExecutorGroup表明了一組EventExecutor集合,它繼承了ScheduledExecutorService,execute方法經過next方法選擇一個EventExecutor,並調用EventLoop#execute處理事件。
而EventExecutor也繼承了EventExecutorGroup,它能夠看作一個特殊的EventExecutorGroup,它的實現類負責提供真正的execute方法實現。微信
EventLoop繼承了EventExecutor,負責處理註冊於其上的Channel的IO事件(表明事件循環機制)。而EventLoopGroup接口則繼承了EventExecutorGroup,負責調度EventLoop。網絡
而NioEventLoop是EventLoop的實現類,其execute方法的主要邏輯在run方法中,
NioEventLoop#run方法經過一個死循環不斷處理Selector中發生的事件(事件循環機制具體實現),正是Netty中的核心。多線程
MultithreadEventExecutorGroup實現EventExecutorGroup,實現多線程處理任務(其實是維護了多個EventExecutor,一個EventExecutor能夠理解爲一個線程),newChild方法構造具體的EventExecutor。
子類MultithreadEventLoopGroup實現EventLoopGroup,其newChild方法構造具體的EventLoop。
子類NioEventLoopGroup#newChild構建了NioEventLoop,處理NIO操做。app
EventLoop各實現類關係以下
框架
NioEventLoop繼承了SingleThreadEventExecutor。
SingleThreadEventExecutor中維護了一個Executor,doStartThread方法調用Executor#execute啓動一個新的線程,並在新線程中調用子類的run方法(run方法是EventLoop邏輯實現方法,新開線程纔是EventLoop真正執行線程)。
因此,一個NioEventLoop對象能夠看作是一個線程。
NioEventLoopGroup維護了一組NioEventLoop,它能夠理解爲一個線程池,而具體任務交給其中一個線程(NioEventLoop)處理。異步
Netty的啓動引導類ServerBootstrap中維護了兩個EventLoopGroup,EventLoopGroup#childGroup和AbstractBootstrap#group,
其中AbstractBootstrap#group負責管理ServerChannel,處理accept事件,並將生成的SocketChannel交給childGroup,由EventLoopGroup#childGroup處理read,write事件(爲了方便,下文我將ServerBootstrap#childGroup稱爲AcceptGroup,AbstractBootstrap#group稱爲ReadGroup。)。
另外,每一個NioEventLoop對象中都維護了一個(jvm)Selector,該NioEventLoop只需處理該Selector的事件便可。jvm
這些設計來自Reactor模式,詳細能夠見java.util.concurrent包的做者Doug Lea的《Scalable IO in Java》。socket
Channel,可以完成IO操做的通道,提供read, write, connect, and bind等方法。
Channel中維護了一個Unsafe對象,用於完成數據傳輸操做(這類操做一般由IO事件觸發,而不是用戶觸發)。
子接口SocketChannel表明Socket鏈接的網絡通道,面向流,支持讀寫操做。
而另外一個子接口ServerChannel表示能夠監聽新鏈接的通道,ServerSocketChannel表明實現TCP/IP協議的ServerChannel。
AbstractChannel提供最基本的實現,它維護了Unsafe和DefaultChannelPipeline對象,並委託這兩個對象處理實際的邏輯。同時,它也提供newUnsafe,newChannelPipeline方法給子類構造他們須要的對象。
AbstractChannel中還有一個內部類AbstractUnsafe,實現了register,bind,disconnect等方法的基礎邏輯。
AbstractNioChannel實現了NIO基礎邏輯,如維護(jvm)SelectableChannel,(jvm)SelectionKey等對象,還有一個很關鍵的selectionKey,表明關注的事件Key。
他也有一個內部類,AbstractNioUnsafe,繼承了AbstractUnsafe,實現了Unsafe子接口NioUnsafe,添加一些NIO的處理邏輯。
AbstractNioChannel的子類能夠分紅ServerChannel實現類和SocketChannel實現類。
ServerChannel實現類是NioServerSocketChannel,實現TCP/IP協議下的NIO網絡通訊。
其父類AbstractNioMessageChannel,newUnsafe方法返回的NioMessageUnsafe。
SocketChannel實現類是NioSocketChannel,一樣實現NIO網絡通訊。
其父類AbstractNioByteChannel,newUnsafe方法返回的NioByteUnsafe。
Channel各實現類關係以下
Netty中將接口按功能劃分得很細,最好你們能夠按功能層次理解各接口表明含義以及實現類的邏輯。
以避免後續看源碼時混淆了。
AbstractBootstrap#bind -> AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) { // #1 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); // #2 doBind0(regFuture, channel, localAddress, promise); return promise; } else { ... } }
#1
初始化及註冊ServerChannel。
initAndRegister方法返回ChannelFuture,ChannelFuture繼承了(jvm)Future,表明IO異步處理結果,能夠綁定監聽事件。
咱們要有這個意識,Netty是一個異步框架,全部的IO操做都是異步的(充分利用cpu),IO方法不會等待實際IO操做完成,而是返回ChannelFuture,表明異步處理結果。
實際IO完成時,Netty會觸發ChannelFuture的回調函數。
ChannelPromise是一種特殊的ChannelFuture,提供操做結果的方法(setSuccess,setFailure方法),通常會做爲IO方法的參數(Unsafe中不少方法都有該參數)。#2
註冊完成後,將ServerChannel綁定監聽端口。
AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() { Channel channel = null; try { // #1 channel = channelFactory.newChannel(); // #2 init(channel); } catch (Throwable t) { ... } // #3 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { // #4 } return regFuture; }
#1
構造ServerChannel
AbstractBootstrap#channelFactory是一個ReflectiveChannelFactory對象,他經過反射生成Channel。
ServerBootstrap#channel方法會構造該對象,並指定具體的ServerChannel類。
(因此咱們要指定NioServerSocketChannel.class,new ServerBootstrap().channel(NioServerSocketChannel.class)
)#2
初始化ServerChannel,該方法由子類實現#3
註冊Channel,注意,config().group()
返回AcceptGroup。這裏將ServerChannel註冊AcceptGroup中,具體說,註冊到AcceptGroup維護的(jvm)Selector中。#4
若是IO操做發生了異常,須要關閉Channel。
NioServerSocketChannel#構造方法 -> NioServerSocketChannel#newSocket方法
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a server socket.", e); } }
使用(jvm)SelectorProvider,打開一個(jvm)ServerSocketChannel。
這裏咱們完成NIO網絡通訊第一步,構造一個(jvm)ServerSocketChannel。
ServerBootstrap#init
void init(Channel channel) throws Exception { // #1 ... ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // #2 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); // #3 } }); } }); }
#1
設置ServerChannel的Option和Attribute#2
給ServerChannel的ChannelPipeline添加一個ChannelInitializer
ChannelPipeline能夠理解爲攔截鏈,維護了一條ChannelHandler鏈表,ChannelHandler即具體攔截器,能夠在IO讀寫中,對數據進行處理。
ChannelHandler也能夠分爲兩類
ChannelOutboundHandler ,在write操做前調用
ChannelInboundHandler,在read操做後調用
AbstractChannel中維護了pipeline變量,子類能夠經過newChannelPipeline方法構造具體的ChannelPipeline。
NioServerSocketChannel使用的是DefaultChannelPipeline。
能夠這樣理解,Unsafe負責數據傳輸,而ChannelPipeline負責邏輯處理。
#3
給ServerChannel的ChannelPipeline添加一個ServerBootstrapAcceptor,並將ServerBootstrap的childHandler,currentChildHandler,currentChildOptions,currentChildAttrs變量都交給它,ServerBootstrapAcceptor用於處理Accept事件,後面再寫文章解析。
AbstractBootstrap#initAndRegister方法#3
步驟 -> SingleThreadEventLoop#register ->(經過Channel調用Unsafe)AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { // #1 register0(promise); // #2 } else { try { eventLoop.execute(new Runnable() { // #3 public void run() { register0(promise); } }); } catch (Throwable t) { ... } } }
#1
判斷當前線程是否爲EventLoop執行線程#2
當前線程就是EventLoop執行線程,直接執行任務#3
當前線程不是EventLoop執行線程,給EventLoop添加一個真正處理操做的任務,後面EventLoop執行線程會處理該任務。
這裏是異步的關鍵,以任務的形式將操做交給EventLoop,當前線程不須要阻塞等待真正IO操做完成。如下代碼也是Netty中處理IO的通用格式
if (eventLoop.inEventLoop()) { ... } else { eventLoop.execute(new Runnable() { ... }); }
AbstractUnsafe#register0
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // #1 doRegister(); neverRegistered = false; registered = true; // #2 pipeline.invokeHandlerAddedIfNeeded(); // #3 safeSetSuccess(promise); // #4 pipeline.fireChannelRegistered(); // #5 if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // #6 closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
#1
由子類實現具體註冊操做#2
執行DefaultChannelPipeline中的延遲任務#3
設置promise狀態爲Success#4
觸發ChannelPipeline#fireChannelRegistered#5
若是是首次註冊,觸發ChannelPipeline#fireChannelActive#6
異常處理,關閉Channel,設置promise狀態爲Failure。
AbstractUnsafe#doRegister -> AbstractNioChannel#doRegister
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // #1 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ... } } }
#1
javaChannel()
獲取SelectableChannel(jvm),eventLoop().unwrappedSelector()
獲取AcceptGroup維護的Selector(jvm)
這裏完成了NIO網絡通訊第二步,將(jvm)ServerSocketChannel註冊到(jvm)Selector,但尚未註冊關注事件Key。
注意,這裏將當前NioServerSocketChannel做爲channle#attachment,後面使用它來判斷是否爲IO事件。
AbstractUnsafe#register0方法#5
步驟 -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive -> HeadContext#readIfIsAutoRead -> DefaultChannelPipeline#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception { // #1 final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); // #2 if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
#1
selectionKey是Selector中關注事件Key集合(AbstractNioChannel#doRegister方法中生成)#2
將readInterestOp註冊到selectionKey。
那麼readInterestOp的值是什麼呢? 它在AbstractNioChannel#構造方法中賦值,真正的值來自NioServerSocketChannel構造方法,能夠看到,它在ServerChannel中固定爲SelectionKey.OP_ACCEPT。
到這裏,(jvm)ServerChannel已經註冊到AcceptGroup維護中(jvm)Selector,關注的事件Key爲accept。
這裏完成NIO網絡通訊第三步,註冊關注事件Key。
AbstractBootstrap.doBind0 -> AbstractChannel#bind -> DefaultChannelPipeline#bind -> HeadContext#bind -> AbstractUnsafe#bind -> NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
根據不一樣JDK版本,調用不一樣的bind方法。
這裏完成了NIO網絡通訊第四步,綁定端口,開始socket監聽。
關於HeadContext,ChannelPipeline調用鏈路,後面會寫文章分享。
延遲任務
DefaultChannelPipeline#addFirst
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); // #1 newCtx = newContext(group, name, handler); // #2 addFirst0(newCtx); // #3 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } // #4 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
#1
使用當前ChannelHandler構造一個ChannelHandlerContext#2
添加ChannelHandlerContext到鏈表頭#3
當前Channel未註冊,調用DefaultChannelPipeline#callHandlerCallbackLater,添加一個延遲任務#4
當前Channel已註冊,調用DefaultChannelPipeline#callHandlerAdded0,完成ChannelHandler添加擴展操做。
DefaultChannelPipeline#callHandlerCallbackLater方法,將當前ChannelHandlerContext轉化爲一個延遲任務PendingHandlerAddedTask或者PendingHandlerRemovedTask,加到DefaultChannelPipeline#pendingHandlerCallbackHead列表中。
AbstractUnsafe#register0方法#2
步驟 -> DefaultChannelPipeline#callHandlerAddedForAllHandlers,該方法會執行pendingHandlerCallbackHead列表全部任務,調用execute方法。
PendingHandlerAddedTask#execute會調用ChannelHandler#handlerAdded,完成ChannelHandler添加擴展操做。
PendingHandlerRemovedTask#execute則調用ChannelHandler#handlerRemoved,完成完成ChannelHandler移除善後工做。
回到ServerBootstrap#init方法#2
步驟,這裏給ServerChannel的ChannelPipeline添加一個ChannelInitializer,它是Netty提供的工具類,實現了ChannelHandler#handlerAdded方法,邏輯是若是當前Channel已註冊,則調用initChannel方法。(因此咱們能夠利用該方法在註冊完成後添加新的ChannelHandler給ChannelHandler),不然不處理。
注意:該步驟在未註冊時執行,因此也會生成延遲任務,由AbstractUnsafe#register0方法#2
步驟觸發完成操做,將ServerBootstrapAcceptor添加到ServerChannel的ChannelPipeline中。
若是您以爲本文不錯,歡迎關注個人微信公衆號,您的關注是我堅持的動力!