上節對Netty的作了簡單介紹,這節分析下Netty啓動流程,後面的源碼分析都以Netty4.0.32版本爲例,如下面啓動代碼爲例子java
public class TimeServer { public void bind(int port) throws Exception { // 1.配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //2.配置參數 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingServerHandler()) .childHandler(new ChildChannelHandler()); // 3.綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //4. 等待服務端關閉 f.channel().closeFuture().sync(); } finally { //5.優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
EventLoopGroup是Netty的線程池,爲Acceptor Selector和IO Selector事件提供線程支持,EventLoopGroup在初始化的時候會建立相應配置數量的EventLoop來提供單個線程,EventLoop功能比較複雜,既是Select的輪詢事件線程也是其餘IO事件處理的線程。使用時調用EventLoopGroup#next()方法來獲取EventLoop。Netty的線程比較複雜後面章節會詳細說明。promise
NioServerSocketChannel提供了JDK的Channel和selector的綁定功能,再綁定端口前會調用這個方法綁定操做。同時NioServerSocketChannel有個Unsafe內部類在初始化NioServerSocketChannel時會建立Unsafe做爲他的成員變量。Unsafe提供了JDK層的io操做包括讀、寫、綁定端口等根據繼承類不一樣的不一樣操做。異步
ChannelHandler是Netty的核心,它的繼承包含了2個重要的處理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是寫出IO事件的處理器。ChannelInboundHandler是寫入IO事件的處理器同時也包含了多個觸發器,包括鏈接斷開觸發器,異常觸發器,註冊完成觸發器,取消註冊觸發器等,因爲ChannelHandler比較複雜後面章節會詳細說明。socket
ChannelFuture是Netty的異步事件等待器,Netty利用其來實現全部IO事件的異步化,ChannelFuture能夠註冊多個事件完成監聽器,異步事件會在完成後回調監聽器。ide
ServerBootstrap是Netty配置及啓動的入口,提供線程池、鏈接參數、線程池以及處理器的配置,同時提供了綁定端口的方法。oop
咱們以b.bind(port)這行代碼做爲開始流程,時序圖以下: 源碼分析
ServerBootstrap:this
EventLoopGroup:線程
EventLoop:3d
Channel:
ChannelPipeline:
HeadContext:
ServerBootstrap類的線程池、鏈接參數、線程池以及處理器的配置其實就是成員變量的複製這裏就是很少講,咱們直接從b.bind()開始看。
b.bind()的方法直接將端口包裝成類SocketAddress交給doBind()處理。doBind()源碼以下
private ChannelFuture doBind(final SocketAddress localAddress) { //初始化Channel並註冊Selector final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //綁定監聽端口並開始監聽 if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
這個方法主要作了2件事
註冊Selector是默認是異步進行的這裏Netty作了一個處理,若是註冊Selector已完成則同步調用doBind0()不然註冊監聽器等待Selector註冊完成後調用doBind0(),這樣保證整個doBind()方法的異步性。
接下來咱們看下initAndRegister()的實現,源碼以下:
final ChannelFuture initAndRegister() { //初始化channel final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //註冊Selector ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
initAndRegister()方法也是作了2件事
channelFactory().newChannel()是建立你配置的Channel這裏源碼配置了NioServerSocketChannel,而init()的具體實現則是在NioServerSocketChannel中,init()源碼以下:
void init(Channel channel) throws Exception { ... ChannelPipeline p = channel.pipeline(); ... //將從線程組,從處理器,從配置,從屬性封裝成ServerBootstrapAcceptor處理器,共後續IOSelector使用。 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
在上節中咱們講了Neety服務端線程模式能夠是主從模型,因此它的配置也分主從配置,init()主要作的將主處理器(ChannelHandler)添加到主ChannelPipeline裏,將 從線程組,從處理器(ChannelHandler),從配置,從屬性封裝成ServerBootstrapAcceptor處理器添加到主ChannelPipeline裏,供後續觸發客戶端鏈接事件使用。
group().register()先是取了主線程組EventLoopGroup委託EventLoopGroup註冊Selector,主線程組中也是調用next()委託給單線程EventLoop去處理
next().register(channel);
最後在EventLoop的register()中委託給Channel中的Unsafe處理(Unsafe是在Channel建立的時候被建立的)
channel.unsafe().register(this, promise);
咱們來看下Unsafe中的register()的源碼:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; //若是eventLoop是當前線程直接執行,不然交給傳eventLoop線程處理 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
Unsafe中的register()作的就是判斷next()獲取的線程是不是當前線程,若是是同步執行註冊,不然異步執行註冊,按例子中默認配置會異步調用register0()註冊。
咱們來看下register0()的源碼
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //調用JDK去註冊Selector doRegister(); neverRegistered = false; registered = true; //設置註冊成功通知監聽器 safeSetSuccess(promise); //觸發註冊成功事件 pipeline.fireChannelRegistered(); //若是是第一次則觸發激活成功事件 if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0()作了4件事
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
觸發註冊成功事件和若是綁定成功觸發激活成功事件實際上是在主pipeline中鏈式調用用戶的配置ChannelHandler,調用過程以下
這裏會有個疑問綁定是在註冊selector以後進行的爲何這裏能夠觸發激活成功事件,其實在safeSetSuccess(promise)代碼執行以後已經通知監聽器並開始執行綁定相關的代碼了。
觸發激活成功事件後若是配置了會觸發開始讀事件
public ChannelPipeline fireChannelActive() { head.fireChannelActive(); //若是配置了會觸發開始讀事件 if (channel.config().isAutoRead()) { channel.read(); } return this; }
讀事件也是鏈式調用調用過程以下:
能夠看到最後調用HeadContext裏的read()方法
public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
委託給unsafe去處理,處理的方式就是修改interestOps添加讀操做位,由IOSelector去觸發讀事件
selectionKey.interestOps(interestOps | readInterestOp);
在NioServerSocketChannel中readInterestOp其實對應的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel構造方法中定義:
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
服務端啓動後最早關心是客戶端鏈接的,若是沒有客戶端的鏈接,後續沒法IO操做,因此這裏註冊了SelectionKey.OP_ACCEPT來監聽客戶端的鏈接。
以上就是初始化Channel並註冊Selector的流程
下面說下調用doBind0()綁定監聽端口並開始監聽流程,doBind0()代碼以下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
綁定端口是異步進行的,這裏調用channel.eventLoop()獲取的線程默認就是以前註冊Selector的流程中next()獲取的線程。綁定的流程其實也是在主pipeline中鏈式調用用戶的配置ChannelHandler,如圖2,能夠看到最後調用HeadContext裏的bind()方法
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
也是委託給unsafe去處理,處理的方式就是調用JDK裏的綁定接口去綁定
javaChannel().socket().bind(localAddress);
若是綁定成功且以前沒觸發激活成功事件則觸發之,這裏的流程跟以前講的同樣就不累述。
以上就是綁定監聽端口並開始監聽的流程。