在進行數據傳輸的過程當中,咱們常常會用到緩衝區。
在Java NIO 爲咱們提供了原生的七種緩衝區實現,對應着Java 的七種基本類型。通常使用ByteBuffer較多。原生的Buffer雖然能知足咱們的平常使用,可是要進行復雜的應用的時候,確有點力不從心了,原生Buffer存在着如下缺點。所以Netty對其進行了封裝,提供了更爲友好的接口供咱們使用。java
Buffer只有一個位置標誌位屬性Position,咱們只能flip或者rewind方法來對position進行修改來處理數據的存取位置,一不當心就可能會致使錯誤。
Buffer只提供了存取、翻轉、釋放、標誌、比較、批量移動等緩衝區的基本操做,咱們想使用高級的功能,就得本身手動進行封裝及維護,使用很是不方便。
ByteBuf也是經過字節數組做爲緩衝區來存取數據,經過外觀模式聚合了JDK NIO元素的ByteBuffer,進行封裝。
ByteBuf是經過readerIndex跟writerIndex兩個位置指針來協助緩衝區的讀寫操做的。
在對象初始化的時候,readerIndex和writerIndex的值爲0,隨着讀操做和寫操做的進行,writerIndex和readerIndex都會增長,不過readerIndex不能超過writerIndex,在進行讀取操做以後,0到readerIndex之間的空間會被視爲discard,調用ByteBuf的discardReadBytes方法,能夠對這部分空間進行釋放重用,相似於ByteBuffer的compact操做,對緩衝區進行壓縮。readerIndex到writerIndex的空間,至關於ByteBuffer的position到limit的空間,能夠對其進行讀取,WriterIndex到capacity的空間,則至關於ByteBuffer的limit到capacity的空間,是能夠繼續寫入的。
readerIndex跟writerIndex讓讀寫操做的位置指針分離,不須要對同一個位置指針進行調整,簡化了緩衝區的讀寫操做。
一樣,ByteBuf對讀寫操做進行了封裝,提供了動態擴展的能力,當咱們對緩衝區進行寫操做的時候,須要對剩餘的可用空間進行校驗,若是可用空間不足,同時要寫入的字節數小於可寫的最大字節數,會對緩衝區進行動態擴展,它會從新建立一個緩衝區,而後將之前的數據複製到新建立的緩衝區中,數組
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; } protected final void checkReadableBytes(int minimumReadableBytes) { ensureAccessible(); if (minimumReadableBytes < 0) { throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)"); } if (readerIndex > writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format( "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", readerIndex, minimumReadableBytes, writerIndex, this)); } }
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; } public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } if (minWritableBytes <= writableBytes()) { return this; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes); // Adjust to the new capacity. capacity(newCapacity); return this; } private int calculateNewCapacity(int minNewCapacity) { final int maxCapacity = this.maxCapacity; final int threshold = 1048576 * 4; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); } //UnpooledHeapByteBuf的capacity實現 public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
public ByteBuf clear() { readerIndex = writerIndex = 0; return this; }
索引操做安全
緩衝區重用
能夠經過discardReadByte方法去重用已經讀取過的緩衝區。
首先對readerIndex進行判斷:app
public ByteBuf discardReadBytes() { ensureAccessible(); if (readerIndex == 0) { return this; } if (readerIndex != writerIndex) { setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; } else { adjustMarkers(readerIndex); writerIndex = readerIndex = 0; } return this; } protected final void adjustMarkers(int decrement) { int markedReaderIndex = this.markedReaderIndex; if (markedReaderIndex <= decrement) { this.markedReaderIndex = 0; int markedWriterIndex = this.markedWriterIndex; if (markedWriterIndex <= decrement) { this.markedWriterIndex = 0; } else { this.markedWriterIndex = markedWriterIndex - decrement; } } else { this.markedReaderIndex = markedReaderIndex - decrement; markedWriterIndex -= decrement; } }
當咱們須要跳過某些不須要的字節的時候,能夠調用skipBytes方法來跳過指定長度的字節來讀取後面的數據。
首先對跳躍長度進行判斷,若是跳躍長度小於0的話,會拋出IllegalArgumentException異常,或者跳躍長度大於當前緩衝區可讀長度的話,會拋出IndexOutOfBoundsException異常。若是校驗經過,新的readerindex爲原readerIndex+length,若是新的readerIndex大於writerIndex的話,會拋出IndexOutOfBoundsException異常,不然就更新readerIndex。函數
public ByteBuf skipBytes(int length) { checkReadableBytes(length); int newReaderIndex = readerIndex + length; if (newReaderIndex > writerIndex) { throw new IndexOutOfBoundsException(String.format( "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))", length, readerIndex, writerIndex)); } readerIndex = newReaderIndex; return this; }
AbstractReferenceCountedByteBuf是ByteBuf實現對引用進行計數的基類,用來跟蹤對象的分配和銷燬,實現自動內存回收。工具
public ByteBuf retain() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, 1); } if (refCnt == Integer.MAX_VALUE) { throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) { break; } } return this; } public final boolean release() { for (;;) { int refCnt = this.refCnt; if (refCnt == 0) { throw new IllegalReferenceCountException(0, -1); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { if (refCnt == 1) { deallocate(); return true; } return false; } } }
UnpooledHeapByteBuf是一個非線程池實現的在堆內存進行內存分配的字節緩衝區,在每次IO操做的都會去建立一個UnpooledHeapByteBuf對象,若是頻繁地對內存進行分配或者釋放會對性能形成影響。源碼分析
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) { checkIndex(index, length); if (srcIndex < 0 || srcIndex > srcCapacity - length) { throw new IndexOutOfBoundsException(String.format( "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity)); } }
public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); }
在Netty4以後加入內存池管理,經過內存池管理比以前ByteBuf的建立性能獲得了極大提升。性能
PoolChunk主要負責內存塊的分配及釋放,chunk中的page會構建成一顆二叉樹,默認狀況下page的大小是8K,chunk的大小是2^11 page,即16M,構成了11層的二叉樹,最下面一層的葉子節點有8192個,與page的數目同樣,每一次內存的分配必須保證連續性,方便內存操做。每一個節點會記錄本身在Memory Area的偏移地址,當一個節點表示的內存區域被分配以後,那麼該節點會被標誌爲已分配,該節點的全部子節點的內存請求都會忽略。每次內存分配的都是8k(2^n)大小的內存塊,當須要分配大小爲chunkSize/(2^k)的內存端時,爲了找到可用的內存段,會從第K層左邊開始尋找可用節點。this
在內存分配中,爲了可以集中管理內存的分配及釋放,同時提供分配和釋放內存的性能,通常都是會先預先分配一大塊連續的內存,不須要重複頻繁地進行內存操做,那一大塊連續的內存就叫作memory Arena,而PoolArena是Netty的內存池實現類。
在Netty中,PoolArena是由多個Chunk組成的,而每一個Chunk則由多個Page組成。PoolArena是由Chunk和Page共同組織和管理的。線程
當對於小於一個Page的內存分配的時候,每一個Page會被劃分爲大小相等的內存塊,它的大小是根據第一次申請內存分配的內存塊大小來決定的。一個Page只能分配與第一次內存內存的內存塊的大小相等的內存塊,若是想要想要申請大小不想等的內存塊,只能在新的Page上申請內存分配了。
Page中的存儲區域的使用狀況是經過一個long數組bitmap來維護的,每一位表示一個區域的佔用狀況。
static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; } final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1); setIndex0(0, 0); discardMarks(); }
public ByteBuf copy(int index, int length) { checkIndex(index, length); ByteBuf copy = alloc().directBuffer(length, maxCapacity()); copy.writeBytes(this, index, length); return copy; } public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } // PooledByteBufAllocator protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); } //UnpooledByteBufAllocator protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
ByteBufHolder是ByteBuf的一個容器,它能夠更方便地訪問ByteBuf中的數據,在使用不一樣的協議進行數據傳輸的時候,不一樣的協議消息體包含的數據格式和字段不同,因此抽象一個ByteBufHolder對ByteBuf進行包裝,不一樣的子類有不一樣的實現,使用者能夠根據本身的須要進行實現。Netty提供了一個默認實現DefaultByteBufHolder。
ByteBufAllocator是字節緩衝區分配器,根據Netty字節緩衝區的實現不一樣,分爲兩種不一樣的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不一樣ByteBuf的分配方法。
CompositeByteBuf是一個虛擬的Buffer,它能夠將多個ByteBuf組裝爲一個ByteBuf視圖。
在Java NIO中,咱們有兩種實現的方法
在Netty中,CompositeByByteBuf中維護了一個Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護在集合中的位置偏移量等信息。通常狀況下,咱們應該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來建立CompositeByteBuf,而不是直接經過構造函數去實例化一個CompositeByteBuf對象。
private int addComponent0(int cIndex, ByteBuf buffer) { checkComponentIndex(cIndex); if (buffer == null) { throw new NullPointerException("buffer"); } int readableBytes = buffer.readableBytes(); // No need to consolidate - just add a component to the list. Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice()); if (cIndex == components.size()) { components.add(c); if (cIndex == 0) { c.endOffset = readableBytes; } else { Component prev = components.get(cIndex - 1); c.offset = prev.endOffset; c.endOffset = c.offset + readableBytes; } } else { components.add(cIndex, c); if (readableBytes != 0) { updateComponentOffsets(cIndex); } } return cIndex; } private void consolidateIfNeeded() { final int numComponents = components.size(); if (numComponents > maxNumComponents) { final int capacity = components.get(numComponents - 1).endOffset; ByteBuf consolidated = allocBuffer(capacity); for (int i = 0; i < numComponents; i ++) { Component c = components.get(i); ByteBuf b = c.buf; consolidated.writeBytes(b); c.freeIfNecessary(); } Component c = new Component(consolidated); c.endOffset = c.length; components.clear(); components.add(c); } } public CompositeByteBuf removeComponent(int cIndex) { checkComponentIndex(cIndex); Component comp = components.remove(cIndex); comp.freeIfNecessary(); if (comp.length > 0) { updateComponentOffsets(cIndex); } return this; } private static final class Component { final ByteBuf buf; final int length; int offset; int endOffset; Component(ByteBuf buf) { this.buf = buf; length = buf.readableBytes(); } void freeIfNecessary() { buf.release(); // We should not get a NPE here. If so, it must be a bug. } }
ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態方法來操做ByteBuf。