Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端)

目錄java

簡述

這一章是 Netty 源碼分析系列 的第一章, 我打算在這一章中, 展現一下 Netty 的客戶端和服務端的初始化和啓動的流程, 給讀者一個對 Netty 源碼有一個大體的框架上的認識, 而不會深刻每一個功能模塊.
本章會從 Bootstrap/ServerBootstrap 類 入手, 分析 Netty 程序的初始化和啓動的流程.

Bootstrap

Bootstrap 是 Netty 提供的一個便利的工廠類, 咱們能夠經過它來完成 Netty 的客戶端或服務器端的 Netty 初始化.
下面我以 Netty 源碼例子中的 Echo 服務器做爲例子, 從客戶端和服務器端分別分析一下Netty 的程序是如何啓動的.

客戶端部分

鏈接源碼

首先, 讓咱們從客戶端方面的代碼開始
下面是源碼example/src/main/java/io/netty/example/echo/EchoClient.java 的客戶端部分的啓動代碼:

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new EchoClientHandler());
         }
     });

    // Start the client.
    ChannelFuture f = b.connect(HOST, PORT).sync();

    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down the event loop to terminate all threads.
    group.shutdownGracefully();
}

從上面的客戶端代碼雖然簡單, 可是卻展現了 Netty 客戶端初始化時所需的全部內容:

  1. EventLoopGroup: 不管是服務器端仍是客戶端, 都必須指定 EventLoopGroup. 在這個例子中, 指定了 NioEventLoopGroup, 表示一個 NIO 的EventLoopGroup.

  2. ChannelType: 指定 Channel 的類型. 由於是客戶端, 所以使用了 NioSocketChannel.

  3. Handler: 設置數據的處理器.

下面咱們深刻代碼, 看一下客戶端經過 Bootstrap 啓動後, 都作了哪些工做.

NioSocketChannel 的初始化過程

在 Netty 中, Channel 是一個 Socket 的抽象, 它爲用戶提供了關於 Socket 狀態(是不是鏈接仍是斷開) 以及對 Socket 的讀寫等操做. 每當 Netty 創建了一個鏈接後, 都會有一個對應的 Channel 實例.
NioSocketChannel 的類層次結構以下:

clipboard.png

這一小節咱們着重分析一下 Channel 的初始化過程.

ChannelFactory 和 Channel 類型的肯定

除了 TCP 協議之外, Netty 還支持不少其餘的鏈接協議, 而且每種協議還有 NIO(異步 IO) 和 OIO(Old-IO, 即傳統的阻塞 IO) 版本的區別. 不一樣協議不一樣的阻塞類型的鏈接都有不一樣的 Channel 類型與之對應下面是一些經常使用的 Channel 類型:

  • NioSocketChannel, 表明異步的客戶端 TCP Socket 鏈接.

  • NioServerSocketChannel, 異步的服務器端 TCP Socket 鏈接.

  • NioDatagramChannel, 異步的 UDP 鏈接

  • NioSctpChannel, 異步的客戶端 Sctp 鏈接.

  • NioSctpServerChannel, 異步的 Sctp 服務器端鏈接.

  • OioSocketChannel, 同步的客戶端 TCP Socket 鏈接.

  • OioServerSocketChannel, 同步的服務器端 TCP Socket 鏈接.

  • OioDatagramChannel, 同步的 UDP 鏈接

  • OioSctpChannel, 同步的 Sctp 服務器端鏈接.

  • OioSctpServerChannel, 同步的客戶端 TCP Socket 鏈接.

那麼咱們是如何設置所須要的 Channel 的類型的呢? 答案是 channel() 方法的調用.
回想一下咱們在客戶端鏈接代碼的初始化 Bootstrap 中, 會調用 channel() 方法, 傳入 NioSocketChannel.class, 這個方法其實就是初始化了一個 BootstrapChannelFactory:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

而 BootstrapChannelFactory 實現了 ChannelFactory 接口, 它提供了惟一的方法, 即 newChannel. ChannelFactory, 顧名思義, 就是產生 Channel 的工廠類.
進入到 BootstrapChannelFactory.newChannel 中, 咱們看到其實現代碼以下:

@Override
public T newChannel() {
    // 刪除 try 塊
    return clazz.newInstance();
}

根據上面代碼的提示, 咱們就能夠肯定:

  • Bootstrap 中的 ChannelFactory 的實現是 BootstrapChannelFactory

  • 生成的 Channel 的具體類型是 NioSocketChannel.
    Channel 的實例化過程, 其實就是調用的 ChannelFactory#newChannel 方法, 而實例化的 Channel 的具體的類型又是和在初始化 Bootstrap 時傳入的 channel() 方法的參數相關. 所以對於咱們這個例子中的客戶端的 Bootstrap 而言, 生成的的 Channel 實例就是 NioSocketChannel.

Channel 實例化

前面咱們已經知道了如何肯定一個 Channel 的類型, 而且瞭解到 Channel 是經過工廠方法 ChannelFactory.newChannel() 來實例化的, 那麼 ChannelFactory.newChannel() 方法在哪裏調用呢?
繼續跟蹤, 咱們發現其調用鏈是:

Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister

在 AbstractBootstrap.initAndRegister 中就調用了 channelFactory().newChannel() 來獲取一個新的 NioSocketChannel 實例, 其源碼以下:

final ChannelFuture initAndRegister() {
    // 去掉非關鍵代碼
    final Channel channel = channelFactory().newChannel();
    init(channel);
    ChannelFuture regFuture = group().register(channel);
}

newChannel 中, 經過類對象的 newInstance 來獲取一個新 Channel 實例, 於是會調用NioSocketChannel 的默認構造器.
NioSocketChannel 默認構造器代碼以下:

public NioSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

這裏的代碼比較關鍵, 咱們看到, 在這個構造器中, 會調用 newSocket 來打開一個新的 Java NIO SocketChannel:

private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

接着會調用父類, 即 AbstractNioByteChannel 的構造器:

AbstractNioByteChannel(Channel parent, SelectableChannel ch)

並傳入參數 parent 爲 null, ch 爲剛纔使用 newSocket 建立的 Java NIO SocketChannel, 所以生成的 NioSocketChannel 的 parent channel 是空的.

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

接着會繼續調用父類 AbstractNioChannel 的構造器, 並傳入了參數 readInterestOp = SelectionKey.OP_READ:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 省略 try 塊
    // 配置 Java NIO SocketChannel 爲非阻塞的.
    ch.configureBlocking(false);
}

而後繼續調用父類 AbstractChannel 的構造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

到這裏, 一個完整的 NioSocketChannel 就初始化完成了, 咱們能夠稍微總結一下構造一個 NioSocketChannel 所須要作的工做:

  • 調用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打開一個新的 Java NIO SocketChannel

  • AbstractChannel(Channel parent) 中初始化 AbstractChannel 的屬性:

    • parent 屬性置爲 null

    • unsafe 經過newUnsafe() 實例化一個 unsafe 對象, 它的類型是 AbstractNioByteChannel.NioByteUnsafe 內部類

    • pipeline 是 new DefaultChannelPipeline(this) 新建立的實例. 這裏體現了:Each channel has its own pipeline and it is created automatically when a new channel is created.

  • AbstractNioChannel 中的屬性:

    • SelectableChannel ch 被設置爲 Java SocketChannel, 即 NioSocketChannel#newSocket 返回的 Java NIO SocketChannel.

    • readInterestOp 被設置爲 SelectionKey.OP_READ

    • SelectableChannel ch 被配置爲非阻塞的 ch.configureBlocking(false)

  • NioSocketChannel 中的屬性:

    • SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())

關於 unsafe 字段的初始化

咱們簡單地提到了, 在實例化 NioSocketChannel 的過程當中, 會在父類 AbstractChannel 的構造器中, 調用 newUnsafe() 來獲取一個 unsafe 實例. 那麼 unsafe 是怎麼初始化的呢? 它的做用是什麼?
其實 unsafe 特別關鍵, 它封裝了對 Java 底層 Socket 的操做, 所以其實是溝通 Netty 上層和 Java 底層的重要的橋樑.

那麼咱們就來看一下 Unsafe 接口所提供的方法吧:

interface Unsafe {
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}

一看便知, 這些方法其實都會對應到相關的 Java 底層的 Socket 的操做.
回到 AbstractChannel 的構造方法中, 在這裏調用了 newUnsafe() 獲取一個新的 unsafe 對象, 而 newUnsafe 方法在 NioSocketChannel 中被重寫了:

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

NioSocketChannel.newUnsafe 方法會返回一個 NioSocketChannelUnsafe 實例. 從這裏咱們就能夠肯定了, 在實例化的 NioSocketChannel 中的 unsafe 字段, 實際上是一個 NioSocketChannelUnsafe 的實例.

關於 pipeline 的初始化

上面咱們分析了一個 Channel (在這個例子中是 NioSocketChannel) 的大致初始化過程, 可是咱們漏掉了一個關鍵的部分, 即 ChannelPipeline 的初始化.
根據 Each channel has its own pipeline and it is created automatically when a new channel is created., 咱們知道, 在實例化一個 Channel 時, 必然伴隨着實例化一個 ChannelPipeline. 而咱們確實在 AbstractChannel 的構造器看到了 pipeline 字段被初始化爲 DefaultChannelPipeline 的實例. 那麼咱們就來看一下, DefaultChannelPipeline 構造器作了哪些工做吧:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

咱們調用 DefaultChannelPipeline 的構造器, 傳入了一個 channel, 而這個 channel 其實就是咱們實例化的 NioSocketChannel, DefaultChannelPipeline 會將這個 NioSocketChannel 對象保存在channel 字段中. DefaultChannelPipeline 中, 還有兩個特殊的字段, 即 head 和 tail, 而這兩個字段是一個雙向鏈表的頭和尾. 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 爲節點的雙向鏈表, 這個鏈表是 Netty 實現 Pipeline 機制的關鍵. 關於 DefaultChannelPipeline 中的雙向鏈表以及它所起的做用, 我在這裏暫時不表, 在 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline 中會有詳細的分析.

HeadContext 的繼承層次結構以下所示:

clipboard.png

TailContext 的繼承層次結構以下所示:

clipboard.png

咱們能夠看到, 鏈表中 head 是一個 ChannelOutboundHandler, 而 tail 則是一個 ChannelInboundHandler.
接着看一下 HeadContext 的構造器:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它調用了父類 AbstractChannelHandlerContext 的構造器, 並傳入參數 inbound = false, outbound = true.
TailContext 的構造器與 HeadContext 的相反, 它調用了父類 AbstractChannelHandlerContext 的構造器, 並傳入參數 inbound = true, outbound = false.
即 header 是一個 outboundHandler, 而 tail 是一個inboundHandler, 關於這一點, 你們要特別注意, 由於在分析到 Netty Pipeline 時, 咱們會反覆用到 inbound 和 outbound 這兩個屬性.

關於 EventLoop 初始化

回到最開始的 EchoClient.java 代碼中, 咱們在一開始就實例化了一個 NioEventLoopGroup 對象, 所以咱們就從它的構造器中追蹤一下 EventLoop 的初始化過程.
首先來看一下 NioEventLoopGroup 的類繼承層次:

clipboard.png

NioEventLoop 有幾個重載的構造器, 不過內容都沒有什麼大的區別, 最終都是調用的父類MultithreadEventLoopGroup構造器:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

其中有一點有意思的地方是, 若是咱們傳入的線程數 nThreads 是0, 那麼 Netty 會爲咱們設置默認的線程數 DEFAULT_EVENT_LOOP_THREADS, 而這個默認的線程數是怎麼肯定的呢?
其實很簡單, 在靜態代碼塊中, 會首先肯定 DEFAULT_EVENT_LOOP_THREADS 的值:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}

Netty 會首先從系統屬性中獲取 "io.netty.eventLoopThreads" 的值, 若是咱們沒有設置它的話, 那麼就返回默認值: 處理器核心數 * 2.

回到MultithreadEventLoopGroup構造器中, 這個構造器會繼續調用父類 MultithreadEventExecutorGroup 的構造器:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // 去掉了參數檢查, 異常處理 等代碼.
    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(threadFactory, args);
    }
}

根據代碼, 咱們就很清楚 MultithreadEventExecutorGroup 中的處理邏輯了:

  • 建立一個大小爲 nThreads 的 SingleThreadEventExecutor 數組

  • 根據 nThreads 的大小, 建立不一樣的 Chooser, 即若是 nThreads 是 2 的冪, 則使用 PowerOfTwoEventExecutorChooser, 反之使用 GenericEventExecutorChooser. 不論使用哪一個 Chooser, 它們的功能都是同樣的, 即從 children 數組中選出一個合適的 EventExecutor 實例.

  • 調用 newChhild 方法初始化 children 數組.

根據上面的代碼, 咱們知道, MultithreadEventExecutorGroup 內部維護了一個 EventExecutor 數組, Netty 的 EventLoopGroup 的實現機制其實就創建在 MultithreadEventExecutorGroup 之上. 每當 Netty 須要一個 EventLoop 時, 會調用 next() 方法獲取一個可用的 EventLoop.
上面代碼的最後一部分是 newChild 方法, 這個是一個抽象方法, 它的任務是實例化 EventLoop 對象. 咱們跟蹤一下它的代碼, 能夠發現, 這個方法在 NioEventLoopGroup 類中實現了, 其內容很簡單:

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

其實就是實例化一個 NioEventLoop 對象, 而後返回它.

最後總結一下整個 EventLoopGroup 的初始化過程吧:

  • EventLoopGroup(實際上是MultithreadEventExecutorGroup) 內部維護一個類型爲 EventExecutor children 數組, 其大小是 nThreads, 這樣就構成了一個線程池

  • 若是咱們在實例化 NioEventLoopGroup 時, 若是指定線程池大小, 則 nThreads 就是指定的值, 反之是處理器核心數 * 2

  • MultithreadEventExecutorGroup 中會調用 newChild 抽象方法來初始化 children 數組

  • 抽象方法 newChild 是在 NioEventLoopGroup 中實現的, 它返回一個 NioEventLoop 實例.

  • NioEventLoop 屬性:

    • SelectorProvider provider 屬性: NioEventLoopGroup 構造器中經過 SelectorProvider.provider() 獲取一個 SelectorProvider

    • Selector selector 屬性: NioEventLoop 構造器中經過調用經過 selector = provider.openSelector() 獲取一個 selector 對象.

channel 的註冊過程

在前面的分析中, 咱們提到, channel 會在 Bootstrap.initAndRegister 中進行初始化, 可是這個方法還會將初始化好的 Channel 註冊到 EventGroup 中. 接下來咱們就來分析一下 Channel 註冊的過程.
回顧一下 AbstractBootstrap.initAndRegister 方法:

final ChannelFuture initAndRegister() {
    // 去掉非關鍵代碼
    final Channel channel = channelFactory().newChannel();
    init(channel);
    ChannelFuture regFuture = group().register(channel);
}

當Channel 初始化後, 會緊接着調用 group().register() 方法來註冊 Channel, 咱們繼續跟蹤的話, 會發現其調用鏈以下:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
經過跟蹤調用鏈, 最終咱們發現是調用到了 unsafe 的 register 方法, 那麼接下來咱們就仔細看一下 AbstractUnsafe.register 方法中到底作了什麼:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略條件判斷和錯誤處理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

首先, 將 eventLoop 賦值給 Channel 的 eventLoop 屬性, 而咱們知道這個 eventLoop 對象實際上是 MultithreadEventLoopGroup.next() 方法獲取的, 根據咱們前面 關於 EventLoop 初始化 小節中, 咱們能夠肯定 next() 方法返回的 eventLoop 對象是 NioEventLoop 實例.
register 方法接着調用了 register0 方法:

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又調用了 AbstractNioChannel.doRegister:

@Override
protected void doRegister() throws Exception {
    // 省略錯誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

javaChannel() 這個方法在前面咱們已經知道了, 它返回的是一個 Java NIO SocketChannel, 這裏咱們將這個 SocketChannel 註冊到與 eventLoop 關聯的 selector 上了.

咱們總結一下 Channel 的註冊過程:

  • 首先在 AbstractBootstrap.initAndRegister中, 經過 group().register(channel), 調用 MultithreadEventLoopGroup.register 方法

  • 在MultithreadEventLoopGroup.register 中, 經過 next() 獲取一個可用的 SingleThreadEventLoop, 而後調用它的 register

  • 在 SingleThreadEventLoop.register 中, 經過 channel.unsafe().register(this, promise) 來獲取 channel 的 unsafe() 底層操做對象, 而後調用它的 register.

  • 在 AbstractUnsafe.register 方法中, 調用 register0 方法註冊 Channel

  • 在 AbstractUnsafe.register0 中, 調用 AbstractNioChannel.doRegister 方法

  • AbstractNioChannel.doRegister 方法經過 javaChannel().register(eventLoop().selector, 0, this) 將 Channel 對應的 Java NIO SockerChannel 註冊到一個 eventLoop 的 Selector 中, 而且將當前 Channel 做爲 attachment.

總的來講, Channel 註冊過程所作的工做就是將 Channel 與對應的 EventLoop 關聯, 所以這也體現了, 在 Netty 中, 每一個 Channel 都會關聯一個特定的 EventLoop, 而且這個 Channel 中的全部 IO 操做都是在這個 EventLoop 中執行的; 當關聯好 Channel 和 EventLoop 後, 會繼續調用底層的 Java NIO SocketChannel 的 register 方法, 將底層的 Java NIO SocketChannel 註冊到指定的 selector 中. 經過這兩步, 就完成了 Netty Channel 的註冊過程.

handler 的添加過程

Netty 的一個強大和靈活之處就是基於 Pipeline 的自定義 handler 機制. 基於此, 咱們能夠像添加插件同樣自由組合各類各樣的 handler 來完成業務邏輯. 例如咱們須要處理 HTTP 數據, 那麼就能夠在 pipeline 前添加一個 Http 的編解碼的 Handler, 而後接着添加咱們本身的業務邏輯的 handler, 這樣網絡上的數據流就向經過一個管道同樣, 從不一樣的 handler 中流過並進行編解碼, 最終在到達咱們自定義的 handler 中.
既然說到這裏, 有些讀者朋友確定會好奇, 既然這個 pipeline 機制是這麼的強大, 那麼它是怎麼實現的呢? 不過我這裏不打算詳細展開 Netty 的 ChannelPipeline 的實現機制(具體的細節會在後續的章節中展現), 我在這一小節中, 從簡單的入手, 展現一下咱們自定義的 handler 是如何以及什麼時候添加到 ChannelPipeline 中的.
首先讓咱們看一下以下的代碼片斷:

...
.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoClientHandler());
     }
 });

這個代碼片斷就是實現了 handler 的添加功能. 咱們看到, Bootstrap.handler 方法接收一個 ChannelHandler, 而咱們傳遞的是一個 派生於 ChannelInitializer 的匿名類, 它正好也實現了 ChannelHandler 接口. 咱們來看一下, ChannelInitializer 類內到底有什麼玄機:

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    protected abstract void initChannel(C ch) throws Exception;

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());
        ctx.pipeline().remove(this);
        ctx.fireChannelRegistered();
    }
    ...
}

ChannelInitializer 是一個抽象類, 它有一個抽象的方法 initChannel, 咱們正是實現了這個方法, 並在這個方法中添加的自定義的 handler 的. 那麼 initChannel 是哪裏被調用的呢? 答案是 ChannelInitializer.channelRegistered 方法中.
咱們來關注一下 channelRegistered 方法. 從上面的源碼中, 咱們能夠看到, 在 channelRegistered 方法中, 會調用 initChannel 方法, 將自定義的 handler 添加到 ChannelPipeline 中, 而後調用 ctx.pipeline().remove(this) 將本身從 ChannelPipeline 中刪除. 上面的分析過程, 能夠用以下圖片展現:
一開始, ChannelPipeline 中只有三個 handler, head, tail 和咱們添加的 ChannelInitializer.

clipboard.png

接着 initChannel 方法調用後, 添加了自定義的 handler:

clipboard.png

最後將 ChannelInitializer 刪除:
clipboard.png

分析到這裏, 咱們已經簡單瞭解了自定義的 handler 是如何添加到 ChannelPipeline 中的, 不過限於主題與篇幅的緣由, 我沒有在這裏詳細展開 ChannelPipeline 的底層機制, 我打算在下一篇 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline 中對這個問題進行深刻的探討.

客戶端鏈接分析

通過上面的各類分析後, 咱們大體瞭解了 Netty 初始化時, 所作的工做, 那麼接下來咱們就直奔主題, 分析一下客戶端是如何發起 TCP 鏈接的.

首先, 客戶端經過調用 Bootstrapconnect 方法進行鏈接.
在 connect 中, 會進行一些參數檢查後, 最終調用的是 doConnect0 方法, 其實現以下:

private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在 doConnect0 中, 會在 event loop 線程中調用 Channel 的 connect 方法, 而這個 Channel 的具體類型是什麼呢? 咱們在 Channel 初始化這一小節中已經分析過了, 這裏 channel 的類型就是 NioSocketChannel.
進行跟蹤到 channel.connect 中, 咱們發現它調用的是 DefaultChannelPipeline#connect, 而, pipeline 的 connect 代碼以下:

@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

而 tail 字段, 咱們已經分析過了, 是一個 TailContext 的實例, 而 TailContext 又是 AbstractChannelHandlerContext 的子類, 而且沒有實現 connect 方法, 所以這裏調用的實際上是 AbstractChannelHandlerContext.connect, 咱們看一下這個方法的實現:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // 刪除的參數檢查的代碼
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }

    return promise;
}

上面的代碼中有一個關鍵的地方, 即 final AbstractChannelHandlerContext next = findContextOutbound(), 這裏調用 findContextOutbound 方法, 從 DefaultChannelPipeline 內的雙向鏈表的 tail 開始, 不斷向前尋找第一個 outbound 爲 true 的 AbstractChannelHandlerContext, 而後調用它的 invokeConnect 方法, 其代碼以下:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    // 忽略 try 塊
    ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}

還記得咱們在 "關於 pipeline 的初始化" 這一小節分析的的內容嗎? 咱們提到, 在 DefaultChannelPipeline 的構造器中, 會實例化兩個對象: head 和 tail, 並造成了雙向鏈表的頭和尾. head 是 HeadContext 的實例, 它實現了 ChannelOutboundHandler 接口, 而且它的 outbound 字段爲 true. 所以在 findContextOutbound 中, 找到的 AbstractChannelHandlerContext 對象其實就是 head. 進而在 invokeConnect 方法中, 咱們向上轉換爲 ChannelOutboundHandler 就是沒問題的了.
而又由於 HeadContext 重寫了 connect 方法, 所以實際上調用的是 HeadContext.connect. 咱們接着跟蹤到 HeadContext.connect, 其代碼以下:

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

這個 connect 方法很簡單, 僅僅調用了 unsafe 的 connect 方法. 而 unsafe 又是什麼呢?
回顧一下 HeadContext 的構造器, 咱們發現 unsafe 是 pipeline.channel().unsafe() 返回的, 而 Channel 的 unsafe 字段, 在這個例子中, 咱們已經知道了, 實際上是 AbstractNioByteChannel.NioByteUnsafe 內部類. 兜兜轉轉了一大圈, 咱們找到了建立 Socket 鏈接的關鍵代碼.
進行跟蹤 NioByteUnsafe -> AbstractNioUnsafe.connect:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
}

AbstractNioUnsafe.connect 的實現如上代碼所示, 在這個 connect 方法中, 調用了 doConnect 方法, 注意, 這個方法並非 AbstractNioUnsafe 的方法, 而是 AbstractNioChannel 的抽象方法. doConnect 方法是在 NioSocketChannel 中實現的, 所以進入到 NioSocketChannel.doConnect 中:

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        javaChannel().socket().bind(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

咱們終於看到的最關鍵的部分了, 慶祝一下!
上面的代碼不用多說, 首先是獲取 Java NIO SocketChannel, 即咱們已經分析過的, 從 NioSocketChannel.newSocket 返回的 SocketChannel 對象; 而後是調用 SocketChannel.connect 方法完成 Java NIO 層面上的 Socket 的鏈接.
最後, 上面的代碼流程能夠用以下時序圖直觀地展現:

clipboard.png

本文由 yongshun 發表於我的博客, 採用 署名-相同方式共享 3.0 中國大陸許可協議.
Email: yongshun1228@gmail.com
本文標題爲: Netty 源碼分析之 一 揭開 Bootstrap 神祕的紅蓋頭 (客戶端)
本文連接爲: http://www.javashuo.com/article/p-chtrfmgv-ee.html

相關文章
相關標籤/搜索