Netty學習篇⑥--ByteBuf源碼分析

什麼是ByteBuf?html

ByteBuf在Netty中充當着很是重要的角色;它是在數據傳輸中負責裝載字節數據的一個容器;其內部結構和數組相似,初始化默認長度爲256,默認最大長度爲Integer.MAX_VALUE。

ByteBuf數據結構java

* <pre>
 *      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0      <=      readerIndex   <=   writerIndex    <=    capacity
 * </pre>

ByteBuf字節緩衝區主要由discardablereadablewritable三種類型的字節組成的;數組

ByteBuf字節緩衝區能夠操控readerIndexwriterIndex二個下標;這兩個下標都是單獨維護緩存

名詞 解釋 方法
discardable bytes 丟棄的字節;ByteBuf中已經讀取的字節 discardReadBytes();
readable bytes 剩餘的可讀的字節
writable bytes 已經寫入的字節
readerIndex 字節讀指針(數組下標) readerIndex()
writerIndex 字節寫指針(數組下標) writerIndex()

ByteBuf中主要的類-UML圖數據結構

avatar

ByteBuf怎麼建立的?ide

ByteBuf是經過Unpooled來進行建立;默認長度爲256,可自定義指定長度,最大長度爲Integer.MAX_VALUE;

ByteBuf建立的類型有哪幾種?工具

1. 基於內存管理分類源碼分析

類型 解釋 對應的字節緩衝區類
Pooled 池化;
簡單的理解就是pooled擁有一個pool池空間(poolArea),
凡是建立過的字節緩衝區都會被緩存進去,
有新的鏈接須要字節緩衝區會先從緩存中get,
取不到則在進行建立;
1.PooledDirectByteBuf
2.PooledHeapByteBuf
3.PooledUnsafeDirectByteBuf
4.PooledUnsafeHeapByteBuf
Unpooled 非池化;
每次都會建立一個字節緩衝區
1.UnpooledDirectByteBuf
2.UnpooledHeapByteBuf
3.UnpooledUnsafeDirectByteBuf
4.UnpooledUnsafeHeapByteBuf

優缺點:性能

  • 在頻繁的建立申請字節緩衝區的狀況下,池化要比非池化要好不少,池化減小了內存的建立和銷燬,重複使用
  • 在非頻繁的狀況下,非池化的性能要高於池化,不須要管理維護對象池,因此在不須要大量使用ByteBuf的狀況下推薦使用非池化來建立字節緩衝區

2. 基於內存分類this

類型 解釋 特色 構造方法
heapBuffer(經常使用) 堆字節緩衝區; 底層就是JVM的堆內存,只是IO讀寫須要從堆內存拷貝到內核中(相似以前學過的IO多路複用) buffer(128)
directBuffer(經常使用) 直接內存字節緩衝區; 直接存於操做系統內核空間(堆外內存) directBuffer(256)

優缺點:

  • heapBuffer是在JVM的堆內存中分配一個空間,使用完畢後經過JVM回收機制進行回收,可是數據傳輸到Channel中須要從堆內存中拷貝到系統內核中
  • directBuffer直接在堆外,系統內核中開闢一個空間,在數據傳輸上要比heapBuffer高(減小了內存拷貝),可是因爲不受JVM管理在建立和回收上要比heapBuffer更加耗時耗能;

每一種都有本身優點的地方,咱們要根據實際的業務來靈活的運用;若是涉及到大量的文件操做建議使用directBuffer(搬來搬去確實挺耗性能);大部分業務仍是推薦使用heapBuffer(heapBuffer,普通的業務搬來搬去相比在內核申請一塊內存和釋放內存來講要更加優)。

ByteBuf是怎麼樣回收的

1. heapBuffer

heapBuffer是基於堆內存來進行建立的,回收天然而然經過JVM的回收機制進行回收

2. directBuffer回收內存的方法

能夠經過DirectByteBuffer中的Cleaner來進行清除
或者依靠unsafe的釋放內存(freeMemory方法)也能夠進行回收

源碼分析

ByteBuf內存分配

ByteBuf的內存分配主要分爲heap(堆內存)和direct(堆外內存);

1. heap堆內存的分配:經過UnpooledByteBufAllocator類進行內存分配

1.1 建立堆緩衝區

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // 是否支持unsafe
    return PlatformDependent.hasUnsafe() ?
        // 建立unsafe非池化堆字節緩衝區
        new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)           :
        // 建立非池化堆字節緩衝區
        new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

1.2 unsafe和非unsafe都是經過實例 UnpooledHeapByteBuf 來分配內存

// InstrumentedUnpooledUnsafeHeapByteBuf/InstrumentedUnpooledHeapByteBuf
// 最終都是經過這個方法來建立分配內存;後面會講講unsafe和普通的非unsafe的區別
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // 設置最大容量
    super(maxCapacity);
    // 檢查內存分配類是否爲空
    checkNotNull(alloc, "alloc");

    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
            "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    // allocateArray初始化一個initialCapacity長度的字節數組
    setArray(allocateArray(initialCapacity));
    // 初始化讀寫索引爲0
    setIndex(0, 0);
}

// 初始化一個initialCapacity長度的字節數組
byte[] allocateArray(int initialCapacity) {
    return new byte[initialCapacity];
}

// 初始化讀寫索引爲0
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
        throw new IndexOutOfBoundsException(String.format(
            "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
            readerIndex, writerIndex, capacity()));
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

final void setIndex0(int readerIndex, int writerIndex) {
    this.readerIndex = readerIndex;
    this.writerIndex = writerIndex;
}

從源碼能夠得知,堆內存ByteBuf經過判斷系統環境是否支持unsafe來判斷是建立UnsafeHeapByteBuf仍是heapByteBuf; 若是支持unsafe則返回 InstrumentedUnpooledUnsafeHeapByteBuf 實例,反之則返回 InstrumentedUnpooledHeapByteBuf實例;但它們都是分配一個byte數組來進行存儲字節數據。

1.3 unsafe和非unsafe建立的ByteBuf有什麼區別呢

unsafe和非unsafe建立的heapByteBuf區別在於獲取數據;非unsafe獲取數據直接是經過數組索引來進行獲取的;而unsafe獲取數據則是經過UNSAFE操控內存來獲取;咱們能夠經過源碼來看看

heapByteBuf獲取數據

@Override
public byte getByte(int index) {
    ensureAccessible();
    return _getByte(index);
}

@Override
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(array, index);
}

// 直接返回數組對應索引的值
static byte getByte(byte[] memory, int index) {
    return memory[index];
}

unsafeHeapByteBuf獲取數據

@Override
public byte getByte(int index) {
    checkIndex(index);
    return _getByte(index);
}

@Override
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(array, index);
}

static byte getByte(byte[] array, int index) {
    return PlatformDependent.getByte(array, index);
}

public static byte getByte(byte[] data, int index) {
    return PlatformDependent0.getByte(data, index);
}

static byte getByte(byte[] data, int index) {
    // 經過unsafe來獲取
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}

2. direct內存分配:unsafe建立和非unsafe建立

2.1 建立directBuffer

// PlatformDependent檢測運行環境的變量屬性,好比java環境,unsafe是否支持等
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    // 支持unsafe
    if (PlatformDependent.hasUnsafe()) {
        // 運行環境是否使用不清空的direct內存
        buf = PlatformDependent.useDirectBufferNoCleaner() ?
            new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
        new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        // 建立非unsafe實例ByteBuf
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

2.2 unsafe返回 UnpooledUnsafeDirectByteBuf 實例,非unsafe返回 UnpooledDirectByteBuf實例

// 建立unsafe direct字節緩衝區
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    
    // 設置最大容量
    super(maxCapacity);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (initialCapacity < 0) {
        throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
    }
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
    }
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
            "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    // allocateDirect建立DirectByteBuffer(java nio)分配內存
    setByteBuffer(allocateDirect(initialCapacity), false);
}

// 非unsafe建立direct內存
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (initialCapacity < 0) {
        throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
    }
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
    }
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
            "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}

2.3 分配 direct內存,返回 java nio ByteBuffer實例

protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

/**
* Allocates a new direct byte buffer. 分配一個新的直接內存字節緩衝區
*
* <p> The new buffer's position will be zero, its limit will be its
* capacity, its mark will be undefined, and each of its elements will be
* initialized to zero.  Whether or not it has a
* {@link #hasArray backing array} is unspecified.
*
* @param  capacity
*         The new buffer's capacity, in bytes
*
* @return  The new byte buffer
*
* @throws  IllegalArgumentException
*          If the <tt>capacity</tt> is a negative integer
*/
public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

// allocateDirect建立DirectByteBuffer(java nio)
DirectByteBuffer(int cap) {                   // package-private
    // 設置文件描述, 位置等信息
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        // 經過unsafe類來分配內存
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    // 實例化cleaner,用於後續回收
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

2.4 設置ByteBuffer的屬性(unsafe和非unsafe)

unsafe設置ByteBuffer

/**
* buffer 數據字節
* tryFree 嘗試釋放,默認爲false
*/
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    if (tryFree) {
        // 全局buffer設置成舊buffer
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                // 釋放舊緩衝區的內存
                freeDirect(oldBuffer);
            }
        }
    }
    // 將當前傳入的buffer設置成全局buffer
    this.buffer = buffer;
    // 記錄內存地址
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    // 將臨時buff設置爲null
    tmpNioBuf = null;
    // 記錄容量大小
    capacity = buffer.remaining();
}

// 獲取對象在內存中的地址
static long directBufferAddress(ByteBuffer buffer) {
    return getLong(buffer, ADDRESS_FIELD_OFFSET);
}

// 經過unsafe操控系統內存,獲取對象在內存中的地址
private static long getLong(Object object, long fieldOffset) {
    return UNSAFE.getLong(object, fieldOffset);
}

非unsafe設置ByteBuffer

// 非unsafe設置屬性
 private void setByteBuffer(ByteBuffer buffer) {
     ByteBuffer oldBuffer = this.buffer;
     if (oldBuffer != null) {
         if (doNotFree) {
             doNotFree = false;
         } else {
             freeDirect(oldBuffer);
         }
     }

     this.buffer = buffer;
     tmpNioBuf = null;
     capacity = buffer.remaining();
 }

根據源碼,direct經過判斷運行系統環境是否使用useDirectBufferNoCleaner來實例不一樣的ByteBufferedReader(unsafe和非unsafe),可是他們最終都是經過ByteBuffer來分配內存,底層都是經過在不一樣的ByteBuf實例中構建一個ByteBuffer來進行存儲字節數據的(具體能夠看看UnpooledDirectByteBuf的set方法)

2.5 unsafe和非unsafe建立的directByteBuf的區別

unsafe獲取數據:UNSAFE經過索引的內存地址來獲取對應的值

@Override
protected byte _getByte(int index) {
    // UnsafeByteBufUtil unsafe工具類獲取
    return UnsafeByteBufUtil.getByte(addr(index));
}

// addr 獲取索引的內存地址
long addr(int index) {
    return memoryAddress + index;
}

static byte getByte(long address) {
    return PlatformDependent.getByte(address);
}

public static byte getByte(long address) {
    return PlatformDependent0.getByte(address);
}

// 經過UNSAFE獲取內存地址的值
static byte getByte(long address) {
    return UNSAFE.getByte(address);
}

非unsafe獲取數據: 直接經過對應索引的ByteBuffer獲取值

@Override
public byte getByte(int index) {
    // 檢查受權,及ByteBuffer對象是否還有引用
    ensureAccessible();
    return _getByte(index);
}

@Override
protected byte _getByte(int index) {
    // 經過索引獲取值
    return buffer.get(index);
}

3. ByteBuf擴容

每次咱們再往字節緩衝區中寫入數據的時候都會判斷當前容量是否還能寫入數據,當發現容量不夠時,此時ByteBuf會總動進行擴容;固然咱們也能夠手動更改ByteBuf的容量;詳細見代碼分析。
public static void main(String[] args) {
    // 利用非池化Unpooled類建立字節緩衝區
    ByteBuf byteBuf = Unpooled.buffer(2);

    System.out.println("initCapacity: " + byteBuf.capacity());

    byteBuf.writeByte(66);
    byteBuf.writeByte(67);
    byteBuf.readBytes(1);
    System.out.println("readerIndex: " + byteBuf.readerIndex());
    System.out.println("writerIndex: " + byteBuf.writerIndex());

    // 丟棄已經閱讀的字節
    byteBuf.discardReadBytes();
    byteBuf.writeByte(68);
    byteBuf.writeByte(69);
    System.out.println("readerIndex: " + byteBuf.readerIndex());
    System.out.println("writerIndex: " + byteBuf.writerIndex());
    System.out.println("capacity: " + byteBuf.capacity());
}

// 運行結果
initCapacity: 2
readerIndex: 1
writerIndex: 2
readerIndex: 0
writerIndex: 3
capacity: 64

上面代碼的操做步驟:初始化ByteBuf --- 寫入數據 --- 讀取數據 --- 丟棄數據 --- 再寫入數據;

丟棄了一個字節數的數據又寫入了2個字節數的數據,初始化容量的緩衝區明顯不夠發生了自動擴容,擴容後的容量:64;它是怎麼進行擴容的呢?何時擴容的呢?看下源碼

public ByteBuf writeByte(int value) {
    // 確保能夠寫入(判斷是否容量夠不夠寫入)
    ensureWritable0(1);
    // 設置寫索引、存值
    _setByte(writerIndex++, value);
    return this;
}

// minWritableBytes默認爲1 由於writeByte每次只能寫入一個字節數
final void ensureWritable0(int minWritableBytes) {
    // 檢查是否還有佔有權和是否還有引用
    ensureAccessible();
    // writableBytes() = capacity - writerIndex 剩餘可寫容量
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    // ByteBuf雖然支持自動擴容可是也有上限(Integer.MAX_VALUE)
    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
            "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
            writerIndex, minWritableBytes, maxCapacity, this));
    }

    // 開始進行擴容 newCapacity = writerIndex + minWritableBytes
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // 將新的容量寫入到ByteBuf
    capacity(newCapacity);
}

/**
* 計算新的容量
* minNewCapacity 寫入的最小容量
* maxCapacity 最大容量及Integer.MAX_VALUE 2147483647
*/
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
            "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
            minNewCapacity, maxCapacity));
    }
    // 4兆大小 4194304
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // 若是超過了4兆,
    if (minNewCapacity > threshold) {
        // 新的容量擴容爲超過的倍數的容量
        int newCapacity = minNewCapacity / threshold * threshold;
        // 若是超過了最大的容量則直接設置爲最大容量
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // 默認擴容大小爲64
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        // 左移一位 newCapacity = newCapacity*2
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

從上面的源碼可知,自動擴容在4兆的範圍內變化的話,每次擴容都是64 * 2的N字方(N >= 1); 一旦超過了4兆則遞增倍數爲(newCapacity / 4194304) * 4194304即表示的是基於4兆增加的倍數。

4. ByteBuf和ByteBuffer的區別

  • ByteBuf單獨維護讀寫兩個數組下標,ByteBuffer只有一個下標索引,讀寫的時候須要手動設置(調用flip和rewind)
  • ByteBuffer不支持動態擴容(final型),ByteBuf支持動態擴容上限爲Integer.MAX_VALUE
  • ByteBuf支持建立對外內存(direct內存)存儲數據
  • ByteBuf支持的Api更加豐富

雖然Netty中使用的ByteBuf來進行緩存字節數據,可是最後在Channel中仍是以ByteBuffer(java nio)來進行傳輸

參考文獻:

https://www.cnblogs.com/stateis0/p/9062152.html
https://www.jianshu.com/p/1585e32cf6b4
https://blog.csdn.net/ZBylant/article/details/83037421
相關文章
相關標籤/搜索