Netty 源碼分析(三):服務器端的初始化和註冊過程

1. 簡介

接下來咱們會經過使用 Netty 去實現 NIO TCP 服務器的這個場景來解析 Netty 的源代碼,深刻了解 Netty 的設計。java

使用 Netty 來實現一個 TCP 服務器,咱們大體要作如下事情:git

  1. 建立 ServerSocketChannel、Channel、ChannelHandler 等一系列對象。這裏的 Channel 將包含 ServerSocketChannel 和一系列 ChannelHandler。
  2. 將 ServerSocketChannel 註冊到 Selector 多路複用器上
  3. 啓動 EventLoop,並將 Channel 註冊到其中
  4. 爲 ServerSocketChannel 綁定端口
  5. 接受客戶端鏈接,並將 SocketChannel 註冊到 Selector 和 EventLoop 中
  6. 處理讀寫事件 ...

2. 源碼解析

咱們先來了解 Server Channel 的初始化和註冊的過程。初始化是來構建 Netty 自己的各類組件,應用用戶的設置參數。註冊的主要工做最終是將 SelectableChannel 註冊到多路複用器 Selector。這一過程在全部基於 Java NIO 的項目裏都是相似的。github

服務端的整個構建過程是從 ServerBootstrap 開始的:編程

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(channelInitializer)
    .bind(portNum).sync().channel().closeFuture().sync();

接下來咱們從 AbstractBootstrapbind(int port) 方法開始瞭解 Netty 的源碼。設計模式

2.1 類:AbstractBootstrap,方法:ChannelFuture doBind(final SocketAddress localAddress)

bind(int port) 的實際工做絕大部分是在 AbstractBootstrapChannelFuture doBind(final SocketAddress localAddress) 實現的,咱們來看其源碼:promise

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
 
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind(SocketAddress localAddress) 方法主要會作三件事:服務器

  1. ServerChannel 初始化
  2. 註冊 ServerChannel 到 Selector
  3. 將 ServerChannel bind 到本地端口上

最後一步 bind 須要在 register 完成以後再執行,可是由於這些動做均可能發生在不一樣的線程上,因此 bind 的動做是經過回調的方式實現的,具體細節後面再介紹。本篇將先介紹前兩個操做。網絡

由於 AbstractBootstrap 類的 initAndRegister() 方法是接下來第一個被調用的方法,因此咱們接下來看它的源碼。架構

2.2 類:AbstractBootstrap,方法:ChannelFuture initAndRegister()

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
 
    ChannelFuture regFuture = group().register(channel); // 見 2.5
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
 
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
 
    return regFuture;
}

這個方法主要作了下面幾件事:併發

  • Channel channel = channelFactory().newChannel(); 此處經過 channelFactory() 方法獲得的 ChannelFactory 實現類是 ReflectiveChannelFactory。(這個類是默認實現,AbstractBootstrap 還提供了方法用來設置其它 ChannelFactory 的實現)這個類將經過 channel(NioServerSocketChannel.class) 提供的類反射建立出 NioServerSocketChannel。而在 NioServerSocketChannel 的構造函數裏,一個 java.nio.channels.ServerSocketChannel 將被構建出來。

  • 接下來,在 NioServerSocketChannel 的父類 AbstractNioChannel 的構造函數中,ServerSocketChannel 被設置成非阻塞模式 ch.configureBlocking(false);

  • AbstractNioChannel 父類 AbstractChannel 的構造函數裏,有兩個重要的對象被構造出來

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

在這裏 unsafepipeline 被構建出來。構造出 unsafenewUnsafe() 方法是在子類中實現,在本例中 NioMessageUnsafe(還有一個相似的 NioByteUnsafe,其爲 NioSocketChannel 提供了具體的 IO 實現),它包含了不少具體的方法實現。而 DefaultChannelPipeline 則是另外一個重要的類,接下來咱們來看看它的構造函數。

2.3 類 DefaultChannelPipeline 的構造函數

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
       throw new NullPointerException("channel");
    }
    this.channel = channel;
 
    tail = new TailContext(this);
    head = new HeadContext(this);
 
    head.next = tail;
    tail.prev = head;
}

DefaultChannelPipeline 構造函數中作了一件很重要的事,就是構造了 ChannelHandler 鏈。TailContextHeadContext 都繼承了 AbstractChannelHandlerContext。另外,TailContext 實現了 ChannelInboundHandlerHeadContext 實現了 ChannelOutboundHandler(Netty 5 在此處有變化,其再也不區分 ChannelInboundHandlerChannelOutboundHandler)。

上面這些就是 final Channel channel = channelFactory().newChannel(); 所作的事情。接下來咱們來了解 init(Channel channel) 方法所作的事情。在這裏,void init(Channel channel) 方法由 AbstractBootstrap 的子類 ServerBootstrap 實現

2.4 類:ServerBootstrap,方法:void init(Channel channel)

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
 
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

這個方法主要作了兩件事:

  1. NioServerSocketChannel 設置 options 和 attrs
  2. 爲 pipeline 添加一個 Inbound 處理器 ChannelInitializer。這個 ChannelInitializer 會在 ChannelRegistered 事件發生時將 ServerBootstrapAcceptor 添加到 pipeline 上。在後面 ServerBootstrapAcceptor 將被用來接收客戶端的鏈接,咱們會在後續文章中介紹。DefaultChannelPipelineaddLast 方法會將新的 ChannelHandler 添加到 tail 以前,其它全部 ChannelHandler 以後。

在作完初始化工做以後,就要開始註冊的工做了。接下來來看 group().register(channel) 的實現。其中 group() 方法將會返回咱們在 b.group(bossGroup, workerGroup) 中設定的 bossGroup 這裏不作過多介紹了。接下來看 register(channel) 方法。

2.5 類:AbstractChannel.AbstractUnsafe,方法:void register(EventLoop eventLoop, ChannelPromise promise)

由於咱們實現的是一個 NIO server,因此此處 EventLoop 使用的實現類是 NioEventLoopNioEventLoopregister(Channel) 方法是繼承自 SingleThreadEventLoop。而 SingleThreadEventLoop 則經過 channel.unsafe().register(this, promise) 方法將註冊工做代理給了 Channel.Unsafe 來實現。此處 Unsafe 的實現類是 AbstractChannel.AbstractUnsafe(多麼一致的命名)。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
 
    AbstractChannel.this.eventLoop = eventLoop;
 
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
 
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

先解釋一下 eventLoop.inEventLoop(),這個判斷在不少地方都能看見。這個方法是用來判斷當前的線程是不是 EventLoop 的任務執行線程。若是是,那就不用在添加任務,直接執行就能夠了,不然須要將任務添加到 EventLoop 中。在本例中,很明顯,執行過程將走到 else 分支中。

註冊工做主要是在 doRegister() 方法中實現的,這個方法是定義在 AbstractChannel 中的一個抽象方法。在本例中,這個方法由 AbstractNioChannel 實現的。

2.6 類:AbstractNioChannel,方法:doRegister()

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在這個方法中,selectionKey = javaChannel().register(eventLoop().selector, 0, this); 經過調用 JDK 方法,將 SelectableChannel 註冊到 Selector 上。注意一個細節,由於同一個 Channel 和 Selector 能夠對應一個 SelectionKey,因此若是另一個相應的 SelectionKey 的 cancel 方法被執行以後,會致使 SelectableChannelregister 方法拋出 CancelledKeyException。因此這裏經過 selectNow() 方法清除取消狀態以後,從新 register。循環的緣由就像是註釋所描述的同樣,Netty 團隊也不清楚,這難道是 JDK 的 bug 嗎?

上面這幾個過程就實現了 Server Channel 的初始化和註冊工做。

3. 總結

3.1 設計模式

從這部分代碼,咱們能夠看到不少設計模式的使用

Builder 模式

在使用 ServerBootstrap 構建服務端的時候,Netty 應用了 Builder 模式。雖不是典型應用,但也起到了是代碼簡潔易懂的目的。

工廠方法模式

Channel channel = channelFactory().newChannel(); 使用了工廠方法模式。ServerBootstrap 提供了方法設置工廠類,同時也提供了默認實現。經過工廠方法模式建立 Channel,實現了良好的可擴展性。

模板方法模式

AbstractBootstrapvoid init(Channel channel) 就是一個模板方法。初始化和註冊工做的主要實現方法是 AbstractBootstrapinitAndRegister,這個方法調用模板方法 init(Channel)。而模板方法 init(Channel) 則交由子類作具體的實現。

一樣,AbstractChanneldoRegister 方法也是一個模板方法模式的例子。

3.2 設計思想

Netty 用上面複雜的代碼實現了並不複雜的功能。其背後反映處的思想就是做爲一個通用的高性能網絡 IO 框架,Netty 必須設計出一個高性能、高擴展性的基礎架構,而後再在這個架構之上實現各類功能。Netty 的執行核心是 EventLoopGroup 及其實現類。絕大部分 IO 操做和非 IO 操做都是交由 EventLoopGroup 來執行。這是 Netty 能被用來實現高併發服務的緣由之一。因此本文所涉及的操做,因此並不複雜,可是其中的一些操做,例如註冊工做,也是須要交由 EventLoopGroup 異步執行。這雖然由於異步的方式,提升了系統的執行效率,但事也未任務直接的協調製造了困難,這一點在後續的介紹中會看的很清楚。

若是說 EventLoopGroup 是執行調度的核心,那 Channel 就是實現具體操做的實現核心。由於網絡編程十分複雜,有各類複雜的協議,也有複雜的底層操做系統實現,因此 Channel 相應的實現類也是種類繁多。這其實並無增長複雜度,而是將各類複雜功能各歸其主,將實現細節梳理清晰。

4. 接下來

接下來的文章將會解析服務端端口綁定的實現。

相關文章
相關標籤/搜索