Netty服務端啓動流程源碼分析
前記
哈嘍,自從上篇《Netty之旅二:口口相傳的高性能Netty究竟是什麼?》後,遲遲兩週纔開啓今天的Netty
源碼系列。源碼分析的第一篇文章,下一篇我會分享客戶端的啓動過程源碼分析。經過源碼的閱讀,咱們將會知道,Netty
服務端啓動的調用鏈是很是長的,同時確定也會發現一些新的問題,隨着咱們源碼閱讀的不斷深刻,相信這些問題咱們也會一一攻破。java
廢話很少說,直接上號!git
1、從EchoServer示例入手
示例從哪裏來?任何開源框架都會有本身的示例代碼,Netty源碼也不例外,如模塊netty-example
中就包括了最多見的EchoServer
示例,下面經過這個示例進入服務端啓動流程篇章。github
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // 1. 聲明Main-Sub Reactor模式線程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 建立 EchoServerHandler 對象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 聲明服務端啓動引導器,並設置相關屬性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 3. 綁定端口即啓動服務端,並同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync(); // 4. 監聽服務端關閉,並阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 優雅地關閉兩個EventLoopGroup線程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
- [代碼行1八、19]聲明
Main-Sub Reactor
模式線程池:EventLoopGroup
建立兩個 EventLoopGroup
對象。其中,bossGroup
用於服務端接受客戶端的鏈接,workerGroup
用於進行客戶端的 SocketChannel
的數據讀寫。算法
(<u>關於EventLoopGroup
不是本文重點因此在後續文章中進行分析</u>)編程
- [代碼行23-39]聲明服務端啓動引導器,並設置相關屬性
AbstractBootstrap
是一個幫助類,經過方法鏈(method chaining
)的方式,提供了一個簡單易用的方式來配置啓動一個Channel
。io.netty.bootstrap.ServerBootstrap
,實現 AbstractBootstrap
抽象類,用於 Server
的啓動器實現類。io.netty.bootstrap.Bootstrap
,實現 AbstractBootstrap
抽象類,用於 Client
的啓動器實現類。以下類圖所示:bootstrap
![AbstractBootstrap類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image)
(<u>在EchoServer
示例代碼中,咱們看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等屬性鏈式設置都放到關於AbstractBootstrap
體系代碼中詳細介紹。</u>)數組
- [代碼行43]綁定端口即啓動服務端,並同步等待
先調用 #bind(int port)
方法,綁定端口,後調用 ChannelFuture#sync()
方法,阻塞等待成功。對於bind
操做就是本文要詳細介紹的"服務端啓動流程"。promise
- [代碼行47]監聽服務端關閉,並阻塞等待
先調用 #closeFuture()
方法,監聽服務器關閉,後調用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此處不是關閉服務器,而是channel
的監聽關閉。服務器
- [代碼行5一、52]優雅地關閉兩個
EventLoopGroup
線程池
finally
代碼塊中執行說明服務端將最終關閉,因此調用 EventLoopGroup#shutdownGracefully()
方法,分別關閉兩個EventLoopGroup
對象,終止全部線程。架構
2、服務啓動過程
在服務啓動過程的源碼分析以前,這裏回顧一下咱們在經過JDK NIO
編程在服務端啓動初始的代碼:
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
這5行代碼標示一個最爲熟悉的過程:
- 打開
serverSocketChannel
- 配置非阻塞模式
- 爲
channel
的socket
綁定監聽端口 - 建立
Selector
- 將
serverSocketChannel
註冊到selector
後面等分析完Netty
的啓動過程後,會對這些步驟有一個新的認識。在EchoServer
示例中,進入 #bind(int port)
方法,AbstractBootstrap#bind()
其實有多個方法,方便不一樣地址參數的傳遞,實際調用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代碼以下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
- [代碼行2] :調用
#initAndRegister()
方法,初始化並註冊一個Channel
對象。由於註冊是異步的過程,因此返回一個ChannelFuture
對象。詳細解析,見 「initAndRegister()
」。 - [代碼行4-6]]:若發生異常,直接進行返回。
- [代碼行9-34]:由於註冊是異步的過程,有可能已完成,有可能未完成。因此實現代碼分紅了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的狀況。
- 核心在[第 11 、29行],調用
#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,綁定 Channel 的端口,並註冊 Channel 到SelectionKey
中。 - 若是異步註冊對應的
ChanelFuture
未完成,則調用ChannelFuture#addListener(ChannelFutureListener)
方法,添加監聽器,在註冊完成後,進行回調執行#doBind0(...)
方法的邏輯。
- 核心在[第 11 、29行],調用
經過doBind
方法能夠知道服務端啓動流程大體以下幾個步驟:
1. 建立Channel
從#doBind(final SocketAddress localAddress)
進入到initAndRegister()
:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
[代碼行4]調用 ChannelFactory#newChannel()
方法,建立Channel
對象。 ChannelFactory
類繼承以下:
能夠在ChannelFactory
註釋看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,這裏只是包名的調整,對於繼承結構不變。netty
默認使用ReflectiveChannelFactory
,咱們能夠看到重載方法:
@Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
很明顯,正如其名是經過反射機制構造Channel
對象實例的。constructor
是在其構造方法初始化的:this.constructor = clazz.getConstructor();
這個clazz
按理說應該是咱們要建立的Channel
的Class對象。那Class
對象是什麼呢?咱們接着看channelFactory
是怎麼初始化的。
首先在AbstractBootstrap
找到以下代碼:
@Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
調用這個方法的遞推向上看到:
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
這個方法正是在EchoServer
中ServerBootstrap
鏈式設置時調用.channel(NioServerSocketChannel.class)
的方法。咱們看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
做爲具體實例,而且將NioServerSocketChannel.class
做爲構造參數傳遞初始化的,因此這回答了反射機制構造的是io.netty.channel.socket.nio.NioServerSocketChannel
對象。
繼續看NioServerSocketChannel
構造方法邏輯作了什麼事情,看以前先給出NioServerSocketChannel
類繼承關係:
NioServerSocketChannel
與NioSocketChannel
分別對應服務端和客戶端,公共父類都是AbstractNioChannel
和AbstractChannel
,下面介紹建立過程能夠參照這個Channel
類繼承圖。進入NioServerSocketChannel
構造方法:
/** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
點擊newSocket
進去:
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
以上傳進來的provider
是DEFAULT_SELECTOR_PROVIDER
即默認的java.nio.channels.spi.SelectorProvider
,[代碼行9]就是熟悉的jdk nio
建立ServerSocketChannel
。這樣newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回告終果ServerSocketChannel
,回到NioServerSocketChannel()#this()
點進去:
/** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
以上super
表明父類AbstractNioMessageChannel
構造方法,點進去看到:
/** * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }
以上super
表明父類AbstractNioChannel
構造方法,點進去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
以上[代碼行3]將ServerSocketChannel
保存到了AbstractNioChannel#ch
成員變量,在上面提到的NioServerSocketChannel
構造方法的[代碼行6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
變量。
以上[代碼行6]就是熟悉的jdk nio
編程設置ServerSocketChannel
非阻塞方式。這裏還有super
父類構造方法,點擊進去看到:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
以上構造方法中:
parent
屬性,表明父Channel
對象。對於NioServerSocketChannel
的parent
爲null
。id
屬性,Channel
編號對象。在構造方法中,經過調用#newId()
方法進行建立。(<u>這裏不細展開Problem-1</u>)unsafe
屬性,Unsafe
對象。由於Channel
真正的具體操做,是經過調用對應的Unsafe
對象實施。因此須要在構造方法中,經過調用#newUnsafe()
方法進行建立。這裏的Unsafe
並非咱們常說的jdk
自帶的sun.misc.Unsafe
,而是io.netty.channel.Channel#Unsafe
。(<u>這裏不細展開Problem-2</u>)pipeline
屬性默認是DefaultChannelPipeline
對象,賦值後在後面爲channel綁定端口的時候會用到
經過以上建立channel
源碼過程分析,總結的流程時序圖以下:
2. 初始化Channel
回到一開始建立Channel
的initAndRegister()
入口方法,在建立Channel
後緊接着init(channel)
進入初始化流程,由於是服務端初始化,因此是ServerBootstrap#init(Channel channel)
,代碼以下:
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
-
[代碼 3 - 6 行]:
options0()
方法返回的options
保存了用戶在EchoServer
中設置自定義的可選項集合,這樣ServerBootstrap
將配置的選項集合,設置到了Channel
的可選項集合中。 -
[代碼 8 - 15 行]:
attrs0()
方法返回的attrs
保存了用戶在EchoServer
中設置自定義的屬性集合,這樣ServerBootstrap
將配置的屬性集合,設置到了Channel
的屬性集合中。 -
[代碼21-28行]:經過局部變量
currentChildOptions
和currentChildAttrs
保存了用戶自定義的childOptions
和childAttrs
,用於[代碼43行]ServerBootstrapAcceptor
構造方法。 -
[代碼30-47]]:建立
ChannelInitializer
對象,添加到pipeline
中,用於後續初始化ChannelHandler
到pipeline
中,包括用戶在EchoServer
配置的LoggingHandler
和建立的建立ServerBootstrapAcceptor
對象。-
[代碼行34-37]:添加啓動器配置的
LoggingHandler
到pipeline
中。 -
[代碼行39-45]:建立
ServerBootstrapAcceptor
對象,添加到pipeline
中。從名字上就能夠看出來,ServerBootstrapAcceptor
也是一個ChannelHandler
實現類,專門用於接受客戶端的新鏈接請求,把新的請求扔給某個事件循環器,咱們先不作過多分析。咱們發現是使用EventLoop.execute
執行添加的過程,這是爲何呢?一樣記錄問題(<u>Problem-</u>3) -
須要說明的是
pipeline
在以前介紹Netty核心組件的時候提到是一個包含ChannelHandlerContext
的雙向鏈表,每個context
對於惟一一個ChannelHandler
,這裏初始化後,ChannelPipeline
裏就是以下一個結構:
-
3. 註冊Channel
初始化Channel
一些基本配置和屬性完畢後,回到一開始建立Channel
的initAndRegister()
入口方法,在初始化Channel
後緊接着[代碼行17] ChannelFuture regFuture = config().group().register(channel);
明顯這裏是經過EventLoopGroup
進入註冊流程(EventLoopGroup
體系將在後續文章講解)
在EchoServer
中啓動器一樣經過ServerBootstrap#group()
設置了NioEventLoopGroup
,它繼承自MultithreadEventLoopGroup
,因此註冊流程會進入MultithreadEventLoopGroup
重載的register(Channel channel)
方法,代碼以下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
這裏會調用 next()
方法選擇出來一個 EventLoop
來註冊 Channel
,裏面實際上使用的是一個叫作 EventExecutorChooser
的東西來選擇,它實際上又有兩種實現方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本質上就是從 EventExecutor
數組中選擇一個 EventExecutor
,咱們這裏就是 NioEventLoop
,那麼,它們有什麼區別呢?(<u>Problem-4:在介紹EventLoopGroup
體系的後續文章中將會詳細講解,這裏簡單地提一下,本質都是按數組長度取餘數 ,不過,2 的 N 次方的形式更高效。</u>)
接着,來到 NioEventLoop
的 register(channel)
方法,你會不會問找不到該方法?提示NioEventLoop
繼承SingleThreadEventLoop
,因此父類方法:
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
能夠看到,先建立了一個叫作 ChannelPromise
的東西,它是 ChannelFuture
的子類。[代碼行9]又調回了 Channel
的 Unsafe
的 register ()
方法,這裏第一個參數是 this
,也就是 NioEventLoop
,第二個參數是剛建立的 ChannelPromise
。
點擊 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法進去,代碼以下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
[代碼行15]這行代碼是設置 Channel
的 eventLoop
屬性。這行前面的代碼主要是在校驗傳入的 eventLoop
參數非空,校驗是否有註冊過以及校驗 Channel
和 eventLoop
類型是否匹配。
[代碼1八、24]接着,跟蹤到 AbstractUnsafe#register0(ChannelPromise promise)
方法中:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
[代碼行9]進入 AbstractNioChannel#doRegister()
方法:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
[代碼行5]關鍵一行代碼,將 Java 原生NIO Selector
與 Java 原生 NIO
的 Channel
對象(ServerSocketChannel
) 綁定在一塊兒,並將當前 Netty 的Channel
經過 attachment
的形式綁定到 SelectionKey
上:
- 調用
#unwrappedSelector()
方法,返回 Java 原生NIO Selector
對象,並且每一個NioEventLoop
與Selector
惟一一對應。 - 調用
SelectableChannel#register(Selector sel, int ops, Object att)
方法,註冊 Java 原生NIO
的Channel
對象到NIO Selector
對象上。
經過以上註冊channel源碼分析,總結流程的時序圖以下:
4. 綁定端口
註冊完Channel
最後回到AbstractBootstrap#doBind()
方法,分析 Channel
的端口綁定邏輯。進入doBind0
代碼以下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
- [代碼行7]:在前面
Channel
註冊成功的條件下,調用EventLoop
執行Channel
的端口綁定邏輯。可是,實際上當前線程已是EventLoop
所在的線程了,爲什麼還要這樣操做呢?答案在【第 5 至 6 行】的英語註釋,這裏做爲一個問題記着(<u>Problem-5</u>)。 - [代碼行11]:進入
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,一樣當即異步返回並添加ChannelFutureListener.CLOSE_ON_FAILURE
監聽事件。 - [代碼行13]:若是綁定端口以前的操做並無成功,天然也就不能進行端口綁定操做了,經過promise記錄異常緣由。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
方法以下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
pipeline
是以前建立channel
的時候建立的DefaultChannelPipeline
,進入該方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
[在分析初始化流程的時候最後畫一個DefaultChannelPipeline
內部的結構,可以便於分析後面進入DefaultChannelPipeline
一系列bind
方法。]
首先,tail
表明TailContext
,進入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //省略部分代碼 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
[代碼行3]:findContextOutbound
方法裏主要是執行ctx = ctx.prev;
那麼獲得的next
就是綁定LoggingHandler
的context
[代碼行6]:進入invokeBind(localAddress, promise)
方法並直接執行LoggingHandler#bind(this, localAddress, promise)
,進入後的方法以下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); }
設置了LoggingHandler
的日誌基本級別爲默認的INFO後,進行綁定操做的信息打印。接着,繼續循環到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法執行ctx = ctx.prev
取出HeadContext
進入到bind方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
兜兜轉轉,最終跳出了pipeline
輪迴到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
方法,Channel 的端口綁定邏輯。代碼以下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //此處有省略... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //此處有省略... }
作實事方法doBind
進入後以下:
@Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
到了此處,服務端的 Java 原生 NIO ServerSocketChannel
終於綁定上了端口。
3、問題概括
- Problem-1: 建立
Channel
流程中AbstractChannel
構造函數中爲channel
分配ID的算法如何實現? - Problem-2:
AbstractChannel
內部類AbstractUnsafe
的做用? - Problem-3: 初始化
channel
流程中pipeline
添加ServerBootstrapAcceptor
是經過EventLoop.execute
執行添加的過程,這是爲何呢? - Problem-4:註冊
channel
流程中PowerOfTwoEventExecutorChooser
和GenericEventExecutorChooser
的區別和優化原理? - Problem-5:綁定端口流程中調用
EventLoop
執行Channel
的端口綁定邏輯。可是,實際上當前線程已是EventLoop
所在的線程了,爲什麼還要這樣操做呢?
小結
經過對Netty服務端啓動流程源碼分析,咱們發現了在使用NIO
的模式下,服務端啓動流程其實就是封裝了JDK NIO
編程在服務端啓動的流程。只不過對原生JDK NIO
進行了加強和優化,同時從架構設計上簡化了服務端流程的編寫。
最重要的是感謝彤哥、艿艿和俞超-閃電俠這些大佬前期的分享,可以讓更多人學習源碼的旅途少走不少彎路,謝謝!
歡迎關注: