咱們知道,Java NIO的ByteBuffer只有一個position指針標識位置,讀寫切換時須要調用flip()方法,這樣容易出錯。而Netty爲了解決這個問題,使用了兩個指針readerIndex、writerIndex。固然,Netty的ByteBuf的功能不只僅如此,讓咱們一塊兒看看Netty的ButeBuf設計。後端
1、ByteBuf設計原理
1. 讀寫指針設計
ByteBuf經過兩個指針協助緩衝區的讀寫操做,讀操做用readerIndex,寫操做用writerIndex。數組
readerIndex和writerIndex初始值都是0,隨着寫入writerIndex會增長,一樣的,隨着讀取readerIndex會增長,可是readerIndex不能大於writerInder。併發
當讀取了必定數據後,0 ~ readerIndex之間就被視爲discard的,調用discardReadByte 方法,能夠釋放這段空間,令readerIndex = 0,writerIndex = writerIndex - discard。app
這樣的設計致使讀寫互不干擾,讀寫切換再也不須要調整位置指針,極大的簡化了緩衝區讀寫操做。性能
2. 擴容設計
咱們知道,Java NIO的ByteBuffer底層是數組,它自己並不提供擴容操做,若是緩衝區剩餘可寫空間不足,就會發生BufferOverflowExeption。this
public ByteBuffer put(byte[] src, int offset, int length) { checkBounds(offset, length, src.length); // 全部的put操做都有這個校驗,可用空間不足直接拋異常 if (length > remaining()) throw new BufferOverflowException(); int end = offset + length; for (int i = offset; i < end; i++) this.put(src[i]); return this; } public final int remaining() { return limit - position; }
爲了不這個問題,須要在put操做時先對剩餘可用空間校驗,若是剩餘空間不足,須要本身建立新的ByteBuffer,並將以前的ByteBuffer copy過來,這樣對使用者很不友善。而Netty的ByteBuffer對write操做進行了封裝,由Netty作緩衝區剩餘空間校驗,若是可用緩衝區不足,ByteBuf會自動進行動態擴展。spa
2、ByteBuf主要繼承關係
從內存分配角度來看,ByteBuf分爲兩類:設計
1)堆內存字節緩衝區:在堆內存中分配,能夠被JVM自動回收,可是在進行Socket的IO讀寫時,須要多作一次內存複製(堆內存中的緩衝區複製到內核Channel中),性能會有所降低。指針
2)直接內存字節緩衝區:在堆外進行內存分配,須要本身分配內存及回收,可是它在寫入或從Socket Channel中讀取時,少一次內存複製,性能比堆內存字節緩衝區要高。code
tips:在IO通信的讀寫緩衝區使用直接內存字節緩衝區,在後端業務消息編解碼模塊中使用堆內存字節緩衝區,可使性能達到最優。
從內存回收的角度來看,ByteBuf也分爲兩類:
1)基於對象池的ByteBuf,它的特色就是能夠重用ByteBuf。基於對象池的ByteBuf本身維護了一個內存池,能夠循環利用建立的ByteBuf,提高內存使用效率。
2)普通的ByteBuf,不能重用,每次都要新建立一個ByteBuf。
3、AbstractByteBuf
1. 成員變量
// 用於檢測對象是否泄漏 static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class); // 讀操作 和 寫操做 的位置指針 int readerIndex; private int writerIndex; // 讀操做 和 寫操做 的標記,能夠經過reset()回到標記的地方 private int markedReaderIndex; private int markedWriterIndex; // 最大容量 private int maxCapacity; // private SwappedByteBuf swappedBuf;
2. 讀操做
首先檢查入參,若是要讀的數據長度小於0,說明參數傳錯了,拋IllegalArgumentException異常;若是要讀的數據長度大於可讀數據長度,拋IndexOutOfBoundsException。
校驗經過後,進行讀操做,這個由子類實現。
最後,調整readerIndex指針。
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); // 由子類實現 getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; } protected final void checkReadableBytes(int minimumReadableBytes) { ensureAccessible(); if (minimumReadableBytes < 0) { throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)"); } if (readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, minimumReadableBytes, writerIndex, this)); } }
3. 寫操做
首先作數據校驗及緩衝區可用性校驗。
若是可用緩衝區容量不足以放下整個byte數組,則須要擴容。擴容時須要先計算新的ByteBuf容量並建立,而後將老的ByteBuf複製到新的ByteBuf中。
最後纔是寫操做。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { // 數據校驗及擴容 ensureWritable(length); // 由子類實現 setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; } public ByteBuf ensureWritable(int minWritableBytes) { // 校驗入參 if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } // 若是可寫長度大於待寫長度,直接寫便可 if (minWritableBytes <= writableBytes()) { return this; } // 若是待寫長度大於最多可寫長度(擴容也無法知足),直接拋異常 if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // 計算新的ByteBuf的容量(找一個合適的新容量) int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // 重建新的緩衝區,並將老的緩衝區的數據copy過去,交給子類實現 capacity(newCapacity); return this; } private int calculateNewCapacity(int minNewCapacity) { final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 步長4MB if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold;//讓它成爲4MB的整數倍 if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // 比4MB小,採用64K步進的方式擴容,避免內存浪費 int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
4. 釋放已讀緩衝區
前面說到,0 ~ readerIndex之間的數據已讀取過,這一段被視爲discard的,咱們能夠調用discardReadBytes()方法,釋放這部分緩衝區,達到緩衝區重用的目的。
可是,discardReadBytes()方法原理是數組拷貝,在執行這個方法時你須要先判斷是否值得這樣作。
public ByteBuf discardReadBytes() { ensureAccessible(); // 若是等於0,說明沒有可釋放緩衝區 if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) { // 子類實現 setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; // 同時調整markReaderIndex和markWriterIndex adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; } protected final void adjustMarkers(int decrement) { int markedReaderIndex = this.markedReaderIndex; if (markedReaderIndex <= decrement) { this.markedReaderIndex = 0; int markedWriterIndex = this.markedWriterIndex; if (markedWriterIndex <= decrement) { this.markedWriterIndex = 0; } else { this.markedWriterIndex = markedWriterIndex - decrement; } } else { this.markedReaderIndex = markedReaderIndex - decrement; markedWriterIndex -= decrement; } }
5. skipBytes(..)
在解碼的時候,有時須要丟棄非法數據包,獲取跳過不須要讀取的字節碼,此時可使用skipByte(..)方法,忽略指定長度的字節數組。
public ByteBuf skipBytes(int length) { // 校驗入參 checkReadableBytes(length); int newReaderIndex = readerIndex + length; if (newReaderIndex > writerIndex) { throw new IndexOutOfBoundsException(String.format( "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))", length, readerIndex, writerIndex)); } readerIndex = newReaderIndex; return this; }
4、AbstractReferenceCountedByteBuf
從名字能夠看出,這個類的做用主要是對ButeBuf引用進行計數,用於跟蹤對象的分配及銷燬。
1. 成員變量
// 併發包中的類,對 AbstractReferenceCountedByteBuf 中的 refCnt,進行原子化操做 private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); // refCnt的偏移量,也就是 refCnt 在AbstractReferenceCountedByteBuf中的內存地址 private static final long REFCNT_FIELD_OFFSET; static { long refCntFieldOffset = -1; try { if (PlatformDependent.hasUnsafe()) { refCntFieldOffset = PlatformDependent.objectFieldOffset( AbstractReferenceCountedByteBuf.class.getDeclaredField("refCnt")); } } catch (Throwable t) { // Ignored } REFCNT_FIELD_OFFSET = refCntFieldOffset; } //對象引用次數 @SuppressWarnings("FieldMayBeFinal") private volatile int refCnt = 1;
2. 對象引用計數器
每調用一次 retain() 方法,引用計數器就加1。
因爲 refCnt 初始值爲1,每次申請加1,釋放減1,當申請數等於釋放數時,對象被回收,故 refCnt 不可能爲0。若是爲0,說明對象被錯誤、意外的引用了,拋出異常。
若是引用計數器達到整形最大值,則直接拋異常,除非是惡意破壞,不然不會出現這種狀況吧。
最後就是對 refCnt 作原子性的 cas 操做。
public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, 1); } if (refCnt == Integer.MAX_VALUE) { throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) { break; } } return this; }
有增必有減,咱們看一下 refCnt 減小的代碼。
當對象被釋放時,refCnt 減1,當減到 1 時,說明申請數等於釋放數,須要將該對象回收掉。
public final boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { // 垃圾回收 deallocate(); return true; } return false; } } }
5、UnpooledHeapByteBuf
前面的ByteBuf分類描述,咱們能夠判斷,UnpooledHeapByteBuf 就是普通的堆內存ByteBuf,沒有內存池,沒有堆外內存,使用起來更不容易出現內存管理方面的問題。
1. 成員變量
// 用於UnpooledHeapByteBuf內存分配 private final ByteBufAllocator alloc; // 緩衝區 private byte[] array; // Java NIO的ByteBuffer,用於Netty的ByteBuf到NIO的ByteBuffer轉換 private ByteBuffer tmpNioBuf;
2. 動態擴展緩衝區
在 AbstractByteBuf 中咱們提到,動態擴展緩衝區的操做是交給子類完成的,這裏咱們看一下 UnpooledHeapByteBuf 是怎麼作的。
首先對新的byte數組作校驗,而後進行ByteBuf重建。
這裏分爲三種狀況,
1)newCapacity > oldCapacity,直接建立一個新數組,拷貝過去就好了
2)newCapacity == oldCapacity,不作處理
3)newCapacity < oldCapacity,先判斷readerIndex,若是readerIndex大於等於newCapacity,說明沒有數據須要複製到緩衝區,直接設置readerIndex和writerIndex的值爲newCapacity便可;當readerIndex小於newCapacity時,readerIndex到writerIndex之間的數據須要複製到新的byte數組,這個時候,若是writerIndex - readerIndex > newCapacity,就會發生數組下標越界,爲了防止越界,當writerIndex > newCapacity時,令writerIndex = newCapacity,而後作 byte 數組賦值操做。最後,替換掉ByteBuf中持有的 byte數組引用,並令NIO 的 ByteBuffer爲 null。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); // 1. 對入參作合法性校驗 if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { // 2. byte數組copy,而後替換掉原來的byte數組 byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { // 若是新容量小於老容量,則不須要動態擴展,可是須要截取當前緩衝區建立一個新的子緩衝區 byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { // 若是writerIndex大於newCapacity,則有可能發生越界,這裏直接截斷 writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { // 若是readerIndex大於等於新的capacity,說明沒有數據須要複製到新緩衝區,直接將readerIndex和writerIndex設置爲newCapacity便可 setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; } private void setArray(byte[] initialArray) { array = initialArray; tmpNioBuf = null; }
3. 字節數組複製
在AbstractByteBuf中的讀寫操做中,具體的讀寫操做由子類實現,咱們來看一下 UnpooledHeapByteBuf 是怎麼作的。
在寫操做中,首先檢查入參,而後將數據 copy 至 ByteBuf 的 byte 數組中。
在讀操做中,也是先檢查入參,而後將 ByteBuf 的 byte 數組 copy到指定的byte數組裏面。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { // 根據AbstractByteBuf的寫操做可知,index爲writerIndex checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) { checkIndex(index, length); if (srcIndex < 0 || srcIndex > srcCapacity - length) { throw new IndexOutOfBoundsException(String.format( "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity)); } } protected final void checkIndex(int index, int fieldLength) { ensureAccessible(); if (fieldLength < 0) { throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)"); } // writerIndex + length > capacity,數組下表越界 if (index < 0 || index > capacity() - fieldLength) { throw new IndexOutOfBoundsException(String.format( "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); } }
// 讀操做時,將字節數組copy出去
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; }
4. Netty 的 ByteBuf 轉換爲 NIO 的 ByteNuffer
利用byte數組建立一個新的ByteBuffer,並調用slice方法,清除 discard 區域。
public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); // slice():copy一個原來的position到limit之間的有效數據,建立一個新的ByteBuffer return ByteBuffer.wrap(array, index, length).slice(); } public static ByteBuffer wrap(byte[] array, int offset, int length) { try { return new HeapByteBuffer(array, offset, length); } catch (IllegalArgumentException x) { throw new IndexOutOfBoundsException(); } }
6、UnpooledDirectByteBuf
與UnpooledHeapByteBuf不一樣,UnpooledDIrectByteBuf是基於堆外內存建立的。
1. 成員變量
這裏跟 UnpooledHeapByteBuf 最大的不一樣就是,這裏使用 ByteBuffer 存儲數據,而 UnpooledHeapByteBuf 使用字節數組。另外一個不一樣就是,這裏的ByteBuffer使用的是NIO的DirectByteBuffer,須要本身手動釋放內存。
// ByteBuf內存分配 private final ByteBufAllocator alloc; // 這裏跟UnpooledHeapByteBuf不一樣,這裏使用的是NIO的ByteBuffer存儲字節數組 private ByteBuffer buffer; private ByteBuffer tmpNioBuf; private int capacity;
//用於標記ByteBuffer是否釋放了(這裏使用堆外內存建立ByteBuffer,須要本身作垃圾回收) private boolean doNotFree;
2. 動態擴展緩衝區
這裏的設計跟UnpooledHeapByteBuf是同樣的,不一樣的是這裏使用的是ByteBuffer而不是byte數組。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); // 1. 校驗粗人慘 if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int readerIndex = readerIndex(); int writerIndex = writerIndex(); int oldCapacity = capacity; if (newCapacity > oldCapacity) { // 這裏直接建立一個新的ByteBuffer,將老的ByteBuffer數據copy過去 ByteBuffer oldBuffer = buffer;
// 建立一個DirectByteBuffer ByteBuffer newBuffer = allocateDirect(newCapacity);
// 設置position和limit的值 oldBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.put(oldBuffer); newBuffer.clear(); // 替換老的ByteBuffer並釋放掉老的ByteBuffer setByteBuffer(newBuffer); } else if (newCapacity < oldCapacity) { // 這裏跟UnpooledHeapByteBuf處理是同樣的,詳細看UnpooledHeapByteBuf ByteBuffer oldBuffer = buffer; ByteBuffer newBuffer = allocateDirect(newCapacity); if (readerIndex < newCapacity) { if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } oldBuffer.position(readerIndex).limit(writerIndex); newBuffer.position(readerIndex).limit(writerIndex); newBuffer.put(oldBuffer); newBuffer.clear(); } else { setIndex(newCapacity, newCapacity); } setByteBuffer(newBuffer); } return this; } // 建立DirectByteBuffer protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { // 釋放oldByteBuffer freeDirect(oldBuffer); } } this.buffer = buffer; tmpNioBuf = null; capacity = buffer.remaining(); }
3. 字節數組複製
咱們先看寫操做的setBytes()方法,一樣的,先進行參數校驗,而後建立一個臨時的ByteBuffer,這個ByteBuffer與 buffer 的 content 共用,往 tmpBuf 中寫數據至關於往 buffer 中寫數據。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { // 參數校驗 checkSrcIndex(index, length, srcIndex, src.length); // 建立一個臨時的tmpBuf ByteBuffer tmpBuf = internalNioBuffer(); tmpBuf.clear().position(index).limit(index + length); tmpBuf.put(src, srcIndex, length); return this; } private ByteBuffer internalNioBuffer() { ByteBuffer tmpNioBuf = this.tmpNioBuf; if (tmpNioBuf == null) { // 令tempNioBuf和buffer共用同一個ByteBuffer內容,修改了tmpNioByteBuf,也等同於修改了buffer // 可是它們的position、limit都是獨立的 this.tmpNioBuf = tmpNioBuf = buffer.duplicate(); } return tmpNioBuf; }
而後再來看讀操做的getBytes()方法,一樣的,先檢查入參,而後建立出一個臨時的 ByteBuffer,由這個臨時的 ByteBuffer 作讀操做。
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length, true); readerIndex += length; return this; } private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) { checkDstIndex(index, length, dstIndex, dst.length); if (dstIndex < 0 || dstIndex > dst.length - length) { throw new IndexOutOfBoundsException(String.format( "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length)); } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = buffer.duplicate(); } tmpBuf.clear().position(index).limit(index + length); tmpBuf.get(dst, dstIndex, length); }
4. Netty 的 ByteBuf 轉換爲 NIO 的 ByteNuffer
這裏直接拿buufer的content建立一個新的ByteBuffer。
public ByteBuffer nioBuffer(int index, int length) { return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice(); } public ByteBuffer duplicate() { return new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0); }
7、PooledDirectByteBuf
PooledDirectByteBuf基於內存池實現,與UnpooledDirectByteBuf惟一的不一樣就是緩衝區的分配和銷燬策略。
1. 建立字節緩衝區
因爲採用內存池實現,因此新建實例的時候不能使用 new 建立,而是從內存池中獲取,而後設置引用計數器的值。
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; }
2. 複製新的字節緩衝區
一樣的,複製新字節緩衝區時,也須要經過內存池建立一個字節緩衝區,而後執行復制操做。
public ByteBuf copy(int index, int length) { // 參數校驗 checkIndex(index, length); // 從內存池中建立一個ByteBuf ByteBuf copy = alloc().directBuffer(length, maxCapacity()); // 複製操做 copy.writeBytes(this, index, length); return copy; }
8、PooledHeapByteBuf
PooledHeapByteBuf 與 PooledDirectByteBuf 不一樣的地方在於建立對象時使用的是byte數組而不是ByteBuffer,這裏咱們就不在討論了。