Netty 源碼分析之ByteBuf

Netty 源碼分析之ByteBuf


ByteBuf基礎

Java Nio 的Buffer

在進行數據傳輸的過程當中,咱們常常會用到緩衝區。
在Java NIO 爲咱們提供了原生的七種緩衝區實現,對應着Java 的七種基本類型。通常使用ByteBuffer較多。原生的Buffer雖然能知足咱們的平常使用,可是要進行復雜的應用的時候,確有點力不從心了,原生Buffer存在着如下缺點。所以Netty對其進行了封裝,提供了更爲友好的接口供咱們使用。java

  • 當咱們調用對應Buffer類的allocate方法來建立緩衝區實例的時候,會分配指定的空間,同時緩衝區的長度就會被固定,不能進行動態的增加或者收縮。若是咱們寫入的數據大於緩衝區的capacity的時候,就會發生數組越界錯誤。
  • Buffer只有一個位置標誌位屬性Position,咱們只能flip或者rewind方法來對position進行修改來處理數據的存取位置,一不當心就可能會致使錯誤。
  • Buffer只提供了存取、翻轉、釋放、標誌、比較、批量移動等緩衝區的基本操做,咱們想使用高級的功能,就得本身手動進行封裝及維護,使用很是不方便。

ByteBuf工做原理

ByteBuf也是經過字節數組做爲緩衝區來存取數據,經過外觀模式聚合了JDK NIO元素的ByteBuffer,進行封裝。
ByteBuf是經過readerIndex跟writerIndex兩個位置指針來協助緩衝區的讀寫操做的。
在對象初始化的時候,readerIndex和writerIndex的值爲0,隨着讀操做和寫操做的進行,writerIndex和readerIndex都會增長,不過readerIndex不能超過writerIndex,在進行讀取操做以後,0到readerIndex之間的空間會被視爲discard,調用ByteBuf的discardReadBytes方法,能夠對這部分空間進行釋放重用,相似於ByteBuffer的compact操做,對緩衝區進行壓縮。readerIndex到writerIndex的空間,至關於ByteBuffer的position到limit的空間,能夠對其進行讀取,WriterIndex到capacity的空間,則至關於ByteBuffer的limit到capacity的空間,是能夠繼續寫入的。
readerIndex跟writerIndex讓讀寫操做的位置指針分離,不須要對同一個位置指針進行調整,簡化了緩衝區的讀寫操做。
一樣,ByteBuf對讀寫操做進行了封裝,提供了動態擴展的能力,當咱們對緩衝區進行寫操做的時候,須要對剩餘的可用空間進行校驗,若是可用空間不足,同時要寫入的字節數小於可寫的最大字節數,會對緩衝區進行動態擴展,它會從新建立一個緩衝區,而後將之前的數據複製到新建立的緩衝區中,數組

ByteBuf基本功能

  • 順序讀
    在進行讀操做以前,首先對緩衝區可用的空間進行校驗。若是要讀取的字節長度小於0,就會拋出IllegalArgumentException異常,若是要讀取的字節長度大於已寫入的字節長度,會拋出IndexOutOfBoundsException異常。經過校驗以後,調用getBytes方法,從當前的readerIndex開始,讀取length長度的字節數據到目標dst中,因爲不一樣的子類實現不同,getBytes是個抽象方法,由對應的子類去實現。若是讀取數據成功,readerIndex將會增長相應的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
    ensureAccessible();
    if (minimumReadableBytes < 0) {
        throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
    }
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}
  • 順序寫
    讀操做是將源字節數組從srcIndex開始,length長度的數據寫入到當前的ByteBuf中的。
    一開始須要對寫入數組的字節數進行校驗,若是寫入長度小於0,將會拋出IllegalArgumentException異常,若是寫入字節數小於當前ByteBuf的可寫入字節數,則經過檢驗。若是寫入字節數大於緩衝區最大可動態擴展的容量maxCapacity,就會拋出
    IndexOutOfBoundsException異常,不然的話,就會經過動態擴展來知足寫入須要的字節數。首先經過calculateNewCapacity計算出從新擴展後的容量,而後調用capacity方法進行擴展,不一樣的子類有不一樣實現,因此也是一個抽象方法。
    • 計算擴展容量,首先設置門閥值爲4m,若是要擴展的容量等於閥值就使用閥值做爲緩衝區新的容量,若是大於閥值就以4M做爲步長,每次增長4M,若是擴展期間,要擴展的容量比最大可擴展容量還大的話,就以最大可擴展容量maxCapacity爲新的容量。不然的話,就從64開始倍增,直到倍增以後的結果大於要擴展的容量,再把結果做爲緩衝區的新容量。
    • 經過先倍增再步長來擴展容量,若是咱們只是writerIndex+length的值做爲緩衝區的新容量,那麼再之後進行寫操做的時候,每次都須要進行容量擴展,容量擴展的過程須要進行內存複製,過多內存複製會致使系統的性能降低,之因此是倍增再部長,在最初空間比較小的時候,倍增操做並不會帶來太多的內存浪費,可是內存增加到必定的時候,再進行倍增的時候,就會對內存形成浪費,所以,須要設定一個閥值,到達閥值以後就經過步長的方法進行平滑的增加。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }

    if (minWritableBytes <= writableBytes()) {
        return this;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return this;
}
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity實現
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • Clear操做
    clear操做只是把readerIndex和writerIndex設置爲0,不會對存儲的數據進行修改。
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}
  • 索引操做安全

    • 讀寫位置索引設置:主要是對邊界條件進行校驗,設置readerIndex的時候,newReaderIndex不能小於0跟大於writerIndex;設置writerIndex的時候,newWriterIndex必須大於readerIndex和小於當前的capacity。若是不能經過校驗的話,就會拋出IndexOutOfBoundsException異常。
    • mark和reset操做:因爲有readerIndex和writerIndex,所以進行mark或者reset須要指定相應的操做位置索引,mark操做會把當前的readerIndex或者writerIndex設置爲markedReaderIndex或者markedWriterIndex;reset操做的話,它是參入對應的mark值調用對應readerIndex()或者writerIndex();
  • 緩衝區重用
    能夠經過discardReadByte方法去重用已經讀取過的緩衝區。
    首先對readerIndex進行判斷:app

    • 若是readerIndex等於0,就說明沒有讀取數據,沒有能夠用來重用的空間,直接返回;
    • 若是readerIndex大於0且不等於writerIndex的話,說明有進行數據讀取被丟棄的緩衝區,也有尚未被讀取的緩衝區。調用setBytes方法進行字節數組的複製,將沒被讀取的數據移動到緩衝區的起始位置,從新去設置readerIndex和writerIndex,readerIndex爲0,writerIndex爲原writerIndex-readerIndex;同時,也須要對mark進行從新設置。
      • 首先對markedReaderIndex進行備份而後跟decrement進行比較,若是markedReaderIndex比decrement小的話,markedReaderIndex設置爲0,再用markedWriterIndex跟decrement比較,若是小於的話,markedWriterIndex也設置爲0,不然的話markedWriterIndex較少decrement;
      • 若是markedReaderIndex比decrement大的話,markedReaderIndex和markedReaderIndex都減去decrement就能夠了。
    • 若是readerIndex等於writerIndex的話,說明沒有能夠進行重用的緩衝區,直接對mark從新設置就能夠了,不須要內存複製。
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}
  • skipBytes

當咱們須要跳過某些不須要的字節的時候,能夠調用skipBytes方法來跳過指定長度的字節來讀取後面的數據。
首先對跳躍長度進行判斷,若是跳躍長度小於0的話,會拋出IllegalArgumentException異常,或者跳躍長度大於當前緩衝區可讀長度的話,會拋出IndexOutOfBoundsException異常。若是校驗經過,新的readerindex爲原readerIndex+length,若是新的readerIndex大於writerIndex的話,會拋出IndexOutOfBoundsException異常,不然就更新readerIndex。函數

public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    int newReaderIndex = readerIndex + length;
    if (newReaderIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
                length, readerIndex, writerIndex));
    }
    readerIndex = newReaderIndex;
    return this;
}

ByteBuf源碼分析

ByteBuf

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf是ByteBuf實現對引用進行計數的基類,用來跟蹤對象的分配和銷燬,實現自動內存回收。工具

  • 成員變量
    • refCntUpdater refCntUpdater是一個AtomicIntegerFieldUpdater類型的成員變量,它能夠對成員變量進行原子性更新操做,達到線程安全。
    • REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是標識refCnt字段在AbstractReferenceCountedByteBuf的內存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf兩個子類中都會使用到這個偏移量。
    • refCnt volatile修飾保證變量的線程可見性,用來跟蹤對象的引用次數
  • 對象引用計數器
    每調用retain方法一次,引用計數器就會加一。retain方法經過自旋對引用計數器進行加一操做,引用計數器的初始值爲1,只要程序是正確執行的話,它的最小值應該爲1,當申請和釋放次數相等的時候,對應的ByteBuf就會被回收。當次數爲0時,代表對象被錯誤的引用,就會拋出IllegalReferenceCountException異常,若是次數等於Integer類型的最大值,就會拋出
    IllegalReferenceCountException異常。retain經過refCntUpdater的compareAndSet方法進行原子操做更新,compareAndSet會使用獲取的值與指望值進行比較,若是在比較器件,有其餘線程對變量進行修改,那麼比較失敗,會再次自旋,獲取引用計數器的值再次進行比較,不然的話,就會進行加一操做,退出自旋。
    release方法的話與retain方法相似,也是經過自旋循環進行判斷和更新,不過當refCnt的值等於1的時候,代表引用計數器的申請跟釋放次數同樣,對象引用已經不可達了,對象應該要被垃圾收集回收掉了,調用deallocate方法釋放ByteBuf對象
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}
    
public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

UnpooledHeapByteBuf

UnpooledHeapByteBuf是一個非線程池實現的在堆內存進行內存分配的字節緩衝區,在每次IO操做的都會去建立一個UnpooledHeapByteBuf對象,若是頻繁地對內存進行分配或者釋放會對性能形成影響。源碼分析

  • 成員變量
    • ByteBufAllocator 用於內存分配
    • array 字節數組做爲緩衝區,用於存儲字節數據
    • ByteBuffer 用來實現Netty ByteBuf 到Nio ByteBuffer的變換
  • 動態擴展緩衝區
    調用capacity方法動態擴展緩衝區,首先要對擴展容量進行校驗,若是新容量的大小小於0或者大於最大可擴展容量maxCapacity的話,拋出IllegalArgumentException異常。
    經過校驗以後,若是新擴展容量比原來大的話,則建立一個新的容量爲新擴展容量的字節數組緩衝區,而後調用System.arraycopy進行內存複製,將舊的數據複製到新數組中去,而後用setArray進行數組替換。動態擴展以後須要原來的視圖tmpNioBuffer設置爲控。
    若是新的容量小於當前緩衝區容量的話,不須要進行動態擴展,可是須要截取部分數據做爲子緩衝區。
    • 首先對當前的readerIndex是否小於newCapacity,若是小於的話繼續對writerIndex跟newCapacity進行比較,若是writerIndex大於newCapacity的話,就將writerIndex設置爲newCapacity,更新完索引以後就經過System.arrayCopy內存複製將當前可讀的數據複製到新的緩衝區字節數組中。
    • 若是newCapacity小於readerIndex的話,說明沒有新的可讀數據要複製到新的字節數組緩衝區中,只須要把writerIndex跟readerIndex都更新爲newCapacity既可,最後調用setArray更換字節數組。
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • setBytes
    字節數組複製,首先對數據進行合法性檢驗,若是srcIndex或者index的值小於0,就會拋出IllegalArgumentException,若是index+length的值大於capacity的值或者srcIndex+length的值大於src.length的話,就會拋出IndexOutOfBoundsException異常。經過校驗以後,就調用System.arraycopy進行字節數組複製。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}
  • Netty ByteBuf與Nio ByteBuffer轉換
    要將Netty的ByteBuf轉化爲Nio ByteBuffer,在ByteBuffer中有wrap靜態方法,只須要傳入對應的字節數組便可建立轉化爲ByteBuffer,在nioBuffer方法還調用了slice方法,它能夠建立一個從原ByteBuffer的position開始緩衝區,與原緩衝區共享同一段數據元素。nioBuffer方法不會重用緩衝區,只能保證writerIndex跟readerIndex的獨立性。
public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

PooledByteBuf

在Netty4以後加入內存池管理,經過內存池管理比以前ByteBuf的建立性能獲得了極大提升。性能

  • PoolChunk
    • Page 能夠用來分配的最小內存塊單位
    • Chunk page的集合

PoolChunk主要負責內存塊的分配及釋放,chunk中的page會構建成一顆二叉樹,默認狀況下page的大小是8K,chunk的大小是2^11 page,即16M,構成了11層的二叉樹,最下面一層的葉子節點有8192個,與page的數目同樣,每一次內存的分配必須保證連續性,方便內存操做。每一個節點會記錄本身在Memory Area的偏移地址,當一個節點表示的內存區域被分配以後,那麼該節點會被標誌爲已分配,該節點的全部子節點的內存請求都會忽略。每次內存分配的都是8k(2^n)大小的內存塊,當須要分配大小爲chunkSize/(2^k)的內存端時,爲了找到可用的內存段,會從第K層左邊開始尋找可用節點。this

  • PoolArena

在內存分配中,爲了可以集中管理內存的分配及釋放,同時提供分配和釋放內存的性能,通常都是會先預先分配一大塊連續的內存,不須要重複頻繁地進行內存操做,那一大塊連續的內存就叫作memory Arena,而PoolArena是Netty的內存池實現類。
在Netty中,PoolArena是由多個Chunk組成的,而每一個Chunk則由多個Page組成。PoolArena是由Chunk和Page共同組織和管理的。線程

  • PoolSubpage

當對於小於一個Page的內存分配的時候,每一個Page會被劃分爲大小相等的內存塊,它的大小是根據第一次申請內存分配的內存塊大小來決定的。一個Page只能分配與第一次內存內存的內存塊的大小相等的內存塊,若是想要想要申請大小不想等的內存塊,只能在新的Page上申請內存分配了。
Page中的存儲區域的使用狀況是經過一個long數組bitmap來維護的,每一位表示一個區域的佔用狀況。

PooledDirectByteBuf

  • 建立字節緩衝區
    因爲內存池實現,每次建立字節緩衝區的時候,不是直接new,而是從內存池中去獲取,而後設置引用計數器跟讀寫Index,跟緩衝區最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
  • 複製字節緩衝區實例
    copy方法能夠複製一個字節緩衝區實例,與原緩衝區獨立。
    首先要對index和length進行合法性判斷,而後調用PooledByteBufAllocator的directBuffer方法分配一個新的緩衝區。newDirectBuffer方法是一個抽象方法,對於不一樣的子類有不一樣的實現。若是是unpooled的話,會直接建立一個新的緩衝區,若是是pooled的話,它會從內存池中獲取一個可用的緩衝區。
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf copy = alloc().directBuffer(length, maxCapacity());
    copy.writeBytes(this, index, length);
    return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

ByteBuf輔助類分析

ByteBufHolder

ByteBufHolder是ByteBuf的一個容器,它能夠更方便地訪問ByteBuf中的數據,在使用不一樣的協議進行數據傳輸的時候,不一樣的協議消息體包含的數據格式和字段不同,因此抽象一個ByteBufHolder對ByteBuf進行包裝,不一樣的子類有不一樣的實現,使用者能夠根據本身的須要進行實現。Netty提供了一個默認實現DefaultByteBufHolder。

ByteBufAllocator

ByteBufAllocator是字節緩衝區分配器,根據Netty字節緩衝區的實現不一樣,分爲兩種不一樣的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不一樣ByteBuf的分配方法。

CompositeByteBuf

CompositeByteBuf是一個虛擬的Buffer,它能夠將多個ByteBuf組裝爲一個ByteBuf視圖。
在Java NIO中,咱們有兩種實現的方法

  • 將其餘ByteBuffer的數據複製到一個ByteBuffer中,或者從新建立一個新的ByteBuffer,將其餘的ByteBuffer複製到新建的ByteBuffer中。
  • 經過容器將多個ByteBuffer存儲在一塊兒,進行統一的管理和維護。

在Netty中,CompositeByByteBuf中維護了一個Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護在集合中的位置偏移量等信息。通常狀況下,咱們應該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來建立CompositeByteBuf,而不是直接經過構造函數去實例化一個CompositeByteBuf對象。

private int addComponent0(int cIndex, ByteBuf buffer) {
    checkComponentIndex(cIndex);
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    int readableBytes = buffer.readableBytes();

    // No need to consolidate - just add a component to the list.
    Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
    if (cIndex == components.size()) {
        components.add(c);
        if (cIndex == 0) {
            c.endOffset = readableBytes;
        } else {
            Component prev = components.get(cIndex - 1);
            c.offset = prev.endOffset;
            c.endOffset = c.offset + readableBytes;
        }
    } else {
        components.add(cIndex, c);
        if (readableBytes != 0) {
            updateComponentOffsets(cIndex);
        }
    }
    return cIndex;
}
private void consolidateIfNeeded() {
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        final int capacity = components.get(numComponents - 1).endOffset;
    
        ByteBuf consolidated = allocBuffer(capacity);
    
        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();
        components.add(c);
    }
}

public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        updateComponentOffsets(cIndex);
    }
    return this;
}

private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

ByteBufUtil

ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態方法來操做ByteBuf。

相關文章
相關標籤/搜索