Netty 出站緩衝區 ChannelOutboundBuffer 源碼解析(isWritable 屬性的重要性)

目錄:java

前言

  1. ChannelOutboundBuffer 介紹
  2. addMessage 方法
  3. addFlush 方法
  4. flush0 方法
  5. 緩衝區擴展思考
  6. 總結

每一個 ChannelSocket 的 Unsafe 都有一個綁定的 ChannelOutboundBuffer , Netty 向站外輸出數據的過程統一經過 ChannelOutboundBuffer 類進行封裝,目的是爲了提升網絡的吞吐量,在外面調用 write 的時候,數據並無寫到 Socket,而是寫到了 ChannelOutboundBuffer 這裏,當調用 flush 的時候,才真正的向 Socket 寫出。同時,本文也關注當緩衝區滿了的時候,Netty 如何處理。數組

1. ChannelOutboundBuffer 介紹

官方文檔這麼介紹的:promise

(Transport implementors only) an internal data structure used by AbstractChannel to store its pending outbound write requests.
All methods must be called by a transport implementation from an I/O thread。
意思是,這個一個數據傳輸的實現者,一個內部的數據結構用於存儲等待的出站寫請求。全部的方法都必有由 IO 線程來調用。緩存

既然該類有一個內部的數據結構,咱們就看看他的數據結構的樣子,有如下幾個屬性:網絡

private Entry flushedEntry; // 即將被消費的開始節點
private Entry unflushedEntry;// 被添加的開始節點,但沒有準備好被消費。
private Entry tailEntry;// 最後一個節點

從上面的屬性能夠看出,這他麼就是個鏈表。不過,這個鏈表有2個頭,在調用 addFlush 方法的時候會將 unflushedEntry 賦值給 flushedEntry。表示即將從這裏開始刷新。具體以下圖:數據結構

調用 addMessage 方法的時候,建立一個 Entry ,將這個 Entry 追加到 TailEntry 節點後面,調用 addFlush 的時候,將 unflushedEntry 的引用賦給 flushedEntry,而後將 unflushedEntry 置爲 null。oop

當數據被寫進 Socket 後,從 flushedEntry(current) 節點開始,循環將每一個節點刪除。學習

關於這 3 個方法,咱們後面詳細解釋。this

2. addMessage 方法

該方法 doc 文檔:操作系統

Add given message to this ChannelOutboundBuffer. The given ChannelPromise will be notified once the message was written.
將給定的消息添加到 ChannelOutboundBuffer,一旦消息被寫入,就會通知 promise。

代碼以下:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

說說方法步驟:

  1. 根據 ByteBuf 相互屬性和 promise 建立一個 Entry 節點。
  2. 將新的節點追加到 tailEntry 節點上。若是考慮以前的所有被清空了話,則新節點就是惟一節點,unflushedEntry 屬性就是新的節點。可對照上面的圖來看。
  3. 使用 CAS 將 totalPendingSize(總的數據大小) 屬性增長 Entry 實例的大小(96 字節) + 真實數據的大小。

主要這個 Entry 節點的建立有點意思:

Netty 將在 ThreadLocalMap 中存儲了一個 Stack (棧)對象,存儲重複使用的 DefaultHandle 實例,該實例的 value 屬性就是 Entry ,因此這個 Entry 也是重複使用的,每次用完全部參數置爲 null,再返回到棧中,下次再用,從這個棧中彈出。重複利用。對象池的最佳實踐。並且是保存再線程中,速度更快,不會有線程競爭。這個設計卻是能夠學習如下。

看完了 addMessage ,再看看 addFlush 方法。

3. addFlush 方法

當 addMessage 成功添加進 ChannelOutboundBuffer 後,就須要 flush 刷新到 Socket 中去。可是這個方法並非作刷新到 Socket 的操做。而是將 unflushedEntry 的引用轉移到 flushedEntry 引用中,表示即將刷新這個 flushedEntry,至於爲何這麼作?
答:由於 Netty 提供了 promise,這個對象能夠作取消操做,例如,不發送這個 ByteBuf 了,因此,在 write 以後,flush 以前須要告訴 promise 不能作取消操做了。

代碼以下:

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

結合上面的圖:

  1. 首先拿到未刷新的頭節點。
  2. 判 null 以後,將這個 unflushedEntry 賦值給 flushedEntry,而這裏的判 null 是作什麼呢?防止屢次調用 flush 。
  3. 循環嘗試設置這些節點,告訴他們不能作取消操做了,若是嘗試失敗了,就將這個節點取消,在調用 nioBuffers 方法的時候,這個節點會被忽略。同時將 totalPendingSize 相應的減少。

設置以後,promise 調用 cancel 方法就會返回 false。

在調用完 outboundBuffer.addFlush() 方法後,Channel 會調用 flush0 方法作真正的刷新。

4. flush0 方法

flush0 的核心是調用 dowrite 方法並傳入 outboundBuffer。

每種類型的 Channel 都實現都不同。咱們看的是 NioSocketChannel 的實現,方法很長,樓主截取重要邏輯:

// 拿到NIO Socket
SocketChannel ch = javaChannel();
// 獲取自旋的次數,默認16
int writeSpinCount = config().getWriteSpinCount();
// 獲取設置的每一個 ByteBuf 的最大字節數,這個數字來自操做系統的 so_sndbuf 定義
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 調用 ChannelOutboundBuffer 的 nioBuffers 方法獲取 ByteBuffer 數組,從flushedEntry開始,循環獲取
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// ByteBuffer 的數量
int nioBufferCnt = in.nioBufferCount();
// 使用 NIO 寫入 Socket 
ch.write(buffer);
// 調整最大字節數
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 刪除 ChannelOutboundBuffer 中的 Entry
in.removeBytes(localWrittenBytes);
// 自旋減一,直到自旋小於0中止循環,固然若是 ChannelOutboundBuffer 空了,也會中止。
--writeSpinCount;
// 若是自旋16次尚未完成 flush,則建立一個任務放進mpsc 隊列中執行。
incompleteWrite(writeSpinCount < 0);

上面的註釋基本就是 flush 的邏輯。

  • 固然 flush0 方法在 NIO 的具體實現中,還加入了對註冊事件的判斷:
protected final void flush0() {
    if (!isFlushPending()) {
        super.flush0();
    }
}


private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

這裏的判斷是:若是註冊了寫事件,就暫時不寫了,由於緩衝區到了水位線了,因此此次直接返回,等會再寫。等到 EventLoop 觸發寫事件了,就會調用 ch.unsafe().forceFlush() 方法將數據刷新到 TCP 緩衝區。

  • 這裏有一個小知識點:

    NIO 的寫事件大部分時候是不須要註冊的,只有當 TCP 緩衝區達到水位線了,不能寫入了,才須要註冊寫事件。當緩衝區有空間了,NIO 就會觸發寫事件。

5. 緩衝區擴展思考

從上面的邏輯上來看,不直到你們有沒有發現一個問題:若是對方 Socket 接收很慢,ChannelOutboundBuffer 就會積累不少的數據。而且這個 ChannelOutboundBuffer 是沒有大小限制的鏈表。可能會致使 OOM,Netty 已經考慮了這個問題,在 addMessage 方法的最後一行,incrementPendingOutboundBytes方法,會判斷 totalPendingSize 的大小是否超過了高水位閾值(默認64 kb),若是超過,關閉寫開關,調用 piepeline 的 fireChannelWritabilityChanged 方法可改變 flush 策略。

關於 channelWritabilityChanged API,Netty 這樣解釋:

當 Channel 的可寫狀態發生改變時被調用。用戶能夠確保寫操做不會完成的太快(以免發生 OOM)或者能夠在 Channel 變爲再次可寫時恢復寫入。能夠經過調用 Channel 的 isWritable 方法來檢測 Channel 的可寫性。與可寫性相關的閾值能夠經過 Channel.config().setWriteBufferHighWaterMark 和 Channel.config().setWriteBufferLowWaterMark 方法來設置,默認最小 32 kb,最大 64 kb。

那麼,上面時候恢復可寫狀態呢?remove 的時候,或者 addFlush 是丟棄了某個節點,會對 totalPendingSize 進行削減,削減以後進行判斷。若是 totalPendingSize 小於最低水位了。就恢復寫入。

也就是說,默認的狀況下,ChannelOutboundBuffer 緩存區的大小最大是 64 kb,最小是 32 kb,哪裏看出來的呢?

固然了,能夠在 option 選項中進行修改,API 文檔也說過了。

當不能寫的時候,就會調用 ChannelWritabilityChanged 方法,用戶能夠在代碼中,讓寫操做進行的慢一點。

6. 總結

到了總結的時刻。

Netty 的 write 的操做不會當即寫入,而是存儲在了 ChannelOutboundBuffer 緩衝區裏,這個緩衝區內部是 Entry 節點組成的鏈表結構,經過 addMessage 方法添加進鏈表,經過 addFlush 方法表示能夠開始寫入了,最後經過 SocketChannel 的 flush0 方法真正的寫入到 JDK 的 Socket 中。同時須要注意若是 TCP 緩衝區到達一個水位線了,不能寫入 TCP 緩衝區了,就須要晚點寫入,這裏的方法判斷是 isFlushPending()。

其中,有一個須要注意的點就是,若是對方接收數據較慢,可能致使緩衝區存在大量的數據沒法釋放,致使OOM,Netty 經過一個 isWritable 開關嘗試解決此問題,但用戶須要重寫 ChannelWritabilityChanged 方法,由於一旦超過默認的高水位閾值,Netty 就會調用 ChannelWritabilityChanged 方法,執行完畢後,繼續進行 flush。用戶能夠在該方法中嘗試慢一點的操做。等到緩衝區的數據小於低水位的值時,開關就關閉了,就不會調用 ChannelWritabilityChanged 方法。所以,合理設置這兩個數值也挺重要的。

好,限於篇幅,關於 ChannelOutboundBuffer 的分析就到這裏,今天說的這幾個方法算是這個類的主要方法,由於 Netty 的寫操做都是圍繞這三個方法來的。

good luck!!!!!

相關文章
相關標籤/搜索