目錄:java
每一個 ChannelSocket 的 Unsafe 都有一個綁定的 ChannelOutboundBuffer , Netty 向站外輸出數據的過程統一經過 ChannelOutboundBuffer 類進行封裝,目的是爲了提升網絡的吞吐量,在外面調用 write 的時候,數據並無寫到 Socket,而是寫到了 ChannelOutboundBuffer 這裏,當調用 flush 的時候,才真正的向 Socket 寫出。同時,本文也關注當緩衝區滿了的時候,Netty 如何處理。數組
官方文檔這麼介紹的:promise
(Transport implementors only) an internal data structure used by AbstractChannel to store its pending outbound write requests.
All methods must be called by a transport implementation from an I/O thread。
意思是,這個一個數據傳輸的實現者,一個內部的數據結構用於存儲等待的出站寫請求。全部的方法都必有由 IO 線程來調用。緩存
既然該類有一個內部的數據結構,咱們就看看他的數據結構的樣子,有如下幾個屬性:網絡
private Entry flushedEntry; // 即將被消費的開始節點 private Entry unflushedEntry;// 被添加的開始節點,但沒有準備好被消費。 private Entry tailEntry;// 最後一個節點
從上面的屬性能夠看出,這他麼就是個鏈表。不過,這個鏈表有2個頭,在調用 addFlush 方法的時候會將 unflushedEntry 賦值給 flushedEntry。表示即將從這裏開始刷新。具體以下圖:數據結構
調用 addMessage 方法的時候,建立一個 Entry ,將這個 Entry 追加到 TailEntry 節點後面,調用 addFlush 的時候,將 unflushedEntry 的引用賦給 flushedEntry,而後將 unflushedEntry 置爲 null。oop
當數據被寫進 Socket 後,從 flushedEntry(current) 節點開始,循環將每一個節點刪除。學習
關於這 3 個方法,咱們後面詳細解釋。this
該方法 doc 文檔:操作系統
Add given message to this ChannelOutboundBuffer. The given ChannelPromise will be notified once the message was written.
將給定的消息添加到 ChannelOutboundBuffer,一旦消息被寫入,就會通知 promise。
代碼以下:
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); }
說說方法步驟:
主要這個 Entry 節點的建立有點意思:
Netty 將在 ThreadLocalMap 中存儲了一個 Stack (棧)對象,存儲重複使用的 DefaultHandle 實例,該實例的 value 屬性就是 Entry ,因此這個 Entry 也是重複使用的,每次用完全部參數置爲 null,再返回到棧中,下次再用,從這個棧中彈出。重複利用。對象池的最佳實踐。並且是保存再線程中,速度更快,不會有線程競爭。這個設計卻是能夠學習如下。
看完了 addMessage ,再看看 addFlush 方法。
當 addMessage 成功添加進 ChannelOutboundBuffer 後,就須要 flush 刷新到 Socket 中去。可是這個方法並非作刷新到 Socket 的操做。而是將 unflushedEntry 的引用轉移到 flushedEntry 引用中,表示即將刷新這個 flushedEntry,至於爲何這麼作?
答:由於 Netty 提供了 promise,這個對象能夠作取消操做,例如,不發送這個 ByteBuf 了,因此,在 write 以後,flush 以前須要告訴 promise 不能作取消操做了。
代碼以下:
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }
結合上面的圖:
設置以後,promise 調用 cancel 方法就會返回 false。
在調用完 outboundBuffer.addFlush() 方法後,Channel 會調用 flush0 方法作真正的刷新。
flush0 的核心是調用 dowrite 方法並傳入 outboundBuffer。
每種類型的 Channel 都實現都不同。咱們看的是 NioSocketChannel 的實現,方法很長,樓主截取重要邏輯:
// 拿到NIO Socket SocketChannel ch = javaChannel(); // 獲取自旋的次數,默認16 int writeSpinCount = config().getWriteSpinCount(); // 獲取設置的每一個 ByteBuf 的最大字節數,這個數字來自操做系統的 so_sndbuf 定義 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 調用 ChannelOutboundBuffer 的 nioBuffers 方法獲取 ByteBuffer 數組,從flushedEntry開始,循環獲取 ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // ByteBuffer 的數量 int nioBufferCnt = in.nioBufferCount(); // 使用 NIO 寫入 Socket ch.write(buffer); // 調整最大字節數 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // 刪除 ChannelOutboundBuffer 中的 Entry in.removeBytes(localWrittenBytes); // 自旋減一,直到自旋小於0中止循環,固然若是 ChannelOutboundBuffer 空了,也會中止。 --writeSpinCount; // 若是自旋16次尚未完成 flush,則建立一個任務放進mpsc 隊列中執行。 incompleteWrite(writeSpinCount < 0);
上面的註釋基本就是 flush 的邏輯。
protected final void flush0() { if (!isFlushPending()) { super.flush0(); } } private boolean isFlushPending() { SelectionKey selectionKey = selectionKey(); return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; }
這裏的判斷是:若是註冊了寫事件,就暫時不寫了,由於緩衝區到了水位線了,因此此次直接返回,等會再寫。等到 EventLoop 觸發寫事件了,就會調用 ch.unsafe().forceFlush()
方法將數據刷新到 TCP 緩衝區。
這裏有一個小知識點:
NIO 的寫事件大部分時候是不須要註冊的,只有當 TCP 緩衝區達到水位線了,不能寫入了,才須要註冊寫事件。當緩衝區有空間了,NIO 就會觸發寫事件。
從上面的邏輯上來看,不直到你們有沒有發現一個問題:若是對方 Socket 接收很慢,ChannelOutboundBuffer 就會積累不少的數據。而且這個 ChannelOutboundBuffer 是沒有大小限制的鏈表。可能會致使 OOM,Netty 已經考慮了這個問題,在 addMessage 方法的最後一行,incrementPendingOutboundBytes方法,會判斷 totalPendingSize 的大小是否超過了高水位閾值(默認64 kb),若是超過,關閉寫開關,調用 piepeline 的 fireChannelWritabilityChanged 方法可改變 flush 策略。
關於 channelWritabilityChanged API,Netty 這樣解釋:
當 Channel 的可寫狀態發生改變時被調用。用戶能夠確保寫操做不會完成的太快(以免發生 OOM)或者能夠在 Channel 變爲再次可寫時恢復寫入。能夠經過調用 Channel 的 isWritable 方法來檢測 Channel 的可寫性。與可寫性相關的閾值能夠經過 Channel.config().setWriteBufferHighWaterMark 和 Channel.config().setWriteBufferLowWaterMark 方法來設置,默認最小 32 kb,最大 64 kb。
那麼,上面時候恢復可寫狀態呢?remove 的時候,或者 addFlush 是丟棄了某個節點,會對 totalPendingSize 進行削減,削減以後進行判斷。若是 totalPendingSize 小於最低水位了。就恢復寫入。
也就是說,默認的狀況下,ChannelOutboundBuffer 緩存區的大小最大是 64 kb,最小是 32 kb,哪裏看出來的呢?
固然了,能夠在 option 選項中進行修改,API 文檔也說過了。
當不能寫的時候,就會調用 ChannelWritabilityChanged 方法,用戶能夠在代碼中,讓寫操做進行的慢一點。
到了總結的時刻。
Netty 的 write 的操做不會當即寫入,而是存儲在了 ChannelOutboundBuffer 緩衝區裏,這個緩衝區內部是 Entry 節點組成的鏈表結構,經過 addMessage 方法添加進鏈表,經過 addFlush 方法表示能夠開始寫入了,最後經過 SocketChannel 的 flush0 方法真正的寫入到 JDK 的 Socket 中。同時須要注意若是 TCP 緩衝區到達一個水位線了,不能寫入 TCP 緩衝區了,就須要晚點寫入,這裏的方法判斷是 isFlushPending()。
其中,有一個須要注意的點就是,若是對方接收數據較慢,可能致使緩衝區存在大量的數據沒法釋放,致使OOM,Netty 經過一個 isWritable 開關嘗試解決此問題,但用戶須要重寫 ChannelWritabilityChanged 方法,由於一旦超過默認的高水位閾值,Netty 就會調用 ChannelWritabilityChanged 方法,執行完畢後,繼續進行 flush。用戶能夠在該方法中嘗試慢一點的操做。等到緩衝區的數據小於低水位的值時,開關就關閉了,就不會調用 ChannelWritabilityChanged 方法。所以,合理設置這兩個數值也挺重要的。
好,限於篇幅,關於 ChannelOutboundBuffer 的分析就到這裏,今天說的這幾個方法算是這個類的主要方法,由於 Netty 的寫操做都是圍繞這三個方法來的。
good luck!!!!!