接下來咱們會經過使用 Netty 去實現 NIO TCP 服務器的這個場景來解析 Netty 的源代碼,深刻了解 Netty 的設計。java
使用 Netty 來實現一個 TCP 服務器,咱們大體要作如下事情:git
咱們先來了解 Server Channel 的初始化和註冊的過程。初始化是來構建 Netty 自己的各類組件,應用用戶的設置參數。註冊的主要工做最終是將 SelectableChannel
註冊到多路複用器 Selector
。這一過程在全部基於 Java NIO 的項目裏都是相似的。github
服務端的整個構建過程是從 ServerBootstrap
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) .bind(portNum).sync().channel().closeFuture().sync();
接下來咱們從 AbstractBootstrap
的 bind(int port)
方法開始瞭解 Netty 的源碼。設計模式
,方法:ChannelFuture doBind(final SocketAddress localAddress)
bind(int port)
的實際工做絕大部分是在 AbstractBootstrap
的 ChannelFuture doBind(final SocketAddress localAddress)
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
doBind(SocketAddress localAddress)
最後一步 bind 須要在 register 完成以後再執行,可是由於這些動做均可能發生在不一樣的線程上,因此 bind 的動做是經過回調的方式實現的,具體細節後面再介紹。本篇將先介紹前兩個操做。網絡
由於 AbstractBootstrap
類的 initAndRegister()
,方法:ChannelFuture initAndRegister()
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); // 見 2.5 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
Channel channel = channelFactory().newChannel();
此處經過 channelFactory()
方法獲得的 ChannelFactory
實現類是 ReflectiveChannelFactory
還提供了方法用來設置其它 ChannelFactory
的實現)這個類將經過 channel(NioServerSocketChannel.class)
提供的類反射建立出 NioServerSocketChannel
。而在 NioServerSocketChannel
的構造函數裏,一個 java.nio.channels.ServerSocketChannel
接下來,在 NioServerSocketChannel
的父類 AbstractNioChannel
被設置成非阻塞模式 ch.configureBlocking(false);
在 AbstractNioChannel
父類 AbstractChannel
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
在這裏 unsafe
和 pipeline
被構建出來。構造出 unsafe
的 newUnsafe()
方法是在子類中實現,在本例中 NioMessageUnsafe
(還有一個相似的 NioByteUnsafe
,其爲 NioSocketChannel
提供了具體的 IO 實現),它包含了不少具體的方法實現。而 DefaultChannelPipeline
的構造函數public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
構造函數中作了一件很重要的事,就是構造了 ChannelHandler
和 HeadContext
都繼承了 AbstractChannelHandlerContext
實現了 ChannelInboundHandler
實現了 ChannelOutboundHandler
(Netty 5 在此處有變化,其再也不區分 ChannelInboundHandler
和 ChannelOutboundHandler
上面這些就是 final Channel channel = channelFactory().newChannel();
所作的事情。接下來咱們來了解 init(Channel channel)
方法所作的事情。在這裏,void init(Channel channel)
方法由 AbstractBootstrap
的子類 ServerBootstrap
,方法:void init(Channel channel)
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
設置 options 和 attrsChannelInitializer
。這個 ChannelInitializer
會在 ChannelRegistered 事件發生時將 ServerBootstrapAcceptor
添加到 pipeline 上。在後面 ServerBootstrapAcceptor
的 addLast
方法會將新的 ChannelHandler
添加到 tail 以前,其它全部 ChannelHandler
以後。在作完初始化工做以後,就要開始註冊的工做了。接下來來看 group().register(channel)
的實現。其中 group()
方法將會返回咱們在 b.group(bossGroup, workerGroup)
中設定的 bossGroup
這裏不作過多介紹了。接下來看 register(channel)
,方法:void register(EventLoop eventLoop, ChannelPromise promise)
由於咱們實現的是一個 NIO server,因此此處 EventLoop
使用的實現類是 NioEventLoop
的 register(Channel)
方法是繼承自 SingleThreadEventLoop
。而 SingleThreadEventLoop
則經過 channel.unsafe().register(this, promise)
方法將註冊工做代理給了 Channel.Unsafe
來實現。此處 Unsafe
的實現類是 AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
先解釋一下 eventLoop.inEventLoop()
,這個判斷在不少地方都能看見。這個方法是用來判斷當前的線程是不是 EventLoop
的任務執行線程。若是是,那就不用在添加任務,直接執行就能夠了,不然須要將任務添加到 EventLoop
中。在本例中,很明顯,執行過程將走到 else 分支中。
註冊工做主要是在 doRegister()
方法中實現的,這個方法是定義在 AbstractChannel
中的一個抽象方法。在本例中,這個方法由 AbstractNioChannel
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
在這個方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this);
經過調用 JDK 方法,將 SelectableChannel
註冊到 Selector
上。注意一個細節,由於同一個 Channel 和 Selector 能夠對應一個 SelectionKey,因此若是另一個相應的 SelectionKey 的 cancel
方法被執行以後,會致使 SelectableChannel
的 register
方法拋出 CancelledKeyException
。因此這裏經過 selectNow()
方法清除取消狀態以後,從新 register。循環的緣由就像是註釋所描述的同樣,Netty 團隊也不清楚,這難道是 JDK 的 bug 嗎?
上面這幾個過程就實現了 Server Channel 的初始化和註冊工做。
在使用 ServerBootstrap
構建服務端的時候,Netty 應用了 Builder 模式。雖不是典型應用,但也起到了是代碼簡潔易懂的目的。
Channel channel = channelFactory().newChannel();
提供了方法設置工廠類,同時也提供了默認實現。經過工廠方法模式建立 Channel
的 void init(Channel channel)
就是一個模板方法。初始化和註冊工做的主要實現方法是 AbstractBootstrap
的 initAndRegister
,這個方法調用模板方法 init(Channel)
。而模板方法 init(Channel)
的 doRegister
Netty 用上面複雜的代碼實現了並不複雜的功能。其背後反映處的思想就是做爲一個通用的高性能網絡 IO 框架,Netty 必須設計出一個高性能、高擴展性的基礎架構,而後再在這個架構之上實現各類功能。Netty 的執行核心是 EventLoopGroup
及其實現類。絕大部分 IO 操做和非 IO 操做都是交由 EventLoopGroup
來執行。這是 Netty 能被用來實現高併發服務的緣由之一。因此本文所涉及的操做,因此並不複雜,可是其中的一些操做,例如註冊工做,也是須要交由 EventLoopGroup
若是說 EventLoopGroup
是執行調度的核心,那 Channel
就是實現具體操做的實現核心。由於網絡編程十分複雜,有各類複雜的協議,也有複雜的底層操做系統實現,因此 Channel