ChannelOutboundBuffer是Netty發送緩存,當Netty調用write時數據不會真正的去發送而是寫入到ChannelOutboundBuffer緩存隊列,直到調用flush方法Netty纔會從ChannelOutboundBuffer取數據發送。每一個Unsafe都會綁定一個ChannelOutboundBuffer,也就是說每一個客戶端鏈接上服務端都會建立一個ChannelOutboundBuffer綁定客戶端Channel。Netty設計ChannelOutboundBuffer是爲了減小TCP緩存的壓力提升系統的吞吐率。java
先來看下ChannelOutboundBuffer的4個重要字段數組
private Entry flushedEntry; 待發送數據起始節點 private Entry unflushedEntry;暫存數據起始節點 private Entry tailEntry;尾節點 private int flushed;待發送數據個數
Entry(flushedEntry) -->Entry--> ... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)
flushedEntry(包括)到unflushedEntry之間的就是待發送數據,unflushedEntry(包括)到tailEntry就是暫存數據,flushed就是待發送數據個數。promise
正常狀況下待發送數據發送完成後會flushedEntry指向unflushedEntry位置,並將unflushedEntry指空變成以下狀況:緩存
Entry(flushedEntry) -->Entry ... Entry--> Entry(tailEntry)
可是若是出現TCP緩存滿的致使的半包狀況,flushedEntry不會向後移動或移動發送成功的個數個位置,例如發送成功了一個數據,就會向前移動一個位置,出現以下狀況:源碼分析
Entry(flushedEntry) -->... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)
下面介紹ChannelOutboundBuffer中幾個主要的方法this
addMessage方法,功能是添加數據到隊列的隊尾。線程
addFlush方法,準備待發送的數據,在flush前須要調用。設計
nioBuffers方法,獲取待發送數據,發送數據的時候須要調用拿數據。指針
removeBytes方法,發送完成後須要調用刪除已經寫入TCP緩存成功的數據。code
下面對幾個方法源碼進行分析
addMessage方法是在系統調用write方法的時候調用
public void addMessage(Object msg, int size, ChannelPromise promise) { //將消息數據包裝成Entry對象 Entry entry = Entry.newInstance(msg, size, total(msg), promise); //隊列爲空的狀況 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; //非空狀況,將新節點放尾部 } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } //若是unflushedEntry爲空,設置暫時還不須要數據起始節點 if (unflushedEntry == null) { unflushedEntry = entry; } // 增長待發送的總字節數 incrementPendingOutboundBytes(size, false); }
流程以下
第1步消息數據包裝成Entry對象,內部實現不是直接建立一個新的Entry,而是對已經回收的Entry的重複利用,來看下代碼:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; } public final T get() { if (maxCapacity == 0) { return newObject(NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
看下RECYCLER.get()實現,若是maxCapacity配置成0就直接建立一個新的Entry,默認maxCapacity默認是256,因此默認狀況下會用ThreadLocalMap獲取一個stack,stack裏存的都是原先回收的handle,stack線回到pop一個被回收的handle,若是stack爲空則建立一個新的handle,而後返回handle.value即Entry對象。RECYCLER.get()獲取到entry後會對entry從新賦值。
addFlush方法是在系統調用flush方法的時候調用
public void addFlush() { //獲取暫存數據 Entry entry = unflushedEntry; //暫存數據不爲空,說明還有數據能夠發送 if (entry != null) { //將待發送數據起始指針flushedEntry指向暫存起始節點 if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { //增長髮送節點個數 flushed ++; //鎖定當前發送節點,防止其取消 if (!entry.promise.setUncancellable()) { //若是鎖定失敗,關閉節點,獲取節點時會自動過濾 int pending = entry.cancel(); // 減小待發送的總字節數跟incrementPendingOutboundBytes方法想對應 decrementPendingOutboundBytes(pending, false, true); } //獲取下個節點 entry = entry.next; } while (entry != null); //清空unflushedEntry指針 unflushedEntry = null; } }
以上方法的主要功能就是將暫存數據節點變成待發送節點,從上文內容知道須要發送的數據,是flushedEntry指向的節點到unflushedEntry指向的節點(不包含unflushedEntry)的之間的節點數據,因此下次發送要將flushedEntry指向unflushedEntry指向的節點做爲發送數據的起始節點。
結合代碼:
nioBuffers方法是在系統調用addFlush方法完成後調用
public ByteBuffer[] nioBuffers() { long nioBufferSize = 0; int nioBufferCount = 0; final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //獲取原生ByteBuffer數組,這裏的ByteBuffer是相同線程共享的 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); //獲取待發送數據起始節點 Entry entry = flushedEntry; //循環取數據,isFlushedEntry是判斷待發送數據節點 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { //若是節點被關閉則忽略次節點 if (!entry.cancelled) { //獲取節點裏的ByteBuf ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); //獲取可發送ByteBuf總字節數 final int readableBytes = buf.writerIndex() - readerIndex; //可發送字節大於0繼續不然跳過 if (readableBytes > 0) { //每次累計的發送字節數,不能大於Integer.MAX_VALUE if (Integer.MAX_VALUE - readableBytes < nioBufferSize) { break; } //累計的發送字節數 nioBufferSize += readableBytes; //獲取entry中ByteBuffer的個數 int count = entry.count; if (count == -1) { entry.count = count = buf.nioBufferCount(); } int neededSpace = nioBufferCount + count; //nioBuffers數組沒法知足存放個數需求擴容處理 if (neededSpace > nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } //若是隻有1個直接獲取ByteBuffer放入nioBuffers數組中 if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount ++] = nioBuf; ///若是有多個循環獲取ByteBuffer放入nioBuffers數組中 } else { ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { entry.bufs = nioBufs = buf.nioBuffers(); } nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
以上方法的主要功能就是獲取須要發送數據並轉成原生的ByteBuffer數組類型,ByteBuffer數組這裏是相同線程共享的,也就是說一個客戶端跟服務端通信會使用相同的ByteBuffer數組來發送數據,這樣減小了空間建立和銷燬時間消耗。
結合代碼:
第1步的相同線程數據共享的實現原理是一種類ThreadLocal的實現,原生的ThreadLocal裏是使用ThreadLocalMap來存儲數據,而Netty設計了一種讀取更快的InternalThreadLocalMap來存數據,ThreadLocalMap裏存儲數據是用線性探測法解決衝突,致使的結果就是一次hash不必定找到數據。而InternalThreadLocalMap裏數據存儲的位置是固定不變的,因此一次就能獲取數據,然而致使的結果就是部分空間的浪費,很明顯,這是一種空間換時間的作法。
removeBytes方法是在系統調用nioBuffers方法並完成發送後調用
public void removeBytes(long writtenBytes) { for (;;) { //獲取flushedEntry指向的節點數據 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //獲取讀取的起始位置 final int readerIndex = buf.readerIndex(); //計算整個節點的數據字節長度 final int readableBytes = buf.writerIndex() - readerIndex; //若是整個節點的數據字節長度比發送成功的總字節長度小,刪除整個節點 if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); //不然縮小當前節點的可發送字節長度 } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } //清理ByteBuffer數組 clearNioBuffers(); }
以上方法的主要功能就是移除已經發送成功的數據,移除的數據是從flushedEntry指向的節點開始遍歷鏈表移除,移除數據分2種狀況:
結合代碼:
ChannelOutboundBuffer是沒有容量限制的,在極端狀況下若是ChannelOutboundBuffer消耗比較慢而ChannelOutboundBuffer寫入過大會致使OOM,Netty在處理裏提供了ChannelWritabilityChanged方法,此方法會在ChannelOutboundBuffer的容量超過最高限額或者小於最低限額會被調用,用戶能夠實現次方法來監控容量的報警,來解決容量過大問題。