本文是永順大牛寫的系列教程《源碼之下無祕密 ── 作最好的 Netty 源碼分析教程》的續寫章節。本章主要介紹Netty中用來承接數據的ByteBuf的底層實現原理。java
永順前輩已寫完的章節有以下:算法
筆者嘗試續寫的章節:segmentfault
本文使用的netty版本爲4.1.33.Finalapi
咱們在《Java NIO 的前生今世 之三 NIO Buffer 詳解》以及《認識 Java NIO》已經詳細瞭解了NIO Buffer。這裏先回憶下NIO Buffer的一些特性:數組
Netty的ByteBuf的底層實現有些許相似,但相比ByteBuffer實現了很是多擴展,並摒棄了一些不足:緩存
咱們先來看看ByteBuf的類圖:
安全
ByteBuf以及其子類的命名很是規整,僅從名字上咱們就能夠將各個子類劃分爲如下幾類:bash
以上各類類型的都會在下文展開分析。服務器
相似NIO ByteBuffer,ByteBuf底層實現也是字節數組,也一樣由讀寫指針來控制讀寫位置。在ByteBuf的繼承類AbstractByteBuf中定義瞭如下讀寫指針字段:ide
// 當前讀指針 int readerIndex; // 當前寫指針 int writerIndex; // 暫存的讀指針 private int markedReaderIndex; // 暫存的寫指針 private int markedWriterIndex; // 最大容量 private int maxCapacity;
須要注意的事maxCapacity是對底層字節數組進行擴容的最大容量,並非當前容量capacity。經過這幾個指針,其實能夠將字節數組劃分爲如下幾部分:
有以下性質:
有如上性質,能夠推出如下ByteBuf的一些方法實現:
更多方法見下。
ByteBuf繼承了Comparable和RefrenceCounted,其中後者是支持引用計數的接口,它的核心方法包含兩個:
// 引用數加1 ReferenceCounted retain(); // 引用數減1,若是引用數減爲0,則釋放該對象。 // 若是該對象被釋放則返回true,不然返回false。 // 注意:子類實現實際上是減2,後文會提到。 boolean release();
再來看ByteBuf的核心方法:
// 1.返回當前容量 public abstract int capacity(); // 2.調整當前容量 public abstract ByteBuf capacity(int newCapacity); // 3.最大容量(capacity的最大上限) public abstract int maxCapacity();
注意 capacity() <= maxCapacity()。
// 讀寫指針相關方法 // 1.獲取當前讀指針 public abstract int readerIndex(); // 2.設置當前讀指針 public abstract ByteBuf readerIndex(int readerIndex); // 3.獲取當前寫指針 public abstract int writerIndex(); // 4.設置當前寫指針 public abstract ByteBuf writerIndex(int writerIndex); // 5.同時設置讀寫指針 public abstract ByteBuf setIndex(int readerIndex, int writerIndex); // 6.獲取可讀字節數(this.writerIndex - this.readerIndex) public abstract int readableBytes(); // 7.獲取可寫字節數(this.capacity - this.writerIndex) public abstract int writableBytes(); // 8.獲取最大可寫字節數 (this.maxCapacity - this.writerIndex)} public abstract int maxWritableBytes(); // 9.是否可讀(this.writerIndex - this.readerIndex) public abstract boolean isReadable(); // 10.是否可寫(this.capacity - this.writerIndex) public abstract boolean isWritable(); // 11.清空(至關於setIndex(0, 0)) public abstract ByteBuf clear(); // 12.記錄讀指針 public abstract ByteBuf markReaderIndex(); // 13.從記錄中恢復讀指針 public abstract ByteBuf resetReaderIndex(); // 14.記錄寫指針 public abstract ByteBuf markWriterIndex(); // 15.從記錄中恢復寫指針 public abstract ByteBuf resetWriterIndex(); // 16.丟棄已讀字節 public abstract ByteBuf discardReadBytes();
上述方法都是圍繞着readerIndex、writerIndex、capital、maxcapital等四個值衍生的方法。實現都很是相似而簡單。
// 隨機讀寫數據 // ... 這部分相似的方法很是多,如下只列舉一部分 ... // 1.從指定位置讀取數據 public abstract boolean getBoolean(int index); public abstract short getUnsignedByte(int index); public abstract short getShort(int index); public abstract int getUnsignedShort(int index); public abstract int getInt(int index); public abstract long getLong(int index); public abstract double getDouble(int index); public abstract short getShortLE(int index);(LE:Little Endian byte order,表示小端序,下同) public abstract int getIntLE(int index); public abstract long getLongLE(int index); // 略... // 2.在指定位置寫入數據 public abstract ByteBuf setBoolean(int index, boolean value); public abstract ByteBuf setByte(int index, int value); public abstract ByteBuf setShortLE(int index, int value); public abstract ByteBuf setInt(int index, int value); public abstract ByteBuf setIntLE(int index, int value); // 略...
上述方法支持指定位置的讀寫數據,其中讀數據並不會改變指針值。
// 1. 在readerIndex位置讀取數據並移動指針 public abstract boolean readBoolean(); public abstract byte readByte(); public abstract short readShort(); public abstract short readShortLE(); public abstract int readInt(); public abstract int readIntLE(); // 略... // 2. 在位置寫入數據並移動指針 public abstract ByteBuf writeBoolean(boolean value); public abstract ByteBuf writeByte(int value); public abstract ByteBuf writeShort(int value); public abstract ByteBuf writeShortLE(int value); public abstract ByteBuf writeInt(int value); public abstract ByteBuf writeIntLE(int value); // 略...
上述方法從讀(或寫)指針位置順序日後讀(或寫)數據,並移動讀(或寫)指針。
public abstract ByteBuf slice(); public abstract ByteBuf slice(int index, int length); public abstract ByteBuf duplicate(); public abstract ByteBuf retainedSlice(); // 更新引用計數 public abstract ByteBuf retainedDuplicate(); // 更新引用計數
ByteBuf支持分片獲取,實現快速的低成本淺複製。
// 判斷底層是否爲NIO direct buffer public abstract boolean isDirect();
上一節咱們提到了ByteBuf支持淺複製分片,其中分爲slice淺複製和duplicate淺複製。duplicate與slice的區別是,duplicate是對整個ByteBuf的淺複製,而slice只是對ByteBuf中的一部分進行淺複製。
ByteBuf的淺複製分片其實就是與原來的ByteBuf共享同一個存儲空間,而且也能夠被多個分片同時共享。以slice(int index, int length)方法爲例:
// io.netty.buffer.AbstractByteBuf.java @Override public ByteBuf slice(int index, int length) { ensureAccessible(); return new UnpooledSlicedByteBuf(this, index, length); }
slice方法內很是簡單,只是新建了一個分片對象UnpooledSlicedByteBuf,構造函數傳入了當前ByteBuf(this)、開始索引(index)以及分片長度(length);
在父類的構造行數裏,對該分片對象進行了初始化:
// 被分片的ByteBuf private final ByteBuf buffer; // 偏移量 private final int adjustment; AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) { super(length); checkSliceOutOfBounds(index, length, buffer); if (buffer instanceof AbstractUnpooledSlicedByteBuf) { // 若是傳入的是slice分片,則須要疊加其偏移量 this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer; adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index; } else if (buffer instanceof DuplicatedByteBuf) { // 若是傳入的是dulicated分片,不須要疊加(由於其偏移量爲0) this.buffer = buffer.unwrap(); adjustment = index; } else { this.buffer = buffer; adjustment = index; } // 初始化當前最大容量,對分片來講,最大容量不能超過length initLength(length); // 初始化寫指針 writerIndex(length); }
可見,slice分片僅僅是對原ByteBuf進行了一層封裝,並無發生任何內存複製行爲,因此是很是高效快捷的操做。
與slice相似,duplicate也是如此手法。惟一不一樣是,duplicate是對整個ByteBuf進行淺複製:
public DuplicatedByteBuf(ByteBuf buffer) { this(buffer, buffer.readerIndex(), buffer.writerIndex()); } DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) { super(buffer.maxCapacity()); if (buffer instanceof DuplicatedByteBuf) { this.buffer = ((DuplicatedByteBuf) buffer).buffer; } else if (buffer instanceof AbstractPooledDerivedByteBuf) { this.buffer = buffer.unwrap(); } else { this.buffer = buffer; } // 直接複用原ByteBuf的讀寫指針 setIndex(readerIndex, writerIndex); markReaderIndex(); markWriterIndex(); }
值得注意的是,不管是slice仍是duplicate,都沒有調用retain()方法來改變底層ByteBuf的引用計數。 因此,若是底層ByteBuf調用release()後被釋放,那麼全部基於該ByteBuf的淺複製對象都不能進行讀寫。因此要確保淺複製實例的使用安全,須要經過調用一次retain()方法來遞增底層ByteBuf的引用計數;而後在淺複製實例使用結束後,再調用一次release()來遞減底層ByteBuf的引用計數。
CompositeByteBuf也是一個很是典型的ByteBuf,用來將多個ByteBuf組合在一塊兒,造成一個邏輯上的ByteBuf。這點和分片ByteBuf很是相似,都屬於在邏輯層面上避免拷貝,實現所謂的「零複製」(Zero Copy)。
CompositeByteBuf在內部維護一個可擴容的components數組,全部被組合的ByteBuf被封裝爲Component對象,對象中緩存了該ByteBuf的偏移量adjustment、開始索引offset、結束索引endOffset等。
private Component[] components; // resized when needed private static final class Component { final ByteBuf buf; int adjustment; int offset; int endOffset; private ByteBuf slice; // cached slice, may be null }
對CompositeByteBuf的讀寫,須要先在components數組裏二分查找對應索引所在的Component對象,而後對Component對象所包裝的ByteBuf進行讀寫。以下:
@Override protected byte _getByte(int index) { // 肯定索引index所在的Component對象 Component c = findComponent0(index); // 對Component對象所包裝的ByteBuf進行讀寫 return c.buf.getByte(c.idx(index)); } private Component findComponent0(int offset) { // 先檢查最近訪問的Component是否知足條件 Component la = lastAccessed; if (la != null && offset >= la.offset && offset < la.endOffset) { return la; } // 不然二分查找 return findIt(offset); } // 二分查找 private Component findIt(int offset) { for (int low = 0, high = componentCount; low <= high;) { int mid = low + high >>> 1; Component c = components[mid]; if (offset >= c.endOffset) { low = mid + 1; } else if (offset < c.offset) { high = mid - 1; } else { lastAccessed = c; return c; } } throw new Error("should not reach here"); }
CompositeByteBuf的核心思想大體如上,其餘細節不做深究。
引用計數功能是在AbstractReferenceCountedByteBuf類中實現的。核心功能使用CAS原子操做和位運算實現。關鍵字段有兩個:
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); // even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0 @SuppressWarnings("unused") private volatile int refCnt = 2;
refCntUpdater是修改refCnt字段的原子更新器。而refCnt是存儲引用計數的字段。注意,當前ByteBuf的引用數爲 refCnt / 2,所以當refCnt等於1時,引用數爲0。
retain方法能夠增長ByteBuf的引用計數。核心代碼以下:
@Override public ByteBuf retain() { return retain0(1); } private ByteBuf retain0(final int increment) { // 將increment擴大兩倍爲adjustedIncrement int adjustedIncrement = increment << 1; // 此處容許溢出,由於後邊有判斷溢出的邏輯 // 將adjustedIncrement更新到refCnt,所以refCnt初始值爲2,因此恆爲偶數 int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement); // 若是oldRef不是偶數,直接拋異常 if ((oldRef & 1) != 0) { throw new IllegalReferenceCountException(0, increment); } // 若是oldRef 和 oldRef + adjustedIncrement 正負異號,則意味着已經溢出。 if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0) || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) { // 發生溢出須要回滾adjustedIncrement refCntUpdater.getAndAdd(this, -adjustedIncrement); // 而後拋異常 throw new IllegalReferenceCountException(realRefCnt(oldRef), increment); } return this; }
註釋已經講得很明白,這裏再補充下:每次調用retain(),都會嘗試給refCnt加2,因此確保了refCnt恆爲偶數,也就是說當前引用數爲refCnt / 2。這裏爲啥設計爲遞增2而不是遞增1,估計是位運算更加高效吧,並且實際應用中Integer.MAX_VALUE / 2的引用數也是綽綽有餘。
偏偏相反,release()操做每次減小引用計數2,以下:
@Override public boolean release() { return release0(1); } private boolean release0(int decrement) { int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement); if (decrement == realCnt) { // 若是decrement == realCnt,意味着須要釋放對象 if (refCntUpdater.compareAndSet(this, rawCnt, 1)) { deallocate(); return true; } return retryRelease0(decrement); } return releaseNonFinal0(decrement, rawCnt, realCnt); } private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) { if (decrement < realCnt // all changes to the raw count are 2x the "real" change && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) { return false; } // 上述更新失敗則調用重試方法 return retryRelease0(decrement); } private boolean retryRelease0(int decrement) { // 死循環不斷重試釋放引用 for (;;) { int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement); if (decrement == realCnt) { if (refCntUpdater.compareAndSet(this, rawCnt, 1)) { // 若是refCnt爲1,意味着實際的引用數爲1/2=0,因此須要釋放掉 deallocate(); return true; } } else if (decrement < realCnt) { // 若是當前引用數realCnt大於decrement,則能夠正常更新 if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) { return false; } } else { // 若是當前引用數realCnt小於decrement,則拋出引用異常 throw new IllegalReferenceCountException(realCnt, -decrement); } Thread.yield(); // this benefits throughput under high contention } } /** * Like {@link #realRefCnt(int)} but throws if refCnt == 0 */ private static int toLiveRealCnt(int rawCnt, int decrement) { if ((rawCnt & 1) == 0) { // 若是是偶數,則引用數爲rawCnt >>> 1 return rawCnt >>> 1; } // 若是是奇數,意味着該對象可能已經被釋放掉 throw new IllegalReferenceCountException(0, -decrement); }
release0算法流程:
retryRelease0算法流程:
咱們看到ByteBuf分爲兩類池化(Pooled)和非池化(Unpooled)。非池化的ByteBuf每次新建都會申請新的內存空間,而且用完即棄,給JVM的垃圾回收帶來負擔;而池化的ByteBuf經過內部棧來保存閒置的對象空間,每次新建ByteBuf的時候,優先向內部棧申請閒置的對象空間,而且用完以後從新歸還給內部棧,從而減小了JVM的垃圾回收壓力。
非池化的ByteBuf實現很是簡單粗暴。下邊分別以UnpooledHeapByteBuf和UnpooledDirectByteBuf爲例:
UnpooledHeapByteBuf在構造函數裏直接新建了一個字節數組來保存數據:
private final ByteBufAllocator alloc; // 使用字節數組保存數據 byte[] array; public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } // 分配字節數組 protected byte [] allocateArray(int initialCapacity) { return new byte[initialCapacity]; }
而UnpooledDirectByteBuf則在構造函數中直接新建了一個DirectBuffer:
// 使用DirectBuffer保存數據 private ByteBuffer buffer; public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity)); } // 分配DirectBuffer protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); }
UnpooledHeapByteBuf的釋放全權交給JVM:
@Override protected void deallocate() { freeArray(array); array = EmptyArrays.EMPTY_BYTES; } protected void freeArray(byte[] array) { // NOOP }
而UnpooledDirectByteBuf則嘗試主動釋放其擁有的DirectBuffer:
@Override protected void deallocate() { ByteBuffer buffer = this.buffer; if (buffer == null) { return; } this.buffer = null; if (!doNotFree) { // 若是DirectBuffer還沒被釋放,則嘗試釋放之 freeDirect(buffer); } } /** * Free a direct {@link ByteBuffer} */ protected void freeDirect(ByteBuffer buffer) { PlatformDependent.freeDirectBuffer(buffer); }
池化的ByteBuf都繼承自PooledByteBuf<T>類,包括PooledHeapByteBuf、PooledDirectByteBuf、PooledUnsafeDirectByteBuf、PooledUnsafeHeapByteBuf。
這四個子類都持有一個回收器字段,例如,在PooledHeapByteBuf中有:
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { @Override protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) { return new PooledHeapByteBuf(handle, 0); } };
Recycler<T>是一個抽象類,全部的子類都要實現一個newObject方法,用於新建一個子類ByteBuf對象。
Recycler<T>本質上實現的是一個棧的功能,新建ByteBuf的時候,能夠向Recycler<T>申請一個閒置對象;當ByteBuf使用完畢後,能夠回收並歸還給Recycler<T>。
子類調用RECYCLER.get()來申請閒置對象,方法實現:
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } // 嘗試從棧中獲取閒置對象 Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { // 若是沒有閒置對象,調用newObject新建一個新的對象。 handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
可見,新建池化的ByteBuf都是優先從棧中獲取閒置對象;當棧沒有閒置對象再新建。值得注意的是,新建對象還傳入了新建的handle,這個handle在對象回收階段會使用到。
另外爲了抹去歷史的使用痕跡,每一個新申請的ByteBuf對象,都會調用reuse方法進行初始化(以PooledDirectByteBuf爲例):
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; } /** * Method must be called before reuse this {@link PooledByteBufAllocator} */ final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1); setIndex0(0, 0); discardMarks(); }
前面咱們提到:當ByteBuf引用數爲0的時候,會調用deallocate()方法進行釋放。實現以下:
@Override protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache); tmpNioBuf = null; chunk = null; recycle(); } } private void recycle() { recyclerHandle.recycle(this); }
如上,最後調用了handler進行回收。所謂的回收動做,其實就是放回棧中:
static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } // 將該handler從新放入棧中 stack.push(this); } }
還有一類使用了Unsafe操做的ByteBuf,例如:UnpooledUnsafeDirectByteBuf、UnpooledUnsafeHeapByteBuf、PooledUnsafeHeapByteBuf、PooledUnsafeDirectByteBuf。
這類ByteBuf的特色就是全部的讀寫操做都是用了sun.misc.Unsafe。以下:
@Override protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(addr(index)); }
而UnsafeByteBufUtil底層調用了sum.misc.Unsafe :
static byte getByte(long address) { return UNSAFE.getByte(address); }
而不是用Unsafe操做的ByteBuf,通常使用的是HeapByteBufUtil:
@Override protected byte _getByte(int index) { return HeapByteBufUtil.getByte(memory, idx(index)); }
而HeapByteBufUtil底層實際上是簡單的數組尋址:
static byte getByte(byte[] memory, int index) { return memory[index]; }
Unsafe操做能夠帶來很是可觀的性能提高,我寫了一個簡單的Benchmark測了下:
@BenchmarkMode({Mode.Throughput}) @Warmup(iterations = 1) @Measurement(iterations = 2, time = 1) @OutputTimeUnit(TimeUnit.SECONDS) @Fork(value = 2) @Threads(8) @State(Scope.Benchmark) @OperationsPerInvocation public class UnsafeBenchmark { private static byte[] unsafeByteArray; private static byte[] safeByteArray; @Setup public void setup() { unsafeByteArray = new byte[100]; safeByteArray = new byte[100]; } @Benchmark public void unsafeMethod() { int value = 1; UnsafeByteBufUtil.setByte(unsafeByteArray, 0, value); UnsafeByteBufUtil.getByte(unsafeByteArray, 0); long longValue = 100L; UnsafeByteBufUtil.setLong(unsafeByteArray, 0, longValue); UnsafeByteBufUtil.getLong(unsafeByteArray, 0); } @Benchmark public void safeMethod() { int value = 1; HeapByteBufUtil.setByte(safeByteArray, 0, value); HeapByteBufUtil.getByte(safeByteArray, 0); long longValue = 100L; HeapByteBufUtil.setLong(safeByteArray, 0, longValue); HeapByteBufUtil.getLong(safeByteArray, 0); } public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() .include(UnsafeBenchmark.class.getSimpleName()) .build(); new Runner(opt).run(); } }
測試結果顯示UnsafeByteBufUtil的性能很是優越:
Benchmark Mode Samples Score Error Units c.t.n.u.UnsafeBenchmark.safeMethod thrpt 4 168827679.833 ± 71641561.636 ops/s c.t.n.u.UnsafeBenchmark.unsafeMethod thrpt 4 3141320463.164 ± 1204482723.948 ops/s
本文詳細介紹了ByteBuf及其子類的實現原理,包括讀寫指針、經常使用方法、淺複製、引用計數、池化、Unsafe對象等實現原理。
經過上述深刻分析,咱們瞭解到ByteBuf在Netty中承擔着運載數據的重要功能。若是將Netty比做一個完整的生物體,那麼將ByteBuf比做血液,那就再恰當不過了。