Netty源碼分析系列之新鏈接的接入

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

1.問題

當 netty 的服務端啓動之後,就能夠開始接收客戶端的鏈接了。那麼在 netty 中,服務端是如何來進行新鏈接的建立的呢?在開始進行源碼閱讀以前,能夠先思考如下三個問題。編程

  • 服務端是如何檢測到有新的客戶端請求接入的(後面簡稱新鏈接接入)?
  • 在 JDK 原生的 NIO 中,服務端會經過ServerSocketChannel.accept() 來爲新接入的客戶端建立對應的客戶端 channel,那麼在 netty 中服務端又是如何來處理新鏈接的接入的呢?
  • 在 netty 中網絡 IO 的讀寫操做都是在 NioEventLoop 線程中進行的,那麼客戶端 channel 是如何和工做線程池中的 NioEventLoop 綁定的呢?

2.檢測新鏈接接入

在上一篇文章Netty 源碼分析系列之 NioEventLoop 的執行流程中,分析了 NioEventLoop 線程在啓動後,會不停地去循環處理網絡 IO 事件、普通任務和定時任務。在處理網絡 IO 事件時,當輪詢到 IO 事件類型爲 OP_ACCEPT 時(以下代碼所示),就表示有新客戶端來鏈接服務端了,也就是檢測到了新鏈接。這個時候,服務端 channel 就會進行新鏈接的讀取。promise

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
複製代碼

能夠看到,當是 OP_ACCEPT 事件時,就會調用unsafe.read() 方法來進行新鏈接的接入。此時 unsafe 對象是 NioMessageUnsafe 類型的實例,爲何呢?由於只有服務端 channel 纔會對 OP_ACCEPT 事件感興趣,而服務端 channel 中 unsafe 屬性保存的是 NioMessageUnsafe 類型的實例。微信

read()方法的源碼很長,但它主要乾了兩件事,第一:調用 doReadMessages()方法來讀取鏈接;第二:將讀取到的鏈接經過服務端 channel 中的 pipeline 來進行傳播,最終執行每個 handler 中的 channelRead()方法。網絡

3.建立客戶端 channel

服務端 channel 在監聽到 OP_ACCEPT 事件後,會爲新鏈接建立一個客戶端 channel,後面數據的讀寫均是經過這個客戶端 channel 來進行的。而這個客戶端 channel 是經過 doReadMessages()方法來建立的,該方法是定義在 NioServerSocketChannel 中的,下面是其源碼。數據結構

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 將原生的客戶端channel包裝成netty中的客戶端channel:NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // 異常日誌打印等...
    }
    return 0;
}
複製代碼

在該方法中,首先會經過 javaChannel()獲取到 JDK 原生的服務端 channel,即 ServerSocketChannel,這個原生的服務端 channel 是被保存在 NioServerSocketChannel 的ch屬性中,在初始化 NioServerSocketChannel 時會對ch屬性賦值(能夠參考這篇文章:Netty 源碼分析系列之服務端 Channel 初始化)。建立完 JDK 原生的服務端 channel 後,會經過 SocketUtils 這個工具類來建立一個 JDK 原生的客戶端 channel,即 SocketChannel。SocketUtils 這個工具類的底層實現,實際上就是調用 JDK 原生的 API,即 ServerSocketChannel.accept()。多線程

在建立完原生的 SocketChannel 後,netty 須要將其包裝成 netty 中定義的服務端 channel 類型,即:NioSocketChannel。如何包裝的呢?經過 new 關鍵字調用 NioSocketChannel 的構造方法來進行包裝。在構造方法中,作了不少初始化工做。跟蹤源碼,發現會調用到 AbstractNioChannel 類的以下構造方法。併發

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 此時的parent = NioServerSocketChannel,ch = SocketChannel(JDK原生的客戶端channel),readInterestOp = OP_READ
    super(parent);
    // 保存channel和感興趣的事件
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // 設置爲非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        // 異常處理...
    }
}
複製代碼

在該構造方法中,首先會保存原生的客戶端 channel 和客戶端 channel 感興趣的事件,而後將客戶端 channel 的阻塞模式設置爲 false,表示不阻塞(在 NIO 網絡編程中,這一步是必須的,不然啓動會報錯)。同時還會調用父類構造方法,父類就是 AbstractChannel。AbstractChannel 類的構造方法源碼以下。app

protected AbstractChannel(Channel parent) {
    // parent的值爲NioServerSocketChannel
    this.parent = parent;
    id = newId();
    // 對於客戶端channel而言,建立的unsafe是NioSocketChannelUnsafe
    unsafe = newUnsafe();
    // DefaultChannelPipeline
    pipeline = newChannelPipeline();
}
複製代碼

在該構造方法中,對於客戶端 channel 而言,parent 的值爲 NioServerSocketChannel,也就是 netty 服務端啓動時建立的服務端 channel。而後建立的 unsafe 是 NioSocketChannelUnsafe,最後會爲客戶端 channel 建立一個默認的 pipeline,此時 pipeline 的結構以下。(若是看過前幾篇文章,可能會發現,服務端 channel 在建立時也會調用到該構造方法)異步

pipeline結構圖

最終還會爲 NioSocketChannel 建立一個 NioSocketChannelConfig 對象,這個對象是用來保存用戶爲客戶端 channel 設置的一些 TCP 配置和屬性,在建立這個 config 對象時,會將 TCP 的 TCP_NODELAY 參數設置爲 true。TCP 在默認狀況下,會將小的數據包積攢成大的數據包之後才發出去,而 netty 爲了及時地 i 將較小的數據報發送出去,所以將 TCP_NODELAY 參數設置爲 true,表示不延遲發送。

至此,新鏈接對應的客戶端 channel 就建立完成了,後面網絡數據的讀寫,都是基於這個 NioSocketChannel 來進行的。

4.綁定 NioEventLoop

當客戶端的 channel 建立完成後,在 read()方法中,就會經過 pipeline.fireChannelRead(socketChannel)這一行代碼,將客戶端 channel 經過 pipeline 進行傳播,依次執行 pipeline 中每個 handler 的 channelRead()方法。(注意,這兒的 pipeline 是服務端 channel 中保存的 pipeline,在建立客戶端 channel 時,也會爲每一個新建的客戶端 channel 建立一個 pipeline,這裏千萬不要搞混了)

在服務端啓動的時候,服務端 channel 中 pipeline 的結構圖以下(詳細解釋能夠參考這三篇文章: Netty 源碼分析系列之服務端 Channel 初始化Netty 源碼分析系列之服務端 Channel 註冊Netty 源碼分析系列之服務端 Channel 的端口綁定)。

服務端pipeline

該 pipeline 中,對於 head 和 tail 而言,它倆的 channelRead()方法沒作什麼實際意義的工做,直接是向下一個節點傳播了,這裏重要的是 ServerBootstrapAcceptor 節點的 channelRead()方法。該方法的源碼以下。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // 向客戶端的channel中添加用戶自定義的childHandler
    child.pipeline().addLast(childHandler);

    // 保存用戶爲客戶端channel配置的屬性
    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 將客戶端channel註冊到工做線程池,即從workerGroup中選擇出一個NioEventloop,再將客戶端channel綁定到NioEventLoop上
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
複製代碼

在該方法中,首先向客戶端 channel 的 pipeline 中的節點中添加了一個 childHandler,這個 childHandler 是用戶本身定義的,什麼意思呢?以下圖所示,用戶經過 childHandler()方法自定義了一個 ChannelInitializer 類型的 childHandler,這個此時就會向客戶端 channel 的 pipeline 中的節點中添加該 childHandler(這個地方很重要,後面會用到)。而後經過 setChannelOptions 保存用戶爲客戶端 channel 配置的 TCP 參數和屬性。

Demo

最重要的一步在 childGroup.register(child),這一行代碼會將客戶端 channel 註冊到 workerGroup 線程池中的某一個 NioEventLoop 上。(在服務端端口綁定的過程當中,也是相似於調用 NioEventLoopGroup 的 register()方法,將服務端 channel 註冊到 bossGroup 中的某一個 NioEventLoop 中)。

此時的 childGroup 是 workerGroup(Reactor 主從多線程線程模型中的從線程池),調用 register()方法時,會調用到以下方法。

public ChannelFuture register(Channel channel) {
    // next()方法會從NioEventLoop中選擇出一個NioEventLoop
    return next().register(channel);
}
複製代碼

next()方法會從 NioEventLoop 中選擇出一個 NioEventLoop(關於 next()方法的詳細介紹請參考: Netty 源碼分析系列之 NioEventLoop 的建立與啓動),因爲 NioEventLoop 繼承了 SingleThreadEventLoop,因此這兒最後調用的是 SingleThreadEventLoop 中的以下的 register()方法。

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    /** * 對於客戶端channel而言 * promise是DefaultChannelPromise * promise.channel()獲取到的是NioSocketChannel * promise.channel().unsafe()獲得的是NioSocketChannelUnsafe * 因爲NioSocketChannelUnsafe繼承了AbstractUnsafe,因此當調用unsafe.register()時,會調用到AbstractUnsafe類的register()方法 */
    // this爲NioEventLoop
    promise.channel().unsafe().register(this, promise);
    return promise;
}
複製代碼

這裏的 unsafe()獲取到的是 NioSocketChannelUnsafe 對象,因爲 NioSocketChannelUnsafe 繼承了 AbstractUnsafe,因此當調用 unsafe.register()時,會調用到 AbstractUnsafe 類的 register()方法。該方法精簡後的源碼以下。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略部分代碼....

    // 對客戶端channel而言,這一步是給NioSocketChannel的eventLoop屬性賦值
    AbstractChannel.this.eventLoop = eventLoop;

    // 判斷是同步執行register0(),仍是異步執行register0()
    if (eventLoop.inEventLoop()) {
        // 同步執行
        register0(promise);
    } else {
        try {
        	// 提交到NioEventLoop線程中,異步執行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 省略部分代碼
        }
    }
}
複製代碼

實際上,服務端 channel 註冊到 NioEventLoop 上時,也是調用的到了該方法(能夠參考這篇文章: Netty 源碼分析系列之服務端 Channel 註冊)。

對於客戶端而言,在該方法中,經過以下一行代碼,就將客戶端 channel 與一個 NioEventLoop 進行了綁定,這就回答了文章開頭的第三個問題。

AbstractChannel.this.eventLoop = eventLoop;
複製代碼

接着會判斷當前線程是否等於傳入的 eventLoop 中保存的線程,這裏確定不是。爲何呢?由於當前線程是 bossGroup 線程組中的線程,而 eventLoop 是 workerGroup 線程組中的線程,因此這裏會返回 false,那麼就會異步執行 register0()方法。register0()方法的源碼以下。

private void register0(ChannelPromise promise) {
    try {
        // 省略部分代碼...
        boolean firstRegistration = neverRegistered;
        /** * 對於客戶端的channel而言,doRegister()方法作的事情就是將服務端Channel註冊到多路複用器上 */
        doRegister();
        neverRegistered = false;
        registered = true;

        //會執行handlerAdded方法
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //經過在pipeline傳播來執行每一個ChannelHandler的channelRegistered()方法
        pipeline.fireChannelRegistered();

           // 若是客戶端channel已經激活,就執行下面邏輯。
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 省略部分代碼...
    }
}
複製代碼

在 register0()方法中,有三步重要的邏輯,第一:doRegister();第二:pipeline.invokeHandlerAddedIfNeeded();第三:pipeline.fireChannelRegistered()。下面分別來看看這三步都幹了哪些事情。

doRegister()就是真正將客戶端 channel 註冊到多路複用器上的一步。doRegister()調用的是 AbstractNioChannel 類中的 doRegister()方法,刪減後源碼以下。

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            // 異常處理......
        }
    }
}
複製代碼

其中 javaChannel()獲取的就是 JDK 中原生的 SocketChannel。

eventLoop().unwrappedSelector()獲取的是 JDK 中原生的多路複用器 Selector(底層的數據結構被替換了)。(EventLoop 中的 unwrappedSelector 屬性是在建立 NioEventLoop 時,初始化的,底層的數據結構也是這個時候被替換的)

因此javaChannel().register(eventLoop().unwrappedSelector(), 0, this)這一行代碼,實際上就是調用 JDK 原生 SocketChannel 的register(selector,ops,attr)方法,而後將客戶端 Channel 註冊到了多路複用器 Selector 上。

注意這裏在調 JDK 原生的 register()方法時,第三個參數傳入的是 this,此時 this 表明的就是當前的 NioSocketChannel 對象。將 this 做爲一個 attachment 保存到多路複用器 Selector 上,這樣作的好處就是,後面能夠經過多路複用器 Selector 獲取到客戶端的 channel。第二個參數傳入的是 0,表示此時將客戶端 channel 註冊到多路複用器上,客戶端 chennel 感興趣的事件標識符是 0,即此時對任何事件都不感興趣(在後面纔會將感興趣的事件設置爲 OP_READ)。

當 doRegister()方法執行完之後,就會執行第二步:pipeline.invokeHandlerAddedIfNeeded(),這一步作的事情就是回調 pipeline 中 handler 的 handlerAdded()方法。

往下執行,代碼會執行到 pipeline.fireChannelRegistered(),也就是前面咱們提到的第三步。這一步作的事情就是傳播 Channel 註冊事件,如何傳播呢?就是沿着 pipeline 中的頭結點這個 handler 開始,日後依次執行每一個 handler 的 channelRegistered()方法。

在前面咱們提到過,會向客戶端 channel 的 pipeline 中添加一個 ChannelInitializer 類型的匿名類,所以在傳播執行 channelRegistered()方法的時候,就會執行到該匿名類的 channelRegistered()方法,從而最終會執行該匿名類中重寫的 initChannel(channel)方法,即以下圖所示的代碼。關因而如何調用到 initChannel(channel)方法中的,能夠參考這篇文章:Netty 源碼分析系列之服務端 Channel 註冊,裏面進行了很詳細的分析。不過讀源碼最佳方式仍是親自動手,Debug 調試一下你也許會體會更深,更容易理解。

回調

再次回到 register0()方法中,最後會判斷 isActive()是否爲 true,此時因爲客戶端 channel 已經註冊到多路複用器上了,所以會返回 true,並且因爲此時客戶端 channel 是第一次註冊,因此會 pipeline.fireChannelActive()這一行代碼,也就是又會經過客戶端 channel 的 pipeline 向下傳播執行全部 handler 的 channelActive()方法,最終會調用到 AbstractChannel 的 doBeginRead()方法(這一步的調用過程很複雜,建議直接 DEBUG)。doBeginRead 方法的源碼以下。

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
    /** * 在客戶端channel註冊到多路複用器上時,將selectionKey的interestOps屬性設置爲了0 * selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); */
    final int interestOps = selectionKey.interestOps();
    /** * readInterestOp屬性的值,是在NioSocketChannel的構造器中,被設置爲SelectionKey.OP_READ */
    if ((interestOps & readInterestOp) == 0) {
        // 對於客戶端channel而言,interestOps | readInterestOp運算的結果爲OP_READ
        // 因此最終selectionKey感興趣的事件爲OP_READ事件,至此,客戶端channel終於能夠開始接收客戶端的連接了。
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製代碼

至此,客戶端 channel 感興趣的就變成了 OP_READ 事件,那麼接下來就能夠進行數據的讀寫了。

5.總結

本文主要分析了當一個新鏈接進來後,netty 服務端是如何爲這個新鏈接建立客戶端 channel 的,又是如何將其綁定到 NioEventLoop 線程中的。客戶端 channel 註冊過程與服務端 channel 的註冊過程很是類似,調用過程幾乎同樣,因此建議先閱讀這篇文章Netty 源碼分析系列之服務端 Channel 註冊

推薦

微信公衆號
相關文章
相關標籤/搜索