編者注:Netty是Java領域有名的開源網絡庫,特色是高性能和高擴展性,所以不少流行的框架都是基於它來構建的,好比咱們熟知的Dubbo、Rocketmq、Hadoop等,針對高性能RPC,通常都是基於Netty來構建,好比soft-bolt。總之一句話,Java小夥伴們須要且有必要學會使用Netty並理解其實現原理。
關於Netty的入門講解可參考:Netty 入門,這一篇文章就夠了java
Netty的啓動流程(ServerBootstrap
),就是建立NioEventLoopGroup
(內部可能包含多個NioEventLoop,每一個eventLoop是一個線程,內部包含一個FIFO的taskQueue和Selector)和ServerBootstrap實例,並進行bind的過程(bind流程涉及到channel的建立和註冊),以後就能夠對外提供服務了。程序員
Netty的啓動流程中,涉及到多個操做,好比register、bind、註冊對應事件等,爲了避免影響main線程執行,這些工做以task的形式提交給NioEventLoop,由NioEventLoop來執行這些task,也就是register、bind、註冊事件等操做。面試
NioEventLoop(準確來講是SingleThreadEventExecutor
)中包含了private volatile Thread thread
,該thread變量的初始化是在new的線程第一次執行run方式時才賦值的,這種形式挺新穎的。bootstrap
Netty啓動流程圖以下所示:promise
大體瞭解了Netty啓動流程以後,下面就按照Netty啓動流程中涉及到的源碼來進行分析。網絡
netty啓動流程分爲server端和client端,不一樣之處就是前者監聽端口,對外提供服務(socket->bind->listen操做),對應類ServerBootstrap;後者主動去鏈接遠端端口(socket->connect),對應類Bootstrap。架構
server端啓動流程能夠理解成建立ServerBootstrap實例的過程,就如下面代碼爲例進行分析(echo服務):app
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // bossGroup處理connect事件 // workerGroup處理read/write事件 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 當鏈接創建後(register到childWorkerGroup前)初始化channel.pipeline ch.pipeline().addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
EventLoopGroup中可能包含了多個EventLoop,EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理客戶端請求和內部任務,內部任務如ServerSocketChannel註冊和ServerSocket綁定操做等。關於NioEventLoop,後續專門寫一篇文章分析,這裏就再也不展開,只需知道個大概便可,其架構圖以下:框架
EventLoopGroup建立本質就是建立多個NioEventLoop,這裏建立NioEventLoop就是初始化一個Reactor,包括selector和taskQueue。主要邏輯以下:socket
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { // 建立NioEventLoop實例 children = new EventExecutor[nThreads]; // 初始化NioEventLoop,實際調用的是NioEventLoopGroup.newChild方法 for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // 多個NioEventLoop中選擇策略 chooser = chooserFactory.newChooser(children); } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { // 建立taskQueue super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); // 是否是很熟悉,java nio selector操做 provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
EventLoopGroup建立OK後,啓動的第一步就算完成了,接下來該進行bind、listen操做了。
bind操做是ServerBootstrap流程重要的一環,bind流程涉及到NioChannel的建立、初始化和註冊(到Selector),啓動NioEventLoop,以後就能夠對外提供服務了。
public ChannelFuture bind(SocketAddress localAddress) { validate(); // 參數校驗 return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 初始化註冊操做 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 2. doBind0操做 if (regFuture.isDone()) { // register已完成,這裏直接調用doBind0 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // register還未完成,註冊listener回調,在回調中調用doBind0 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { /** * channel register完成(註冊到Selector而且調用了invokeHandlerAddedIfNeeded)以後, * 會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這裏的operationComplete方法 */ @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
這裏涉及到2個操做,一個是channel的建立、初始化、註冊操做,另外一個是bind操做,下面兵分兩路,分別來說。
注意,這裏若是main線程執行到regFuture.isDone()時,register還未完成,那麼main線程是不會直接調用bind操做的,而是往regFuture上註冊一個Listenner,這樣channel register完成(註冊到Selector而且調用了invokeHandlerAddedIfNeeded)以後,會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這裏的operationComplete方法,進而在執行bind操做。
final ChannelFuture initAndRegister() { Channel channel = null; try { // 1.建立(netty自定義)Channel實例,並初始化 // channel爲 NioServerSocketChannel 實例,NioServerSocketChannel的父類AbstractNioChannel保存有nio的ServerSocketChannel channel = channelFactory.newChannel(); // 2.初始化channel() init(channel); } catch (Throwable t) { } // 3.向Selector註冊channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
這裏重點關注下初始化channel流程,主要操做是設置channel屬性、設置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel註冊到selector以後被回調的。
/** * 初始channel屬性,也就是ChannelOption對應socket的各類屬性。 * 好比 SO_KEEPALIVE SO_RCVBUF ... 能夠與Linux中的setsockopt函數對應起來。 * 最後將ServerBootstrapAcceptor添加到對應channel的ChannelPipeline中。 */ @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } ChannelPipeline p = channel.pipeline(); // 獲取childGroup和childHandler,傳遞給ServerBootstrapAcceptor final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { /** * 在register0中,將channel註冊到Selector以後,會調用invokeHandlerAddedIfNeeded, * 進而調用到這裏的initChannel方法 */ @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 這裏註冊一個添加ServerBootstrapAcceptor的任務 ch.eventLoop().execute(new Runnable() { @Override public void run() { // 添加ServerBootstrapAcceptor pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
channel初始化以後就該將其註冊到selector,即下面的register流程:
public ChannelFuture register(Channel channel) { // next()挑選一個EventLoop,默認輪詢選擇某個NioEventLoop return next().register(channel); } public ChannelFuture register(final ChannelPromise promise) { promise.channel().unsafe().register(this, promise); return promise; } // AbstractChannel public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; // 直接執行register0或者以任務方式提交執行 // 啓動時,首先執行到這裏的是main線程,因此是以任務的方式來提交執行的。 // 也就是說,該任務是NioEventLoop第一次執行的任務,即調用register0 if (eventLoop.inEventLoop()) { register0(promise); } else { // 往NioEventLoop中(任務隊列)添加任務時,若是NioEventLoop線程還未啓動,則啓動該線程 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } }
register操做以後伴隨着多個回調及listener的觸發:
// AbstractChannel$AbstractUnsafe private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; // 這裏調用的是AbstractNioChannel.doRegister // 這裏將channel註冊上去,並無關注對應的事件(read/write事件) doRegister(); neverRegistered = false; registered = true; // 調用handlerAdd事件,這裏就會調用initChannel方法,設置channel.pipeline,也就是添加 ServerBootstrapAcceptor pipeline.invokeHandlerAddedIfNeeded(); // 調用operationComplete回調 safeSetSuccess(promise); // 回調fireChannelRegistered pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { // 回調fireChannelActive pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } }
上面代碼中的initChannel回調也就是設置對外監聽channel的channelHanlder爲ServerBootstrapAcceptor;operationComplete回調也就是觸發ChannelFutureListener.operationComplete
,這裏會進行後續的doBind操做。
// AbstractBootstrap private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // doBind0向EventLoop任務隊列中添加一個bind任務來完成後續操做。 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // bind操做 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } }); }
在回顧上面的bind操做代碼,bind操做是在register以後進行的,由於register0是由NioEventLoop執行的,因此main線程須要先判斷下future是否完成,若是完成直接進行doBind便可,不然添加listener回調進行doBind。
bind操做及後續初始化操做(channelActive回調、設置監聽事件)
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { // 調用底層bind操做 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } // 最後底層bind邏輯bind入參包括了backlog,也就是底層會進行listen操做 // DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } public void channelActive(ChannelHandlerContext ctx) throws Exception { // 回調fireChannelActive ctx.fireChannelActive(); // 設置selectKey監聽事件,對於監聽端口就是SelectionKey.OP_ACCEPT,對於新建鏈接就是SelectionKey.OP_READ readIfIsAutoRead(); }
到這裏爲止整個netty啓動流程就基本接近尾聲,能夠對外提供服務了。
推薦閱讀
歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。