自頂向下深刻分析Netty(六)--Channel總述

自頂向下深刻分析Netty(六)--Channel總述

自頂向下深刻分析Netty(六)--Channel源碼實現

6.1 總述

6.1.1 Channel

JDK中的Channel是通信的載體,而Netty中的Channel在此基礎上進行封裝從而賦予了Channel更多的能力,用戶可使用Channel進行如下操做:php

  • 查詢Channel的狀態。
  • 配置Channel的參數。
  • 進行Channel支持的I/O操做(read,write,connect,bind)。
  • 獲取對應的ChannelPipeline,從而能夠自定義處理I/O事件或者其餘請求。

爲了保證在使用Channel或者處理I/O操做時不出現錯誤,如下幾點須要特別注意:前端

  1. 全部的I/O操做都是異步的
    因爲採用事件驅動的機制,因此Netty中的全部IO操做都是異步的。這意味着當咱們調用一個IO操做時,方法會當即返回並不保證操做已經完成。由上一章Future的講解中,咱們知道,這些IO操做會返回一個ChannelFuture對象,咱們須要經過添加監聽者的方式執行操做完成後需執行的代碼
  2. Channel是有等級的
    若是一個Channel由另外一個Channel建立,那麼他們之間造成父子關係。好比說,當ServerSocketChannel經過accept()方法接受一個SocketChannel時,那麼SocketChannel的父親是ServerSocketChannel,調用SocketChannel的parent()方法返回該ServerSocketChannel對象。
  3. 可使用向下轉型獲取子類的特定操做
    某些子類Channel會提供一些所需的特定操做,能夠向下轉型到這樣的子類,從而得到特定操做。好比說,對於UDP的數據報的傳輸,有特定的join()和leave()操做,咱們能夠向下轉型到DatagramChannel從而使用這些操做。
  4. 釋放資源
    當一個Channel再也不使用時,須調用close()或者close(ChannelPromise)方法釋放資源。

6.1.2 Channel配置參數

(1).通用參數

CONNECT_TIMEOUT_MILLIS
        Netty參數,鏈接超時毫秒數,默認值30000毫秒即30秒。java

MAX_MESSAGES_PER_READ
        Netty參數,一次Loop讀取的最大消息數,對於ServerChannel或者NioByteChannel,默認值爲16,其餘Channel默認值爲1。默認值這樣設置,是由於:ServerChannel須要接受足夠多的鏈接,保證大吞吐量,NioByteChannel能夠減小沒必要要的系統調用select。linux

WRITE_SPIN_COUNT
        Netty參數,一個Loop寫操做執行的最大次數,默認值爲16。也就是說,對於大數據量的寫操做至多進行16次,若是16次仍沒有所有寫完數據,此時會提交一個新的寫任務給EventLoop,任務將在下次調度繼續執行。這樣,其餘的寫請求才能被響應不會由於單個大數據量寫請求而耽誤。git

ALLOCATOR
        Netty參數,ByteBuf的分配器,默認值爲ByteBufAllocator.DEFAULT,4.0版本爲UnpooledByteBufAllocator,4.1版本爲PooledByteBufAllocator。該值也可使用系統參數io.netty.allocator.type配置,使用字符串值:"unpooled","pooled"。程序員

RCVBUF_ALLOCATOR
        Netty參數,用於Channel分配接受Buffer的分配器,默認值爲AdaptiveRecvByteBufAllocator.DEFAULT,是一個自適應的接受緩衝區分配器,能根據接受到的數據自動調節大小。可選值爲FixedRecvByteBufAllocator,固定大小的接受緩衝區分配器。github

AUTO_READ
        Netty參數,自動讀取,默認值爲True。Netty只在必要的時候才設置關心相應的I/O事件。對於讀操做,須要調用channel.read()設置關心的I/O事件爲OP_READ,這樣如有數據到達才能讀取以供用戶處理。該值爲True時,每次讀操做完畢後會自動調用channel.read(),從而有數據到達便能讀取;不然,須要用戶手動調用channel.read()。須要注意的是:當調用config.setAutoRead(boolean)方法時,若是狀態由false變爲true,將會調用channel.read()方法讀取數據;由true變爲false,將調用config.autoReadCleared()方法終止數據讀取。算法

WRITE_BUFFER_HIGH_WATER_MARK
        Netty參數,寫高水位標記,默認值64KB。若是Netty的寫緩衝區中的字節超過該值,Channel的isWritable()返回False編程

WRITE_BUFFER_LOW_WATER_MARK
        Netty參數,寫低水位標記,默認值32KB。當Netty的寫緩衝區中的字節超太高水位以後若降低到低水位,則Channel的isWritable()返回True。寫高低水位標記使用戶能夠控制寫入數據速度,從而實現流量控制。推薦作法是:每次調用channl.write(msg)方法首先調用channel.isWritable()判斷是否可寫。promise

MESSAGE_SIZE_ESTIMATOR
        Netty參數,消息大小估算器,默認爲DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder爲實際大小,FileRegion估算值爲0。該值估算的字節數在計算水位時使用,FileRegion爲0可知FileRegion不影響高低水位。

SINGLE_EVENTEXECUTOR_PER_GROUP
        Netty參數,單線程執行ChannelPipeline中的事件,默認值爲True。該值控制執行ChannelPipeline中執行ChannelHandler的線程。若是爲Trye,整個pipeline由一個線程執行,這樣不須要進行線程切換以及線程同步,是Netty4的推薦作法;若是爲False,ChannelHandler中的處理過程會由Group中的不一樣線程執行。

(2).SocketChannel參數

SO_RCVBUF
        Socket參數,TCP數據接收緩衝區大小。該緩衝區即TCP接收滑動窗口,linux操做系統可以使用命令:cat /proc/sys/net/ipv4/tcp_rmem查詢其大小。通常狀況下,該值可由用戶在任意時刻設置,但當設置值超過64KB時,須要在鏈接到遠端以前設置。

SO_SNDBUF
        Socket參數,TCP數據發送緩衝區大小。該緩衝區即TCP發送滑動窗口,linux操做系統可以使用命令:cat /proc/sys/net/ipv4/tcp_smem查詢其大小。

TCP_NODELAY
        TCP參數,當即發送數據,默認值爲Ture(Netty默認爲True而操做系統默認爲False)。該值設置Nagle算法的啓用,改算法將小的碎片數據鏈接成更大的報文來最小化所發送的報文的數量,若是須要發送一些較小的報文,則須要禁用該算法。Netty默認禁用該算法,從而最小化報文傳輸延時。

SO_KEEPALIVE
        Socket參數,鏈接保活,默認值爲False。啓用該功能時,TCP會主動探測空閒鏈接的有效性。能夠將此功能視爲TCP的心跳機制,須要注意的是:默認的心跳間隔是7200s即2小時。Netty默認關閉該功能

SO_REUSEADDR
        Socket參數,地址複用,默認值False。有四種狀況可使用:(1).當有一個有相同本地地址和端口的socket1處於TIME_WAIT狀態時,而你但願啓動的程序的socket2要佔用該地址和端口,好比重啓服務且保持先前端口。(2).有多塊網卡或用IP Alias技術的機器在同一端口啓動多個進程,但每一個進程綁定的本地IP地址不能相同。(3).單個進程綁定相同的端口到多個socket上,但每一個socket綁定的ip地址不一樣。(4).徹底相同的地址和端口的重複綁定。但這隻用於UDP的多播,不用於TCP。

SO_LINGER
         Netty對底層Socket參數的簡單封裝,關閉Socket的延遲時間,默認值爲-1,表示禁用該功能。-1以及全部<0的數表示socket.close()方法當即返回,但OS底層會將發送緩衝區所有發送到對端。0表示socket.close()方法當即返回,OS放棄發送緩衝區的數據直接向對端發送RST包,對端收到復位錯誤。非0整數值表示調用socket.close()方法的線程被阻塞直到延遲時間到或發送緩衝區中的數據發送完畢,若超時,則對端會收到復位錯誤。

IP_TOS
        IP參數,設置IP頭部的Type-of-Service字段,用於描述IP包的優先級和QoS選項。

ALLOW_HALF_CLOSURE
        Netty參數,一個鏈接的遠端關閉時本地端是否關閉,默認值爲False。值爲False時,鏈接自動關閉;爲True時,觸發ChannelInboundHandler的userEventTriggered()方法,事件爲ChannelInputShutdownEvent。

(3).ServerSocketChannel參數

SO_RCVBUF
        已說明,須要注意的是:當設置值超過64KB時,須要在綁定到本地端口前設置。該值設置的是由ServerSocketChannel使用accept接受的SocketChannel的接收緩衝區。

SO_REUSEADDR
        已說明

SO_BACKLOG
        Socket參數,服務端接受鏈接的隊列長度,若是隊列已滿,客戶端鏈接將被拒絕。默認值,Windows爲200,其餘爲128。

(4).DatagramChannel參數

SO_BROADCAST
        Socket參數,設置廣播模式。

SO_RCVBUF
        已說明

SO_SNDBUF
        已說明

SO_REUSEADDR
        已說明

IP_MULTICAST_LOOP_DISABLED
        對應IP參數IP_MULTICAST_LOOP,設置本地迴環接口的多播功能。因爲IP_MULTICAST_LOOP返回True表示關閉,因此Netty加上後綴_DISABLED防止歧義。

IP_MULTICAST_ADDR
        對應IP參數IP_MULTICAST_IF,設置對應地址的網卡爲多播模式。

IP_MULTICAST_IF
        對應IP參數IP_MULTICAST_IF2,同上但支持IPV6。

IP_MULTICAST_TTL
        IP參數,多播數據報的time-to-live即存活跳數。

IP_TOS
        已說明

DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION
        Netty參數,DatagramChannel註冊的EventLoop即表示已激活。


6.1.3 Channel接口

Channel接口中含有大量的方法,咱們先對這些方法分類:

  1. 狀態查詢
    boolean isOpen(); // 是否開放
    boolean isRegistered(); // 是否註冊到一個EventLoop
    boolean isActive(); // 是否激活
    boolean isWritable();   // 是否可寫

 

open表示Channel的開放狀態,True表示Channel可用,False表示Channel已關閉再也不可用。registered表示Channel的註冊狀態,True表示已註冊到一個EventLoop,False表示沒有註冊到EventLoop。active表示Channel的激活狀態,對於ServerSocketChannel,True表示Channel已綁定到端口;對於SocketChannel,表示Channel可用(open)且已鏈接到對端。Writable表示Channel的可寫狀態,當Channel的寫緩衝區outboundBuffer非null且可寫時返回True。
一個正常結束的Channel狀態轉移有如下兩種狀況:

REGISTERED->CONNECT/BIND->ACTIVE->CLOSE->INACTIVE->UNREGISTERED REGISTERED->ACTIVE->CLOSE->INACTIVE->UNREGISTERED

其中第一種是服務端用於綁定的Channel或者客戶端用於發起鏈接的Channel,第二種是服務端接受的SocketChannel。一個異常關閉的Channel則不會服從這樣的狀態轉移。

  1. getter方法
    EventLoop eventLoop();  // 註冊到的EventLoop
    Channel parent();   // 父類Channel
    ChannelConfig config(); // 配置參數
    ChannelMetadata metadata(); // 元數據
    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 遠端地址
    Unsafe unsafe();    // Unsafe對象
    ChannelPipeline pipeline(); // 事件管道,用於處理IO事件
    ByteBufAllocator alloc();   // 字節緩存分配器
    ChannelFuture closeFuture();    // Channel關閉時的異步結果
    ChannelPromise voidPromise();   

 

  1. 異步結果生成
    ChannelPromise newPromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);

 

  1. I/O事件處理
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    Channel read();
    ChannelFuture write(Object msg);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg);

 

這裏的I/O事件都是outbound出站事件,表示由用戶發起,即用戶能夠調用這些方法產生響應的事件。對應地,有inbound入站事件,將在ChnanelPipeline一節中詳述。


6.1.4 Unsafe

Unsafe?直譯中文爲不安全,這曾給我帶來極大的困擾。若是你是第一次遇到這種接口,必定會和我感同身受。一個Unsafe對象是不安全的?這裏說的不安全,是相對於用戶程序員而言的,也就是說,用戶程序員使用Netty進行編程時不會接觸到這個接口和相關類。爲何不會接觸到呢?由於相似的接口和類是Netty的大量內部實現細節,不會暴露給用戶程序員。然而咱們的目標是自頂向下深刻分析Netty,因此有必要深刻Unsafe雷區。咱們先看Unsafe接口中的方法:

    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 遠端地址
    ChannelPromise voidPromise();   // 不關心結果的異步Promise?
    ChannelOutboundBuffer outboundBuffer(); // 寫緩衝區
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, 
                              ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();

 

也許你已經發現Unsafe接口和Channel接口中都有register、bind等I/O事件相關的方法,它們有什麼區別呢?回憶一下EventLoop線程實現,當一個selectedKey就緒時,對I/O事件的處理委託給unsafe對象實現,代碼相似以下:

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT); 
        unsafe.finishConnect(); 
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                  || readyOps == 0) {
        unsafe.read(); 
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

 

也就是說,Unsafe的子類做爲Channel的內部類,負責處理底層NIO相關的I/O事件。Channel則使用責任鏈的方式經過ChannelPipeline將事件提供給用戶自定義處理。

 

6.2 Channel實現

![Netty_Channel類圖][2]

Channel的類圖比較清晰。咱們主要分析NioSocketChannel和NioServerSocketChannel這兩條線。

6.2.1 AbstractChannel

首先看其中的字段:

    private final Channel parent;   // 父Channel
    private final Unsafe unsafe;    
    private final DefaultChannelPipeline pipeline;  // 處理通道
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;    // 本地地址
    private volatile SocketAddress remoteAddress;   // 遠端地址
    private volatile EventLoop eventLoop;   // EventLoop線程
    private volatile boolean registered;    // 是否註冊到EventLoop

 而後,咱們看其中的構造方法:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

 newUnsafe()和newChannelPipeline()可由子類覆蓋實現。在Netty的實現中每個Channel都有一個對應的Unsafe內部類:AbstractChannel--AbstractUnsafe,AbstractNioChannel--AbstractNioUnsafe等等,newUnsafe()方法正好用來生成這樣的對應關係。ChannelPipeline將在以後講解,這裏先了解它的功能:做爲用戶處理器Handler的容器爲用戶提供自定義處理I/O事件的能力即爲用戶提供業務邏輯處理。AbstractChannel中對I/O事件的處理,都委託給ChannelPipeline處理,代碼都一模一樣:

    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

AbstractChannel其餘方法都比較簡單,主要關注狀態斷定的方法:

    public boolean isRegistered() {
        return registered;
    }

    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable(); // 寫緩衝區不爲null且可寫
    }

 對於Channel的實現來講,其中的內部類Unsafe纔是關鍵,由於其中含有I/O事件處理的細節。AbstractUnsafe做爲AbstractChannel的內部類,定義了I/O事件處理的基本框架,其中的細節留給子類實現。咱們將依次對各個事件框架進行分析。

  1. register事件框架

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (isRegistered()) {
            promise.setFailure(...);    // 已經註冊則失敗
            return;
        }
        if (!isCompatible(eventLoop)) { // EventLoop不兼容當前Channel
            promise.setFailure(...);
            return;
        }
        AbstractChannel.this.eventLoop = eventLoop;
        // 當前線程爲EventLoop線程直接執行;不然提交任務給EventLoop線程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(() -> { register0(promise); });
            } catch (Throwable t) {
                closeForcibly();    // 異常時關閉Channel
                closeFuture.setClosed();    
                safeSetFailure(promise, t);
            }
        }
    }

 

12-22行相似的代碼結構,Netty使用了不少次,這是爲了保證I/O事件以及用戶定義的I/O事件處理邏輯(業務邏輯)在一個線程中處理。咱們看提交的任務register0():

    private void register0(ChannelPromise promise) {
        try {
            // 確保Channel沒有關閉
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();   // 模板方法,細節由子類完成
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();  // 將用戶Handler添加到ChannelPipeline
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();   // 觸發Channel註冊事件
            if (isActive()) {
                // ServerSocketChannel接受的Channel此時已被激活
                if (firstRegistration) {
                    // 首次註冊且激活觸發Channel激活事件
                    pipeline.fireChannelActive();   
                } else if (config().isAutoRead()) {
                    beginRead();   // 可視爲模板方法 
                }
            }
        } catch (Throwable t) {
            closeForcibly();     // 可視爲模板方法
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 

register0()方法定義了註冊到EventLoop的總體框架,整個流程以下:
(1).註冊的具體細節由doRegister()方法完成,子類中實現。
(2).註冊後將處理業務邏輯的用戶Handler添加到ChannelPipeline
(3).異步結果設置爲成功,觸發Channel的Registered事件。
(4).對於服務端接受的客戶端鏈接,若是首次註冊,觸發Channel的Active事件若是已設置autoRead,則調用beginRead()開始讀取數據
對於(4)的是由於fireChannelActive()中也根據autoRead配置,調用了beginRead()方法。beginRead()方法其實也是一個框架,細節由doBeginRead()方法在子類中實現:

    public final void beginRead() {
        assertEventLoop();
        if (!isActive()) {
            return;
        }
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(() -> { pipeline.fireExceptionCaught(e); });
            close(voidPromise());
        }
    }

異常處理的closeForcibly()方法也是一個框架,細節由doClose()方法在子類中實現:

    public final void closeForcibly() {
        assertEventLoop();
        try {
            doClose();
        } catch (Exception e) {
            logger.warn("Failed to close a channel.", e);
        }
    }

 register框架中有一對safeSetXXX()方法,將未完成的Promise標記爲完成且成功或失敗,其實現以下:

    protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
            logger.warn(...);
        }
    }

至此,register事件框架分析完畢。

  1. bind事件框架

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // 確保Channel沒有關閉
        }
        boolean wasActive = isActive();
        try {
            doBind(localAddress);   // 模板方法,細節由子類完成
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (!wasActive && isActive()) { 
            invokeLater(() -> { pipeline.fireChannelActive(); });   // 觸發Active事件
        }
        safeSetSuccess(promise);
    }

bind事件框架較爲簡單,主要完成在Channel綁定完成後觸發Channel的Active事件。其中的invokeLater()方法向Channel註冊到的EventLoop提交一個任務:

    private void invokeLater(Runnable task) {
        try {
            eventLoop().execute(task);
        } catch (RejectedExecutionException e) {
            logger.warn("Can't invoke task later as EventLoop rejected it", e);
        }
    }

closeIfClosed()方法當Channel再也不打開時關閉Channel,代碼以下:

    protected final void closeIfClosed() {
        if (isOpen()) {
            return;
        }
        close(voidPromise());
    }

close()也是一個框架,以後會進行分析。

  1. disconnect事件框架

    public final void disconnect(final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable()) {
            return;
        }
        boolean wasActive = isActive();
        try {
            doDisconnect(); // 模板方法,細節由子類實現
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (wasActive && !isActive()) {
            invokeLater(() ->{ pipeline.fireChannelInactive(); });  // 觸發Inactive事件
        }
        safeSetSuccess(promise);
        closeIfClosed(); // disconnect框架可能會調用close框架
    }

 

  1. close事件框架

    public final void close(final ChannelPromise promise) {
        assertEventLoop();
        close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
    }

    private void close(final ChannelPromise promise, final Throwable cause,
                       final ClosedChannelException closeCause, final boolean notify) {
        if (!promise.setUncancellable()) {
            return;
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;   
        if (outboundBuffer == null) {   // outboundBuffer做爲一個標記,爲空表示Channel正在關閉
            if (!(promise instanceof VoidChannelPromise)) {
                // 當Channel關閉時,將這次close異步請求結果也設置爲成功
                closeFuture.addListener( (future) -> { promise.setSuccess(); });
            }
            return;
        }
        if (closeFuture.isDone()) {
            safeSetSuccess(promise);    // 已經關閉,保證底層close只執行一次
            return;
        }
        final boolean wasActive = isActive();
        this.outboundBuffer = null; // 設置爲空禁止write操做,同時做爲標記字段表示正在關閉
        Executor closeExecutor = prepareToClose();
        if (closeExecutor != null) {
            closeExecutor.execute(() -> {
                try {
                    doClose0(promise);  // prepareToClose返回的executor執行
                } finally {
                    invokeLater( () -> { // Channel註冊的EventLoop執行
                        // 寫緩衝隊列中的數據所有設置失敗
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                        fireChannelInactiveAndDeregister(wasActive);
                    });
                }
            });
        } else {    // 當前調用線程執行
            try {
                doClose0(promise);
            } finally {
                outboundBuffer.failFlushed(cause, notify);
                outboundBuffer.close(closeCause);
            }
            if (inFlush0) {
                invokeLater( () -> { fireChannelInactiveAndDeregister(wasActive); });
            } else {
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }
    
    private void doClose0(ChannelPromise promise) {
        try {
            doClose();  // 模板方法,細節由子類實現
            closeFuture.setClosed();
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 close事件框架保證只有一個線程執行了真正關閉的doClose()方法,prepareToClose()作一些關閉前的清除工做並返回一個Executor,若是不爲空,須要在該Executor裏執行doClose0()方法;爲空,則在當前線程執行(爲何這樣設計?)。寫緩衝區outboundBuffer同時也做爲一個標記字段,爲空表示Channel正在關閉此時禁止寫操做。fireChannelInactiveAndDeregister()方法須要invokeLater()使用EventLoop執行,是由於其中會調用deRegister()方法觸發Inactive事件,而事件執行須要在EventLoop中執行。

    private void fireChannelInactiveAndDeregister(final boolean wasActive) {
        deregister(voidPromise(), wasActive && !isActive());
    }

 

  1. deregister事件框架

    public final void deregister(final ChannelPromise promise) {
        assertEventLoop();
        deregister(promise, false);
    }

    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
        if (!promise.setUncancellable()) {
            return;
        }
        if (!registered) {
            safeSetSuccess(promise);    // 已經deregister
            return;
        }
        invokeLater( () -> {
            try {
                doDeregister(); // 模板方法,子類實現具體細節
            } catch (Throwable t) {
                logger.warn(...);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive(); // 根據參數觸發Inactive事件
                }
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered(); // 首次調用觸發Unregistered事件
                }
                safeSetSuccess(promise);
            }
        });
    }

deregister事件框架的處理流程很清晰,其中,使用invokeLater()方法是由於:用戶可能會在ChannlePipeline中將當前Channel註冊到新的EventLoop,確保ChannelPipiline事件和doDeregister()在同一個EventLoop完成([?][3])。

須要注意的是:事件之間可能相互調用,好比:disconnect->close->deregister。

  1. write事件框架

    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // 聯繫close操做,outboundBuffer爲空表示Channel正在關閉,禁止寫數據
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            ReferenceCountUtil.release(msg); // 釋放msg 防止泄露
            return;
        }
        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        outboundBuffer.addMessage(msg, size, promise);
    }

事實上,這是Netty定義的write操做的所有代碼,完成的功能是將要寫的消息Msg加入到寫緩衝區。其中的filterOutboundMessage()可對消息進行過濾整理,例如把HeapBuffer轉爲DirectBuffer,具體實現由子類負責:

    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg; // 默認實現
    }

 

  1. flush事件框架

    public final void flush() {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return; // Channel正在關閉直接返回
        }
        outboundBuffer.addFlush();  // 添加一個標記
        flush0();
    }

    protected void flush0() {
        if (inFlush0) {
            return;     // 正在flush返回防止屢次調用
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return; // Channel正在關閉或者已沒有須要寫的數據
        }
        inFlush0 = true;
        if (!isActive()) {
            // Channel已經非激活,將全部進行中的寫請求標記爲失敗
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
        try {
            doWrite(outboundBuffer);    // 模板方法,細節由子類實現
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

flush事件中執行真正的底層寫操做,Netty對於寫的處理引入了一個寫緩衝區ChannelOutboundBuffer,由該緩衝區控制Channel的可寫狀態,其具體實現,將會在緩衝區一章中分析。

至此,Unsafe中的事件方法已經分析完7個,但還有connect和read沒有引入,下一節將進行分析。

6.2.2 AbstractNioChannel

Netty的實現中,Unsafe的I/O事件框架中的細節實現方法doXXX()放到了Channel子類中而不是Unsafe子類中,因此咱們先分析Unsafe,而後分析Channel。
AbstractNioChannel從名字能夠看出是對NIO的抽象,咱們自頂向下一步一步深刻細節,該類中定義了一個NioUnsafe接口:

    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch(); // 對應NIO中的JDK實現的Channel
        void finishConnect();   // 鏈接完成
        void read();    // 從JDK的Channel中讀取數據
        void forceFlush(); 
    }

回憶NIO的三大概念:Channel、Buffer、Selector,Netty的Channel包裝了JDK的Channel從而實現更爲複雜的功能。Unsafe中可使用ch()方法,NioChannel中可使用javaChannel()方法得到JDK的Channel。接口中定義了finishConnect()方法是由於:SelectableChannel設置爲非阻塞模式時,connect()方法會當即返回,此時鏈接操做可能沒有完成,若是沒有完成,則須要調用JDK的finishConnect()方法完成鏈接操做。也許你已經注意到,AbstractUnsafe中並無connect事件框架,這是由於並非全部鏈接都有標準的connect過程,好比Netty的LocalChannel和EmbeddedChannel。可是NIO中的鏈接操做則有較爲標準的流程,在介紹Connect事件框架前,先介紹一下其中使用到的相關字段,這些字段定義在AbstractNioChannel中:

    private ChannelPromise connectPromise;  // 鏈接異步結果
    private ScheduledFuture<?> connectTimeoutFuture;    // 鏈接超時檢測任務異步結果
    private SocketAddress requestedRemoteAddress;   // 鏈接的遠端地址

Connect事件框架:

    public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
                                            final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // Channel已被關閉
        }
        try {
            if (connectPromise != null) {
                throw new ConnectionPendingException(); // 已有鏈接操做正在進行
            }
            boolean wasActive = isActive();
            // 模板方法,細節子類完成
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);  // 鏈接操做已完成
            } else {
                // 鏈接操做還沒有完成
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;
                // 這部分代碼爲Netty的鏈接超時機制
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(() -> {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause = new ConnectTimeoutException("...");
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener((ChannelFutureListener) (future) -> {
                    if (future.isCancelled()) {
                        // 鏈接操做取消則鏈接超時檢測任務取消
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

Connect事件框架中包含了Netty的鏈接超時檢測機制:向EventLoop提交一個調度任務,設定的超時時間已到則向鏈接操做的異步結果設置失敗而後關閉鏈接。fulfillConnectPromise()設置異步結果爲成功並觸發Channel的Active事件:

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            return; // 操做已取消或Promise已被通知?
        }
        boolean active = isActive();
        boolean promiseSet = promise.trySuccess();  // False表示用戶取消操做
        if (!wasActive && active) { // 此時用戶沒有取消Connect操做
            pipeline().fireChannelActive(); // 觸發Active事件
        }
        if (!promiseSet) {
            close(voidPromise()); // 操做已被用戶取消,關閉Channel
        }
    }

 

FinishConnect事件框架:

    public final void finishConnect() {
        assert eventLoop().inEventLoop();
        try {
            boolean wasActive = isActive();
            doFinishConnect();  // 模板方法
            fulfillConnectPromise(connectPromise, wasActive);   // 首次Active觸發Active事件
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(...));
        } finally {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false); // 鏈接完成,取消超時檢測任務
            }
            connectPromise = null;
        }
    }

finishConnect()只由EventLoop處理就緒selectionKey的OP_CONNECT事件時調用,從而完成鏈接操做。注意:鏈接操做被取消或者超時不會使該方法被調用。

Flush事件細節:

    protected final void flush0() {
        if (isFlushPending()) {
            return; // 已經有flush操做,返回
        }
        super.flush0(); // 調用父類方法
    }

    public final void forceFlush() {
        super.flush0(); // 調用父類方法
    }

    private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && 
                    (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

forceFlush()方法由EventLoop處理就緒selectionKey的OP_WRITE事件時調用,將緩衝區中的數據寫入Channel。isFlushPending()方法容易致使困惑:爲何selectionKey關心OP_WRITE事件表示正在Flush呢?OP_WRITE表示通道可寫,而通常狀況下通道均可寫,若是selectionKey一直關心OP_WRITE事件,那麼將不斷從select()方法返回從而致使死循環。Netty使用一個寫緩衝區,write操做將數據放入緩衝區中,flush時設置selectionKey關心OP_WRITE事件,完成後取消關心OP_WRITE事件。因此,若是selectionKey關心OP_WRITE事件表示此時正在Flush數據。

AbstractNioUnsafe還有最後一個方法removeReadOp():

    protected final void removeReadOp() {
        SelectionKey key = selectionKey();
        if (!key.isValid()) {
            return; // selectionKey已被取消
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) != 0) {
            key.interestOps(interestOps & ~readInterestOp); // 設置爲再也不感興趣
        }
    }

Netty中將服務端的OP_ACCEPT和客戶端的Read統一抽象爲Read事件,在NIO底層I/O事件使用bitmap表示,一個二進制位對應一個I/O事件。當一個二進制位爲1時表示關心該事件,readInterestOp的二進制表示只有1位爲1,因此體會interestOps & ~readInterestOp的含義,可知removeReadOp()的功能是設置SelectionKey再也不關心Read事件。相似的,還有setReadOp()、removeWriteOp()、setWriteOp()等等。

分析完AbstractNioUnsafe,咱們再分析AbstractNioChannel,首先看其中還沒講解的字段:

    private final SelectableChannel ch; // 包裝的JDK Channel
    protected final int readInterestOp; // Read事件,服務端OP_ACCEPT,其餘OP_READ
    volatile SelectionKey selectionKey; // JDK Channel對應的選擇鍵
    private volatile boolean inputShutdown; // Channel的輸入關閉標記
    private volatile boolean readPending;   // 底層讀事件進行標記

再看一下構造方法:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);    // 設置非阻塞模式
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                // log
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

其中的ch.configureBlocking(false)方法設置Channel爲非阻塞模式,從而爲Netty提供非阻塞處理I/O事件的能力。

對於AbstractNioChannel的方法,咱們主要分析它實現I/O事件框架細節部分的doXXX()方法。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // 選擇鍵取消從新selectNow(),清除因取消操做而緩存的選擇鍵
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
    
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey()); // 設置取消選擇鍵
    }

對於Register事件,當Channel屬於NIO時,已經能夠肯定註冊操做的所有細節:將Channel註冊到給定NioEventLoop的selector上便可。注意,其中第二個參數0表示註冊時不關心任何事件,第三個參數爲Netty的NioChannel對象自己。對於Deregister事件,選擇鍵執行cancle()操做,選擇鍵表示JDK Channel和selctor的關係,調用cancle()終結這種關係,從而實現從NioEventLoop中Deregister。須要注意的是:cancle操做調用後,註冊關係不會當即生效,而會將cancle的key移入selector的一個取消鍵集合,當下次調用select相關方法或一個正在進行的select調用結束時,會從取消鍵集合中移除該選擇鍵,此時註銷才真正完成。一個Cancle的選擇鍵爲無效鍵,調用它相關的方法會拋出CancelledKeyException。

    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return; // Channel的輸入關閉?什麼狀況下發生?
        }
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return; // 選擇鍵被取消而再也不有效
        }
        readPending = true; // 設置底層讀事件正在進行
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 選擇鍵關心Read事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

對於NioChannel的beginRead事件,只需將Read事件設置爲選擇鍵所關心的事件,則以後的select()調用若是Channel對應的Read事件就緒,便會觸發Netty的read()操做。

    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // 鏈接操做還在進行,但用戶調用close操做
            promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
            connectPromise = null;
        }
        ScheduledFuture<?> future = connectTimeoutFuture;
        if (future != null) {
            // 若是有鏈接超時檢測任務,則取消
            future.cancel(false);
            connectTimeoutFuture = null;
        }
    }

此處的doClose操做主要處理了鏈接操做相關的後續處理。並無實際關閉Channel,因此須要子類繼續增長細節實現。AbstractNioChannel中還有關於建立DirectBuffer的方法,將在之後必要時進行分析。其餘的方法則較爲簡單,不在列出。最後提一下isCompatible()方法,說明NioChannel只在NioEventLoop中可用。

    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }

 

AbstractNioChannel的子類實現分爲服務端AbstractNioMessageChannel和客戶端AbstractNioByteChannel,咱們將首先分析服務端AbstractNioMessageChannel。

6.2.3 AbstractNioMessageChannel

AbstractNioMessageChannel是底層數據爲消息的NioChannel。在Netty中,服務端Accept的一個Channel被認爲是一條消息,UDP數據報也是一條消息。該類主要完善flush事件框架的doWrite細節和實現read事件框架(在內部類NioMessageUnsafe完成)。首先看read事件框架:

    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此時讀操做不被容許,既沒有配置autoRead也沒有底層讀事件進行
            removeReadOp(); // 清除read事件,再也不關心
            return;
        }
        
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf); // 模板方法,讀取消息
                    if (localRead == 0) { // 沒有數據可讀
                        break;  
                    }
                    if (localRead < 0) { // 讀取出錯
                        closed = true;  
                        break;
                    }
                    if (!config.isAutoRead()) { //沒有設置AutoRead
                        break;
                    }
                    if (readBuf.size() >= maxMessagesPerRead) { // 達到最大可讀數
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            
            setReadPending(false);  // 已沒有底層讀事件
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));   //觸發ChannelRead事件,用戶處理
            }
            readBuf.clear(); // ChannelReadComplete事件中若是配置autoRead則會調用beginRead,從而不斷進行讀操做
            pipeline.fireChannelReadComplete(); // 觸發ChannelReadComplete事件,用戶處理

            if (exception != null) {
                if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                    // ServerChannel異常也不能關閉,應該恢復讀取下一個客戶端
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());   // 非serverChannel且打開則關閉
                }
            }
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既沒有配置autoRead也沒有底層讀事件進行
                removeReadOp();
            }
        }
    }

 

read事件框架的流程已在代碼中註明,須要注意的是讀取消息的細節doReadMessages(readBuf)方法由子類實現。
咱們主要分析NioServerSocketChannel,它不支持doWrite()操做,因此咱們再也不分析本類的flush事件框架的doWrite細節方法,直接轉向下一個目標:NioServerSocketChannel。

6.2.4 NioServerSocketChannel

你確定已經使用過NioServerSocketChannel,做爲處於Channel最底層的子類,NioServerSocketChannel會實現I/O事件框架的底層細節。首先須要注意的是:NioServerSocketChannel只支持bind、read和close操做

   protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // JDK版本1.7以上
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                // 一個NioSocketChannel爲一條消息
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
    
    protected void doClose() throws Exception {
        javaChannel().close();
    }

其中的實現,都是調用JDK的Channel的方法,從而實現了最底層的細節。須要注意的是:此處的doReadMessages()方法每次最多返回一個消息(客戶端鏈接),由此可知NioServerSocketChannel的read操做一次至多處理的鏈接數爲config.getMaxMessagesPerRead(),也就是參數值MAX_MESSAGES_PER_READ。此外doClose()覆蓋了AbstractNioChannel的實現,由於NioServerSocketChannel不支持connect操做,因此不須要鏈接超時處理。

最後,咱們再看關鍵構造方法:

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

其中的SelectionKey.OP_ACCEPT最爲關鍵,Netty正是在此處將NioServerSocketChannel的read事件定義爲NIO底層的OP_ACCEPT,統一完成read事件的抽象。

至此,咱們已分析完兩條線索中的服務端部分,下面分析客戶端部分。首先是AbstractNioChannel的另外一個子類AbstractNioByteChannel。

6.2.5 AbstractNioByteChannel

從字面可推知,AbstractNioByteChannel的底層數據爲Byte字節。首先看構造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

 

其中的SelectionKey.OP_READ,說明AbstractNioByteChannel的read事件爲NIO底層的OP_READ事件。
而後咱們看read事件框架:

    public final void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此時讀操做不被容許,既沒有配置autoRead也沒有底層讀事件進行
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);  // 建立一個ByteBuf
                int writable = byteBuf.writableBytes(); 
                int localReadAmount = doReadBytes(byteBuf); // 模板方法,子類實現細節
                if (localReadAmount <= 0) { // 沒有數據可讀
                    byteBuf.release();
                    byteBuf = null;
                    close = localReadAmount < 0; // 讀取數據量爲負數表示對端已經關閉
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);  // 沒有底層讀事件進行
                    // 此時,若autoRead關閉則必須調用beginRead,read操做纔會讀取數據
                }
                pipeline.fireChannelRead(byteBuf);  // 觸發ChannelRead事件,用戶處理
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {   // 防止溢出
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }
                totalReadAmount += localReadAmount;

                if (!config.isAutoRead()) { // 沒有配置AutoRead
                    break;
                }
                if (localReadAmount < writable) {   // 讀取數小於可寫數,可能接受緩衝區已徹底耗盡
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            // ReadComplete結束時,若是開啓autoRead則會調用beginRead,從而能夠繼續read
            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既沒有配置autoRead也沒有底層讀事件進行
                removeReadOp(); 
            }
        }
    }

AbstractNioByteChannel的read事件框架處理流程與AbstractNioMessageChannel的稍有不一樣:AbstractNioMessageChannel依次讀取Message,最後統一觸發ChannelRead事件;而AbstractNioByteChannel每讀取到必定字節就觸發ChannelRead事件。這是由於,AbstractNioMessageChannel需求高吞吐量,特別是ServerSocketChannel須要儘量多地接受鏈接;而AbstractNioByteChannel需求快響應,要儘量快地響應遠端請求

read事件的具體流程請參考代碼和代碼註釋進行理解,再也不分析。注意到代碼中有關於接收緩衝區的代碼,這一部分咱們單獨使用一節講述,以後會分析。當讀取到的數據小於零時,表示遠端鏈接已關閉,這時會調用closeOnRead(pipeline)方法:

    private void closeOnRead(ChannelPipeline pipeline) {
        SelectionKey key = selectionKey();
        setInputShutdown(); // 遠端關閉此時設置Channel的輸入源關閉
        if (isOpen()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
                // 取消關心Read事件並觸發UserEvent事件ChannelInputShutdownEvent
                key.interestOps(key.interestOps() & ~readInterestOp);   
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());   // 直接關閉
            }
        }
    }

 這段代碼正是Channel參數ALLOW_HALF_CLOSURE的意義描述,該參數爲True時,會觸發用戶事件ChannelInputShutdownEvent,不然,直接關閉該Channel。

拋出異常時,會調用handleReadException(pipeline, byteBuf, t, close)方法:

    private void handleReadException(ChannelPipeline pipeline,
                                         ByteBuf byteBuf, Throwable cause, boolean close) {
        if (byteBuf != null) {  // 已讀取到數據
            if (byteBuf.isReadable()) { // 數據可讀
                setReadPending(false);
                pipeline.fireChannelRead(byteBuf);  
            } else {    // 數據不可讀
                byteBuf.release();
            }
        }
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) {
            closeOnRead(pipeline);
        }
    }

可見,拋出異常時,若是讀取到可用數據和正常讀取同樣觸發ChannelRead事件,只是最後會統一觸發ExceptionCaught事件由用戶進行處理。

至此,read事件框架分析完畢,下面咱們分析write事件的細節實現方法doWrite()。在此以前,先看filterOutboundMessage()方法對須要寫的數據進行過濾。

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
            return newDirectBuffer(buf); // 非DirectBuf轉爲DirectBuf
        }
        if (msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException("...");
    }

 

可知,Netty支持的寫數據類型只有兩種:DirectBufferFileRegion。咱們再看這些數據怎麼寫到Channel上,也就是doWrite()方法:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;
        boolean setOpWrite = false;
        for (;;) {
            Object msg = in.current();
            if (msg == null) {  // 數據已所有寫完
                clearOpWrite();     // 清除OP_WRITE事件
                return;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf); // 模板方法,子類實現細節
                    if (localFlushedAmount == 0) {
                        // NIO在非阻塞模式下寫操做可能返回0表示未寫入數據
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        // ByteBuf不可讀,此時數據已寫完
                        done = true;
                        break;
                    }
                }
                
                in.progress(flushedAmount); // 記錄進度
                if (done) {
                    in.remove();    // 完成時,清理緩衝區
                } else {
                    break;  // 跳出循環執行incompleteWrite()
                }
            } else if (msg instanceof FileRegion) {
                // ....
            } else {
                throw new Error();  // 其餘類型不支持
            }
        }
        incompleteWrite(setOpWrite);
    }

代碼中省略了對FileRegion的處理,FileRegion是Netty對NIO底層的FileChannel的封裝,負責將File中的數據寫入到WritableChannel中。FileRegion的默認實現是DefaultFileRegion,若是你很感興趣它的實現,能夠自行查閱。

咱們主要分析對ByteBuf的處理。doWrite的流程簡潔明瞭,核心操做是模板方法doWriteBytes(buf),將ByteBuf中的數據寫入到Channel,因爲NIO底層的寫操做返回已寫入的數據量,在非阻塞模式下該值可能爲0,此時會調用incompleteWrite()方法:

    protected final void incompleteWrite(boolean setOpWrite) {
        if (setOpWrite) {
            setOpWrite();   // 設置繼續關心OP_WRITE事件
        } else {
            // 此時已進行寫操做次數writeSpinCount,但並無寫完
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = (Runnable) () -> { flush(); };
            }
            // 再次提交一個flush()任務
            eventLoop().execute(flushTask);
        }
    }

 

該方法分兩種狀況處理,在上文提到的第一種狀況(實際寫0數據)下,設置SelectionKey繼續關心OP_WRITE事件從而繼續進行寫操做;第二種狀況下,也就是寫操做進行次數達到配置中的writeSpinCount值但還沒有寫完,此時向EventLoop提交一個新的flush任務,此時能夠響應其餘請求,從而提交響應速度。這樣的處理,不會使大數據的寫操做佔用所有資源而使其餘請求得不到響應,可見這是一個較爲公平的處理。這裏引出一個問題:使用Netty如何搭建高性能文件服務器?
至此,已分析完對於Byte數據的read事件和doWrite細節的處理,接下里,繼續分析NioSocketChannel,從而完善各事件框架的細節部分。

6.2.6 NioSocketChannel

NioSocketChannel做爲Channel的最末端子類,實現了NioSocket相關的最底層細節實現,首先看doBind():

    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress);   // JDK版本1.7以上
        } else {
            javaChannel().socket().bind(localAddress);
        }
    }

 

這部分代碼與NioServerSocketChannel中相同,委託給JDK的Channel進行綁定操做。
接着再看doConnect()和doFinishConnect()方法:

    protected boolean doConnect(SocketAddress remoteAddress, 
                                        SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                // 設置關心OP_CONNECT事件,事件就緒時調用finishConnect()
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 

JDK中的Channel在非阻塞模式下調用connect()方法時,會當即返回結果:成功創建鏈接返回True,操做還在進行時返回False。返回False時,須要在底層OP_CONNECT事件就緒時,調用finishConnect()方法完成鏈接操做。
再看doDisconnect()和doClose()方法:

    protected void doDisconnect() throws Exception {
        doClose();
    }

    protected void doClose() throws Exception {
        super.doClose();    // AbstractNioChannel中關於鏈接超時的處理
        javaChannel().close();
    }

 

而後看核心的doReadBytes()和doWriteXXX()方法:

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }

    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

    protected long doWriteFileRegion(FileRegion region) throws Exception {
        final long position = region.transfered();
        return region.transferTo(javaChannel(), position);
    }

 

對於read和write操做,委託給ByteBuf處理,咱們將使用專門的一章,對這一部分細節進行完善,將在後面介紹。
NioSocketChannel最重要的部分是覆蓋了父類的doWrite()方法,使用更高效的方式進行寫操做,其代碼以下:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                clearOpWrite(); // 全部數據已寫完,再也不關心OP_WRITE事件
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            switch (nioBufferCnt) {
                case 0: // 沒有ByteBuffer,也就是隻有FileRegion
                    super.doWrite(in);  // 使用父類方法進行普通處理
                    return;
                case 1: // 只有一個ByteBuffer,此時的處理等效於父類方法的處理
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default: // 多個ByteBuffer,採用gathering方法處理
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        // gathering方法,此時一次寫多個ByteBuffer
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }
            in.removeBytes(writtenBytes);   // 清理緩衝區
            if (!done) {
                incompleteWrite(setOpWrite);    // 寫操做並無完成
                break;
            }
        }
    }

 

在明白了父類的doWrite方法後,這段代碼便容易理解,本段代碼作的優化是:當輸出緩衝區中有多個buffer時,採用Gathering Writes將數據從這些buffer寫入到同一個channel。
在AbstractUnsafe對close事件框架的分析中,有一個prepareToClose()方法,進行關閉的必要處理並在必要時返回一個Executor執行doClose()操做,默認方法返回null,NioSocketChannelUnsafe覆蓋了父類的實現,代碼以下:

    protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister(); // 取消選擇鍵selectionKey
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
                //
            }
            return null;
        }

 

SO_LINGER表示Socket關閉的延時時間,在此時間內,內核將繼續把TCP緩衝區的數據發送給對端且執行close操做的線程將阻塞直到數據發送完成。Netty的原則是I/O線程不能被阻塞,因此此時返回一個Executor用於執行阻塞的doClose()操做。doDeregister()取消選擇鍵selectionKey是由於:延遲關閉期間, 若是selectionKey仍然關心OP_WRITE事件,而輸出緩衝區又爲null,這樣write操做直接返回,不會再執行clearOpWrite()操做取消關心OP_WRITE事件,而Channel通常是可寫的,這樣OP_WRITE事件會不斷就緒從而耗盡CPU,因此須要取消選擇鍵刪除註冊的事件。
[1]: //upload-images.jianshu.io/upload_images/3288959-5a4be2f31620177d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240
[2]: http://img.blog.csdn.net/20160928165809260
[3]: https://github.com/netty/netty/issues/4435

 


做者:Hypercube
連接:https://www.jianshu.com/p/9258af254e1d
來源:簡書
簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
做者:Hypercube 連接:https://www.jianshu.com/p/fffc18d33159 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索