Netty(7)源碼-ByteBuf

1、ByteBuf工做原理

1. ByteBuf是ByteBuffer的升級版:

jdk中經常使用的是ByteBuffer,從功能角度上,ByteBuffer能夠徹底知足須要,可是有如下缺點:算法

  • ByteBuffer一旦分配完成,長度固定,不能動態擴展和收縮,當須要編碼的POJO對象大於分配容量時發生索引越界異常
  • ByteBuffer只要一個標識位置的指針postion,讀寫切換比較麻煩,flip rewind等操做
  • 功能有限

ByteBuf依然是Byte數組緩衝區,擁有ByteBuffer的一切功能:數據庫

  • 7種Java基礎類型、byte數組、ByteBuffer(ByteBuf)等讀寫;
  • 緩衝區自身的copy和slice等;
  • 設置網絡字節序;
  • 構造緩衝區實例;
  • 操做位置指針方法;

2. ByteBuf的工做原理:

ByteBuf使用2個位置指針來協助緩衝區的讀寫操做,讀操做使用readerIndex,寫操做使用writerIndex。後端

(1) 一開始readerIndexwriterIndex都是0api

(2) 隨着寫入writerIndex增長,隨着讀取readerIndex增長,可是不會超過writerIndex數組

(3) 讀取以後,0~readerIndex這部分視爲discard,調用discardReadBytes方法,能夠釋放這部分空間安全

(4) readerIndexwriterIndex之間的數據是能夠讀取的,等價於ByteBufferposition-limit之間的數據網絡

(5) writerIndexcapacity之間的空間是可寫的,等價於ByteBuffer limitcapacity之間的可用空間。框架

3. 用圖演示上述過程:

初始分配的ByteBuf:ide

 

 寫入了N個字節以後:函數

 

讀取了M(<N)個字節以後的ByteBuf如圖所示:

調用了discardReadBytes操做以後的ByteBuf如圖所示:

調用了clear以後的ByteBuf如圖所示:

4. 動態擴展

跟大多數的自動擴容數組同樣,在進行put操做的時候,若是空間不足,就建立新的ByteBuffer實現自動擴容,並將以前的ByteBuffer複製到新的ByteBuffer中,最後釋放老的ByteBuffer。

    public ByteBuf writeBytes(ByteBuffer src) {
        ensureAccessible();
        int length = src.remaining();
        ensureWritable(length);
        setBytes(writerIndex, src);
        writerIndex += length;
        return this;
    }

注意到讀寫都是使用ByteBuffer,在容量不足的時候會自動擴容:

    private void ensureWritable0(int minWritableBytes) {
        if (minWritableBytes <= writableBytes()) {
            return;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        // Adjust to the new capacity.
        capacity(newCapacity);
    }

2、ByteBuf功能介紹

1. 順序讀操做

  詳見api,相似於ByteBuffer的get

2. 順序寫操做

  詳見api,相似於ByteBuffer的put

3. readerIndex和writerIndex

讀索引和寫索引,將ByteBuf分割爲3個區域:

可讀區域和可寫區域,以及已經讀取過的區域,能夠調用discardReadBytes操做來重用這部分的空間,以節省內存,防止ByteBuf的動態擴張。這在私有協議棧解碼的時候很是有用,由於TCP層可能會粘包,幾百個整包消息被TCP做爲一個整包發送。這樣,使用discardReadBytes操做能夠重用以前的已經解碼過的緩衝區,從而防止接收緩衝區由於容量不足擴張。

可是,discardReadBytes是把雙刃劍,不能濫用。

4. Discardable bytes

動態擴張比較耗時,所以爲了提升性能,每每須要最大努力提高緩衝區的重用率。

discardbytes操做則能夠重用已經讀過的空間,減小擴容的次數。

可是,discardbytes操做自己也是本身數組的內存複製,因此頻繁調用也會致使性能降低,所以調用以前,請確認,你想要時間換取空間,而不是擴容。

    @Override
    public ByteBuf discardReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex != writerIndex) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        } else {
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
        return this;
    }

5. Readable bytes和Writable bytes

  • 可讀區域中是數據實際存儲的區域,以read或者skip開頭的任何操做都將會從readerIndex開始讀取或者跳過指定的數據。若是讀的字節數>可讀字節數,throw IndexOutOfBoundsException。
  • 可寫區域是尚可填充的空間,任何以write開頭的操做都會從writeIndex開始向空閒空間開始寫入字節,若是寫入字節數>可寫字節數,也會拋出IndexOutOfBoundsException。

6. Clear操做

ByteBuffer的clear操做不會操做內容自己,而是修改指針位置。ByteBuf也同樣,clear以後0=readerIndex=writerIndex。

7. Mark和Reset

某些狀況須要可以回滾,Netty提供了相似的方法。

  • markReaderIndex:將當前的readerIndex備份到markedReaderIndex中;
  • resetReaderIndex:將當前的readerIndex設置爲mardedReaderIndex;
  • markWriterIndex:將當前的readerIndex備份到markedWriterIndex中;
  • resetWriterIndex:將當前的readerIndex設置爲mardedWriterIndex;

8. 查找操做

ByteBuf提供了查找方法用於知足不一樣的應用場景,詳細分類以下.

(1) indexOf(int fromIndex, int toIndex, byte value):從from到to查找value的值

(2) bytesBefore(byte value): readerIndex到writerIndex中查找value的值

(3) bytesBefore(int length, byte value):從readerIndex到readerIndex+length

(4) bytesBefore(int index, int length, byte value): 從index到Index+length

(5) forEachByte(ByteBufProcessor processor): 遍歷可讀字節數組,與ByteBufProcessor 設置的條件進行對比

(6) forEachByte(int index, int length, ByteBufProcessor processor): 相似上面

(7) forEachByteDesc(ByteBufProcessor processor): 同上,採用逆序

(8) forEachByteDesc(int index, int length, ByteBufProcessor processor):逆序

對於被查詢的字節,Netty在ByteBufProcessor中作好了抽象,定義以下:

  • FIND_NUL: NUL(0x00)
  • FIND_CR
  • FIND_LF
  • FIND_CRLF
  • FIND_LINER_WHITESPACE

9. Derived buffers

相似於數據庫的視圖。如下方法用於建立視圖或者恢復ByteBuf。

(1) duplicate: 返回當前ByteBuf的複製對象,兩者共享緩衝區內容,可是讀寫索引獨立,即修改內容內容會變,索引變化不影響原ByteBuf。

(2) copy: 複製一個對象,不共享,內容和索引都是獨立的;

(3) copy(int index, int length)

(4) slice: 返回當前ByteBuf的可讀子緩衝區,即從readerIndex到writerIndex的部分,共享內容,索引獨立。

(5) slice(int index, int length):共享內容,索引獨立。

10. 轉換爲ByteBuffer

(1) ByteBuffer nioBuffer(): 當前可讀緩衝區轉換爲ByteBuffer,共享內容,索引獨立,且沒法感知動態擴容;

(2) ByteBuffer nioBuffer(int index, int length)

11. 隨機讀寫 (set和get)

隨機讀api:

隨機寫api,一樣方式能夠查看。

不管是get仍是set,都會對其索引進行合法性校驗。

可是,set不支持動態擴展

3、源碼分析

3.1 主要類繼承關係

除了這些類以外,還有很是多的類:

1. 從內存分配的角度有2類:

(1) 堆內存:優勢是內存的分配和回收速度快,能夠被自動回收,缺點是若是進行SocketIO的讀寫,須要額外一次的內存複製,將堆內存對應的緩衝區複製到內核Channel中,性能有必定程度損失。

(2) 直接內存:在堆外進行分配,相對分配和回收速度會慢一些,可是將它寫入或者從Socket Channel讀取時,少了一次內存複製,速度更快。

經驗代表:ByteBuf的最佳實踐是在I/O通訊線程讀寫緩衝區使用DirectByteBuf,後端業務消息的編解碼模塊使用HeapByteBuf,這樣組合能夠達到性能最優。

2. 從內存回收角度上:

(1) 基於對象池的ByteBuf:內存池,能夠循環建立ByteBuf,提高內存的利用率,下降高負載致使的頻繁GC。

(2) 普通ByteBuf。

測試代表高負載使用內存池會更加的平穩。

儘管推薦使用基於內存池的ByteBuf,可是內存池的管理和維護更加複雜,也須要更加謹慎。

3.2 AbstactByteBuf

骨架類

1. 主要成員變量

    private static final boolean checkAccessible;

    static {
        checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true);
        if (logger.isDebugEnabled()) {
            logger.debug("-D{}: {}", PROP_MODE, checkAccessible);
        }
    }

    static final ResourceLeakDetector<ByteBuf> leakDetector =
            ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);

    //讀索引
    int readerIndex;
    //寫索引
    int writerIndex;
    private int markedReaderIndex;
    private int markedWriterIndex;
    private int maxCapacity;

一些公共屬性的定義,這裏關注下leakDetector:用於檢測對象是否有泄露。

2. 讀操做系列

這裏沒有定義緩衝區的實現,由於不知道是直接內存仍是堆內存,可是不管是基於何種內存實現讀操做,一些基本的操做都在骨架類中已經實現,實現代碼複用,這也是抽象和繼承的價值所在。

這裏以方法readBytes爲例:

    @Override
    public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
       //1. 檢測緩衝區可讀長度
        checkReadableBytes(length);
       //2. 抽象獲取,由子類實現
        getBytes(readerIndex, dst, dstIndex, length);
       //3. 讀索引增長
        readerIndex += length;
        return this;
    }

檢測長度方法很是簡單,檢測可讀長度是否有length,這裏略過。

3. 寫操做系列

與讀相似

    @Override
    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureAccessible();
        //1 .檢測寫長度合法
        ensureWritable(length);
        //2. 抽象方法,由子類實現
        setBytes(writerIndex, src, srcIndex, length);
        //3. 寫索引增長
        writerIndex += length;
        return this;
    }

爲何須要動態擴展?

  不少時候都是依據經驗來判斷Pojo對象的大小,若是這個估計值偏大則形成內存浪費,若是偏小直接拋出異常,這種作法對用戶很是不友好。

  而Netty的ByteBuf支持動態擴展,爲了保證安全,能夠指定最大容量。

如何進行計算?

  參數是writerIndex+minWriableBytes,即知足要求的最小容量。

  設置閥門值是4MB,若是新增的內存空間大於這個值,不採用倍增,而採用每次步進4MB的方式,每次增長後和maxCapacity比較,選擇其小者。

  若是擴容以後的新容量小於閥值,則以64進行倍增

這樣作的緣由無非是綜合2點因素:不但願一次增長容量過小,致使須要頻繁的擴容,不但願一次增長太多,形成空間上的浪費。

所以,在內存比較小的時候(<4MB)的時候,倍增64->128->256字節,這種方式大多數應用能夠接收

當內存達到閥值時,再倍增就會帶來額外的內存浪費,例如10MB->20MB,所以使用步增的方式進行擴張。

代碼以下:

    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
        }
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }

計算完長度,再建立新的緩衝區,因爲內存申請方式不一樣子類不一樣,依舊設置爲一個抽象方法:

    public abstract ByteBuf capacity(int newCapacity);

4. 操做索引

與索引相關的操做主要是讀寫索引 mark reset等等。這部分代碼至關簡單。

5. 重用緩衝區

0->readerIndex這部分的空間能夠重用。

    public ByteBuf discardReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex != writerIndex) {
 //1. 字節組進行復制 setBytes(
0, this, readerIndex, writerIndex - readerIndex);
       //2. 從新設置索引 writerIndex
-= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; }

注意到還要從新調整markedReaderIndex和markedWriterIndex。

6. skipBytes

在解碼的時候,有時候須要丟棄非法的數據報文。很是簡單,修改readerIndex便可,

    @Override
    public ByteBuf skipBytes(int length) {
        checkReadableBytes(length);
        readerIndex += length;
        return this;
    }

3.3 AbstractReferenceCountedByteBuf源碼分析

從類的名字能夠看出,該類的功能主要是引用計數,相似於JVM內存回收的對象引用計數器,用於跟蹤對象的分配和銷燬,用於自動的內存回收。

1. 成員變量

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

    static {
        AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
        }
        refCntUpdater = updater;
    }

    private volatile int refCnt = 1;

refCntUpdater是一個CAS類型變量,經過原子操做對成員變量進行更新。

refCnt是一個volatile修飾的字段,用於跟蹤對象的引用次數。

2. 對象引用計數器

CAS算法,每調用一次retain方法,引用計數器就會+1.

    @Override
    public ByteBuf retain() {
        for (;;) {
            int refCnt = this.refCnt;
            final int nextCnt = refCnt + 1;

            // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
            if (nextCnt <= 1) {
                throw new IllegalReferenceCountException(refCnt, 1);
            }
            if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
                break;
            }
        }
        return this;
    }

下面看釋放引用計數器的代碼,也是使用CAS在一個自旋循環裏進行判斷和更新的。須要注意的是:當refCnt==1的時候意味着申請和釋放相等,說明對象引用已經不可達,該對象須要被垃圾回收掉,所以調用deallocate方法來釋放ByteBuf對象,代碼以下:

    @Override
    public boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

3.4 UnpooledHeapByteBuf

基於堆內存,沒有對象池,意味着每次I/O的讀寫都會建立一個新的UnpooledHeapByteBuf,頻繁進行大塊內存的分配和回收可能會對性能有必定的影響,可是相比於堆外內存的申請和釋放,成本仍是要低一些。

相比於PooledHeapByteBuf,其原理更加的簡單,也不容易出現內存管理方面的問題,所以在知足性能的狀況下,推薦使用UnpooledHeapByteBuf。

1. 成員變量

    
  //1. 用於內存分配
  private final ByteBufAllocator alloc;
  //2. 數組緩衝區
byte[] array;
//3.
private ByteBuffer tmpNioBuf;

2. 動態擴展緩衝區

    public ByteBuf capacity(int newCapacity) {
        ensureAccessible();
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = array.length;
      //1. 若是新的容量值大於當前的緩衝區容量,須要動態擴展
if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity];
        //進行數組複製 System.arraycopy(array,
0, newArray, 0, array.length);
        // 替換舊的數組 setArray(newArray); }
else if (newCapacity < oldCapacity) {
       //此時,須要截取當前緩衝區建立一個新的子緩衝區
byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex();
        //若是讀索引<新的容量值
if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); }
        //拷貝內容 System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex
- readerIndex); } else {
          //若是此時讀索引更大,無須拷貝數據 setIndex(newCapacity, newCapacity); } setArray(newArray); }
return this; }

3. 字節數組複製

    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        checkSrcIndex(index, length, srcIndex, src.length);
        System.arraycopy(src, srcIndex, array, index, length);
        return this;
    }

此時不會修改readerIndex和writerIndex,只是修改內容.

4. 轉換爲ByteBuf

    public ByteBuffer nioBuffer(int index, int length) {
        ensureAccessible();
        return ByteBuffer.wrap(array, index, length).slice();
    }

5. 子類相關方法

isDirect: 因爲是基於heap,因此返回false

hasArray: 返回true

array: 返回array

    @Override
    public boolean hasArray() {
        return true;
    }

    @Override
    public byte[] array() {
        ensureAccessible();
        return array;
    }

    @Override
    public int arrayOffset() {
        return 0;
    }

    @Override
    public boolean hasMemoryAddress() {
        return false;
    }

    @Override
    public long memoryAddress() {
        throw new UnsupportedOperationException();
    }

其它:

因爲UnpooledDirectByteBuf原理和UnpooledHeapByteBuf相同,不一樣之處在於使用內部緩衝區DirectByteBuffer實現,這裏再也不描述。

        setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));

3.5 PooledByteBuf內存池原理分析

細節很是複雜,這裏僅僅從設計角度上講解。

1. PoolArena

Arena自己是一塊區域,在內存管理中,Memory Arena是指內存中的一大塊連續的區域PoolArena就是Netty的內存池實現類。

爲了集中管理內存的分配和釋放,同時提升分配和釋放內存時候的性能,不少框架和應用都會經過預先申請一大塊內存,而後經過提供相應的分配和釋放接口來使用內存。

這樣,對內存的管理就會被集中到幾個類或者函數中,因爲再也不頻繁使用系統調用來申請和釋放內存,應用或者系統的性能也會大大提升。這種設計思路中,預先申請的那一大塊內存就會被稱爲Memeory Arena。

不一樣的框架中,Memory Arena的實現不一樣,Netty的PoolArena是由多個Chunk組成的大塊內存區域,而每一個Chunk則由一個或者多個Page組成。

代碼片斷以下:

abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    enum SizeClass {
        Tiny,
        Small,
        Normal
    }

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    private final List<PoolChunkListMetric> chunkListMetrics;

2. PoolChunk

Chunk中Page被構建爲一棵二叉樹。假設一個Chunk由16個Page組成,那麼這些Page會被按照下圖方式組織起來。

Page的大小是4個byte,而Chunk的大小是64即4*16。樹有5層,葉子節點所在層用來分配全部的Page內存,而上一層用來分配2個Page,以此類推...

每一個節點都記錄了本身在整個Memory Arena中的偏移地址,當一個節點表明的內存區域被分配出去以後,這個節點就被標記爲已分配,自這個節點如下的全部節點在後面的內存請求都會被忽略。例如,須要16個byte的時候,就會在第三層尋找,而後標記已經分配,再分配只能尋找其餘的三個節點了。

對樹的遍歷算法採用的是深度優先的算法,可是在選擇哪一個子節點繼續遍歷的時候是隨機的。

3. PoolSubpage

對於小於一個Page的內存,Netty在Page中完成分配。每一個Page會被切分爲大小相同的多個存儲塊,存儲塊的大小由第一次申請內存的塊大小決定。

假設一個Page是8個字節,第一次申請的塊大小是4個字節,那麼這個Page就包含了2個存儲塊;若是第一次申請的是8個字節,那麼這個Page就被分紅一個存儲塊。同時,以後能分配的也是和第一次同樣的字節,若是不同,須要在一個新的Page中進行分配。

Page中存儲區域的使用狀態經過一個long數組來維護,數組中每一個long的每一位表示一個塊存儲區域的佔用狀況:0表示未佔用,1表示已佔用。對於個4bytes的Page來講,若是這個Page用來分配1個字節的存儲區域,那麼long數組只用一個元素的低4位就能夠描述,若是對於一個128bytes的Page,若是也是1byte分配就有128個,就須要2個long元素來表明區域佔用狀況。

final class PoolSubpage<T> implements PoolSubpageMetric {

    final PoolChunk<T> chunk;
    private final int memoryMapIdx;
    private final int runOffset;
    private final int pageSize;
    private final long[] bitmap;

    PoolSubpage<T> prev;
    PoolSubpage<T> next;

    boolean doNotDestroy;
    int elemSize;
    private int maxNumElems;
    private int bitmapLength;
    private int nextAvail;
    private int numAvail;

4. 內存回收策略

不管是Chunk仍是Page,都使用狀態位(bitmap)來標識內存是否可用,不一樣之處在於Chunk經過在二叉樹上對節點進行標識Page則是經過維護塊的狀態標識來實現

3.6 PooledDirectByteBuf

基於內存池實現,基於直接緩衝,與UnPooledDirectByteBuf惟一的不一樣就是內存分配和銷燬策略不一樣,其餘都是相同的。

1. 建立字節緩衝區實例

經過靜態工廠建立:

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

設置引用計數器爲1,設置緩衝區最大容量後返回。

    /**
     * Method must be called before reuse this {@link PooledByteBufAllocator}
     */
    final void reuse(int maxCapacity) {
        maxCapacity(maxCapacity);
        setRefCnt(1);
        setIndex0(0, 0);
        discardMarks();
    }

2. 複製新的字節緩衝區實例

若是使用者確實須要複製一個新的實例,與原來的PooledDirectByteBuf獨立,則調用它的copy(int index, int length) 能夠達到上述目標,代碼:

    @Override
    public ByteBuf copy(int index, int length) {
        checkIndex(index, length);
        ByteBuf copy = alloc().directBuffer(length, maxCapacity());
        copy.writeBytes(this, index, length);
        return copy;
    }

上述代碼中,首先對index和length進行合法性校驗,經過以後調用PooledByteBufAllocator分配一個新的ByteBuf,最終調用的是AbstractByteAllocator的directBuffer方法。

    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }

newDirectBuffer方法根據子類實現不一樣策略,此處是Pooled,從池中獲取而不是建立一個新的對象。

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
        }

        return toLeakAwareBuffer(buf);
    }

3. 其它相關功能

    @Override
    public boolean hasArray() {
        return false;
    }

    @Override
    public byte[] array() {
        throw new UnsupportedOperationException("direct buffer");
    }

    @Override
    public int arrayOffset() {
        throw new UnsupportedOperationException("direct buffer");
    }

    @Override
    public boolean hasMemoryAddress() {
        return false;
    }

    @Override
    public long memoryAddress() {
        throw new UnsupportedOperationException();
    }

4、ByteBuf相關輔助類介紹

4.1 ByteBufHolder

ByteBufHolder是ByteBuf相關的容器,在Netty中很是有用。

例如HTTP協議的請求消息和應答消息均可以攜帶消息體,這個消息體在NIO ByteBuffer中就是ByteBuffer對象,在Netty中就是ByteBuf對象。而不一樣的協議消息體中能夠含有不一樣的協議字段和功能,所以須要對ByteBuf進行包裝和抽象。

爲了知足這些定製化的需求,Netty抽象出了ByteBufHolder對象,它包含了一個ByteBuf,另外還提供了一些其餘實用的方法,使用者繼承ByteBufHolder接口能夠按需封裝本身的實現。

下面是其類圖,很是豐富。

4.2 ByteBufAllocator

ByteBufAllocator是字節緩衝區分配器,按照Netty的緩衝區實現不一樣,共有2種不一樣的分配器,基於內存池的字節緩衝分配器和普通的字節緩衝區分配器。

下圖是主要API列表:

4.3 CompositeByteBuf

CompositeByteBuf容許將多個ByteBuf的實例組裝到一塊兒,造成一個統一的視圖。

某些場景下很是有用,例如某個協議POJO對象包含2部分:消息頭和消息體,它們都是ByteBuf對象。當須要對消息進行編碼的時候須要進行整合,若是使用JDK的話,有如下2種思路:

(1) 將某個ByteBuffer複製到另外一個ByteBuffer中,或者建立一個新的ByteBuffer

(2) 經過List等容器,統一維護和處理

Netty的作法則是使用組合模式進行優化。

public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {

    private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
    private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();

    private final ByteBufAllocator alloc;
    private final boolean direct;
    private final List<Component> components;
    private final int maxNumComponents;

    private boolean freed;

它定義了一個Component的集合,Component就是ByteBuf的包裝類:

    private static final class Component {
        final ByteBuf buf;
        final int length;
        int offset;
        int endOffset;

        Component(ByteBuf buf) {
            this.buf = buf;
            length = buf.readableBytes();
        }

        void freeIfNecessary() {
            buf.release(); // We should not get a NPE here. If so, it must be a bug.
        }
    }

增長和刪除的代碼:

    public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
        checkNotNull(buffer, "buffer");
        addComponent0(increaseWriterIndex, components.size(), buffer);
        consolidateIfNeeded();
        return this;
    }
    /**
     * Remove the {@link ByteBuf} from the given index.
     *
     * @param cIndex the index on from which the {@link ByteBuf} will be remove
     */
    public CompositeByteBuf removeComponent(int cIndex) {
        checkComponentIndex(cIndex);
        Component comp = components.remove(cIndex);
        comp.freeIfNecessary();
        if (comp.length > 0) {
            // Only need to call updateComponentOffsets if the length was > 0
            updateComponentOffsets(cIndex);
        }
        return this;
    }

4.4 ByteBufUtil

工具類,提供靜態方法用於操做ByteBuf對象。

最有用的是對字符串進行編碼和解碼:

public static ByteBuf encodeString(ByteBufAllocator alloc, CharBuffer src, Charset charset): 對字符串進行編碼,使用指定的ByteBufAllocator生成一個新的ByteBuf。

還有方法是hexDump,將ByteBuf內容以16進制的字符串打印出來。

相關文章
相關標籤/搜索