Netty源碼分析之ChannelOutboundBuffer

ChannelOutboundBuffer介紹

ChannelOutboundBuffer是Netty發送緩存,當Netty調用write時數據不會真正的去發送而是寫入到ChannelOutboundBuffer緩存隊列,直到調用flush方法Netty纔會從ChannelOutboundBuffer取數據發送。每一個Unsafe都會綁定一個ChannelOutboundBuffer,也就是說每一個客戶端鏈接上服務端都會建立一個ChannelOutboundBuffer綁定客戶端Channel。Netty設計ChannelOutboundBuffer是爲了減小TCP緩存的壓力提升系統的吞吐率。java

ChannelOutboundBuffer設計

先來看下ChannelOutboundBuffer的4個重要字段數組

private Entry flushedEntry; 待發送數據起始節點
private Entry unflushedEntry;暫存數據起始節點
private Entry tailEntry;尾節點
private int flushed;待發送數據個數
Entry(flushedEntry) -->Entry--> ... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)

flushedEntry(包括)到unflushedEntry之間的就是待發送數據,unflushedEntry(包括)到tailEntry就是暫存數據,flushed就是待發送數據個數。promise

正常狀況下待發送數據發送完成後會flushedEntry指向unflushedEntry位置,並將unflushedEntry指空變成以下狀況:緩存

Entry(flushedEntry) -->Entry ... Entry--> Entry(tailEntry)

可是若是出現TCP緩存滿的致使的半包狀況,flushedEntry不會向後移動或移動發送成功的個數個位置,例如發送成功了一個數據,就會向前移動一個位置,出現以下狀況:源碼分析

Entry(flushedEntry) -->... Entry--> Entry(unflushedEntry) -->Entry ... Entry--> Entry(tailEntry)

下面介紹ChannelOutboundBuffer中幾個主要的方法this

  • addMessage方法,功能是添加數據到隊列的隊尾。線程

  • addFlush方法,準備待發送的數據,在flush前須要調用。設計

  • nioBuffers方法,獲取待發送數據,發送數據的時候須要調用拿數據。指針

  • removeBytes方法,發送完成後須要調用刪除已經寫入TCP緩存成功的數據。code

下面對幾個方法源碼進行分析

addMessage方法源碼分析

addMessage方法是在系統調用write方法的時候調用

public void addMessage(Object msg, int size, ChannelPromise promise) {
        //將消息數據包裝成Entry對象
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        //隊列爲空的狀況
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        //非空狀況,將新節點放尾部
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        //若是unflushedEntry爲空,設置暫時還不須要數據起始節點
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // 增長待發送的總字節數
        incrementPendingOutboundBytes(size, false);
    }

流程以下

  • 1.將消息數據包裝成Entry對象。
  • 2.若是隊列爲空,直接設置尾節點爲當前節點,不然將新節點放尾部。
  • 3.unflushedEntry爲空說明不存在暫時不須要發送的節點,當前節點就是第一個暫時不須要發送的節點。
  • 4.CAS方式增長未發送節點字節數。

第1步消息數據包裝成Entry對象,內部實現不是直接建立一個新的Entry,而是對已經回收的Entry的重複利用,來看下代碼:

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }
    
        public final T get() {
        if (maxCapacity == 0) {
            return newObject(NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

看下RECYCLER.get()實現,若是maxCapacity配置成0就直接建立一個新的Entry,默認maxCapacity默認是256,因此默認狀況下會用ThreadLocalMap獲取一個stack,stack裏存的都是原先回收的handle,stack線回到pop一個被回收的handle,若是stack爲空則建立一個新的handle,而後返回handle.value即Entry對象。RECYCLER.get()獲取到entry後會對entry從新賦值。

addFlush方法源碼分析

addFlush方法是在系統調用flush方法的時候調用

public void addFlush() {
        //獲取暫存數據
        Entry entry = unflushedEntry;
        //暫存數據不爲空,說明還有數據能夠發送
        if (entry != null) {
            //將待發送數據起始指針flushedEntry指向暫存起始節點
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                //增長髮送節點個數
                flushed ++;
                //鎖定當前發送節點,防止其取消
                if (!entry.promise.setUncancellable()) {
                    //若是鎖定失敗,關閉節點,獲取節點時會自動過濾
                    int pending = entry.cancel();
                    // 減小待發送的總字節數跟incrementPendingOutboundBytes方法想對應
                    decrementPendingOutboundBytes(pending, false, true);
                }
                //獲取下個節點
                entry = entry.next;
            } while (entry != null);

            //清空unflushedEntry指針
            unflushedEntry = null;
        }
    }

以上方法的主要功能就是將暫存數據節點變成待發送節點,從上文內容知道須要發送的數據,是flushedEntry指向的節點到unflushedEntry指向的節點(不包含unflushedEntry)的之間的節點數據,因此下次發送要將flushedEntry指向unflushedEntry指向的節點做爲發送數據的起始節點。

結合代碼:

  • 1.先獲取unflushedEntry指向的暫存數據的起始節點
  • 2.將待發送數據起始指針flushedEntry指向暫存起始節點
  • 3.經過promise.setUncancellable()鎖定待發送數據,反正發送過程當中取消,若是鎖定過程當中發現其節點已經取消,則調用entry.cancel()取消節點發送,並減小待發送的總字節數。

nioBuffers方法源碼分析

nioBuffers方法是在系統調用addFlush方法完成後調用

public ByteBuffer[] nioBuffers() {
        long nioBufferSize = 0;
        int nioBufferCount = 0;
        final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        //獲取原生ByteBuffer數組,這裏的ByteBuffer是相同線程共享的
        ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
        //獲取待發送數據起始節點
        Entry entry = flushedEntry;
        //循環取數據,isFlushedEntry是判斷待發送數據節點
        while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
            //若是節點被關閉則忽略次節點
            if (!entry.cancelled) {
                //獲取節點裏的ByteBuf
                ByteBuf buf = (ByteBuf) entry.msg;
                final int readerIndex = buf.readerIndex();
                //獲取可發送ByteBuf總字節數
                final int readableBytes = buf.writerIndex() - readerIndex;
                //可發送字節大於0繼續不然跳過
                if (readableBytes > 0) {
                    //每次累計的發送字節數,不能大於Integer.MAX_VALUE
                    if (Integer.MAX_VALUE - readableBytes < nioBufferSize) {
                        break;
                    }
                    //累計的發送字節數
                    nioBufferSize += readableBytes;
                    //獲取entry中ByteBuffer的個數
                    int count = entry.count;
                    if (count == -1) {
                        entry.count = count =  buf.nioBufferCount();
                    }
                    int neededSpace = nioBufferCount + count;
                    //nioBuffers數組沒法知足存放個數需求擴容處理
                    if (neededSpace > nioBuffers.length) {
                        nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                        NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                    }
                    //若是隻有1個直接獲取ByteBuffer放入nioBuffers數組中
                    if (count == 1) {
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        nioBuffers[nioBufferCount ++] = nioBuf;
                    ///若是有多個循環獲取ByteBuffer放入nioBuffers數組中
                    } else {
                        ByteBuffer[] nioBufs = entry.bufs;
                        if (nioBufs == null) {
                            entry.bufs = nioBufs = buf.nioBuffers();
                        }
                        nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                    }
                }
            }
            entry = entry.next;
        }
        this.nioBufferCount = nioBufferCount;
        this.nioBufferSize = nioBufferSize;

        return nioBuffers;
    }

以上方法的主要功能就是獲取須要發送數據並轉成原生的ByteBuffer數組類型,ByteBuffer數組這裏是相同線程共享的,也就是說一個客戶端跟服務端通信會使用相同的ByteBuffer數組來發送數據,這樣減小了空間建立和銷燬時間消耗。

結合代碼:

  • 1.調用NIO_BUFFERS.get獲取原生ByteBuffer數組,這裏的ByteBuffer是相同線程共享的。
  • 2.從待發送數據起始節點開始循環處理數據,直至處理到unflushedEntry指向的Entry,或者到最後或者累計的發送字節數大於Integer.MAX_VALUE。
  • 3.處理跳過被關閉的節點。
  • 4.若是ByteBuffer數組太小則進行擴容。
  • 5.將ByteBuf轉成ByteBuffer類型存入ByteBuffer數組。
  • 6.處理下個節點。

第1步的相同線程數據共享的實現原理是一種類ThreadLocal的實現,原生的ThreadLocal裏是使用ThreadLocalMap來存儲數據,而Netty設計了一種讀取更快的InternalThreadLocalMap來存數據,ThreadLocalMap裏存儲數據是用線性探測法解決衝突,致使的結果就是一次hash不必定找到數據。而InternalThreadLocalMap裏數據存儲的位置是固定不變的,因此一次就能獲取數據,然而致使的結果就是部分空間的浪費,很明顯,這是一種空間換時間的作法。

removeBytes方法源碼分析

removeBytes方法是在系統調用nioBuffers方法並完成發送後調用

public void removeBytes(long writtenBytes) {
        for (;;) {
            //獲取flushedEntry指向的節點數據
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            //獲取讀取的起始位置
            final int readerIndex = buf.readerIndex();
            //計算整個節點的數據字節長度
            final int readableBytes = buf.writerIndex() - readerIndex;
            //若是整個節點的數據字節長度比發送成功的總字節長度小,刪除整個節點
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            //不然縮小當前節點的可發送字節長度
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        //清理ByteBuffer數組
        clearNioBuffers();
    }

以上方法的主要功能就是移除已經發送成功的數據,移除的數據是從flushedEntry指向的節點開始遍歷鏈表移除,移除數據分2種狀況:

  • 1.第一種就是當前整個節點的數據已經發送成功,這種狀況的作法就是將整個節點移除便可。
  • 2.第二種就是當前節點部分發送成功,這種狀況的作法就是將當前節點的可發送字節數縮短,好比說當前節點有100kb,只發送了30kb,那就將此節點縮短至70kb。

結合代碼:

  • 1.獲取flushedEntry指向的節點數據。
  • 2.計算整個節點的數據字節長度。
  • 3.若是當前整個節點的數據已經發送成功將整個節點移除,不然將當前節點的可發送字節數縮短。
  • 4.清理ByteBuffer數組。
  • 5.處理下個節點。

總結

ChannelOutboundBuffer是沒有容量限制的,在極端狀況下若是ChannelOutboundBuffer消耗比較慢而ChannelOutboundBuffer寫入過大會致使OOM,Netty在處理裏提供了ChannelWritabilityChanged方法,此方法會在ChannelOutboundBuffer的容量超過最高限額或者小於最低限額會被調用,用戶能夠實現次方法來監控容量的報警,來解決容量過大問題。

相關文章
相關標籤/搜索