本文是永順大牛寫的系列教程《源碼之下無祕密 ── 做最好的 Netty 源碼分析教程》的續寫章節。本章主要介紹 Netty 中用來承載數據的 ByteBuf 的底層實現原理。
我們在《Java NIO 的前生今世 之三 NIO Buffer 詳解》以及《認識 Java NIO》中已經詳細瞭解了 NIO Buffer。下面先結合 NIO Buffer 的特性來認識 ByteBuf。
與 NIO ByteBuffer 類似,ByteBuf 的底層也是一段字節存儲(堆內存實現為 byte 數組,直接內存實現則基於 DirectBuffer),同樣由讀寫指針來控制讀寫位置。在 ByteBuf 的子類 AbstractByteBuf 中定義了以下讀寫指針字段:
// 當前讀指針 int readerIndex; // 當前寫指針 int writerIndex; // 暫存的讀指針 private int markedReaderIndex; // 暫存的寫指針 private int markedWriterIndex; // 最大容量 private int maxCapacity;
// 引用數加1 ReferenceCounted retain(); // 引用數減1,若是引用數減爲0,則釋放該對象。 // 若是該對象被釋放則返回true,不然返回false。 // 注意:子類實現實際上是減2,後文會提到。 boolean release();
// 1.返回當前容量 public abstract int capacity(); // 2.調整當前容量 public abstract ByteBuf capacity(int newCapacity); // 3.最大容量(capacity的最大上限) public abstract int maxCapacity();
注意 capacity() <= maxCapacity()。
// 讀寫指針相關方法 // 1.獲取當前讀指針 public abstract int readerIndex(); // 2.設置當前讀指針 public abstract ByteBuf readerIndex(int readerIndex); // 3.獲取當前寫指針 public abstract int writerIndex(); // 4.設置當前寫指針 public abstract ByteBuf writerIndex(int writerIndex); // 5.同時設置讀寫指針 public abstract ByteBuf setIndex(int readerIndex, int writerIndex); // 6.獲取可讀字節數(this.writerIndex - this.readerIndex) public abstract int readableBytes(); // 7.獲取可寫字節數(this.capacity - this.writerIndex) public abstract int writableBytes(); // 8.獲取最大可寫字節數 (this.maxCapacity - this.writerIndex)} public abstract int maxWritableBytes(); // 9.是否可讀(this.writerIndex - this.readerIndex) public abstract boolean isReadable(); // 10.是否可寫(this.capacity - this.writerIndex) public abstract boolean isWritable(); // 11.清空(至關於setIndex(0, 0)) public abstract ByteBuf clear(); // 12.記錄讀指針 public abstract ByteBuf markReaderIndex(); // 13.從記錄中恢復讀指針 public abstract ByteBuf resetReaderIndex(); // 14.記錄寫指針 public abstract ByteBuf markWriterIndex(); // 15.從記錄中恢復寫指針 public abstract ByteBuf resetWriterIndex(); // 16.丟棄已讀字節 public abstract ByteBuf discardReadBytes();
// 隨機讀寫數據 // ... 這部分相似的方法很是多,如下只列舉一部分 ... // 1.從指定位置讀取數據 public abstract boolean getBoolean(int index); public abstract short getUnsignedByte(int index); public abstract short getShort(int index); public abstract int getUnsignedShort(int index); public abstract int getInt(int index); public abstract long getLong(int index); public abstract double getDouble(int index); public abstract short getShortLE(int index);(LE:Little Endian byte order,表示小端序,下同) public abstract int getIntLE(int index); public abstract long getLongLE(int index); // 略... // 2.在指定位置寫入數據 public abstract ByteBuf setBoolean(int index, boolean value); public abstract ByteBuf setByte(int index, int value); public abstract ByteBuf setShortLE(int index, int value); public abstract ByteBuf setInt(int index, int value); public abstract ByteBuf setIntLE(int index, int value); // 略...
// 1. 在readerIndex位置讀取數據並移動指針 public abstract boolean readBoolean(); public abstract byte readByte(); public abstract short readShort(); public abstract short readShortLE(); public abstract int readInt(); public abstract int readIntLE(); // 略... // 2. 在位置寫入數據並移動指針 public abstract ByteBuf writeBoolean(boolean value); public abstract ByteBuf writeByte(int value); public abstract ByteBuf writeShort(int value); public abstract ByteBuf writeShortLE(int value); public abstract ByteBuf writeInt(int value); public abstract ByteBuf writeIntLE(int value); // 略...
// Shallow copies: the returned buffer shares its storage with this buffer.
public abstract ByteBuf slice();
public abstract ByteBuf slice(int index, int length);
public abstract ByteBuf duplicate();
public abstract ByteBuf retainedSlice(); // like slice(), but also updates (retains) the reference count
public abstract ByteBuf retainedDuplicate(); // like duplicate(), but also updates (retains) the reference count
// 判斷底層是否爲NIO direct buffer public abstract boolean isDirect();
ByteBuf 的淺複製分片其實就是與原來的 ByteBuf 共享同一塊存儲空間,並且這塊存儲空間也可以被多個分片同時共享。以 slice(int index, int length) 方法為例:
// io.netty.buffer.AbstractByteBuf.java @Override public ByteBuf slice(int index, int length) { ensureAccessible(); return new UnpooledSlicedByteBuf(this, index, length); }
// 被分片的ByteBuf private final ByteBuf buffer; // 偏移量 private final int adjustment; AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) { super(length); checkSliceOutOfBounds(index, length, buffer); if (buffer instanceof AbstractUnpooledSlicedByteBuf) { // 若是傳入的是slice分片,則須要疊加其偏移量 this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer; adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index; } else if (buffer instanceof DuplicatedByteBuf) { // 若是傳入的是dulicated分片,不須要疊加(由於其偏移量爲0) this.buffer = buffer.unwrap(); adjustment = index; } else { this.buffer = buffer; adjustment = index; } // 初始化當前最大容量,對分片來講,最大容量不能超過length initLength(length); // 初始化寫指針 writerIndex(length); }
public DuplicatedByteBuf(ByteBuf buffer) { this(buffer, buffer.readerIndex(), buffer.writerIndex()); } DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) { super(buffer.maxCapacity()); if (buffer instanceof DuplicatedByteBuf) { this.buffer = ((DuplicatedByteBuf) buffer).buffer; } else if (buffer instanceof AbstractPooledDerivedByteBuf) { this.buffer = buffer.unwrap(); } else { this.buffer = buffer; } // 直接複用原ByteBuf的讀寫指針 setIndex(readerIndex, writerIndex); markReaderIndex(); markWriterIndex(); }
值得注意的是,無論是 slice 還是 duplicate,都沒有調用 retain() 方法來改變底層 ByteBuf 的引用計數。所以,如果底層 ByteBuf 調用 release() 後被釋放,那麼所有基於該 ByteBuf 的淺複製對象都不能再進行讀寫。因此,要確保淺複製實例的使用安全,需要先調用一次 retain() 方法來遞增底層 ByteBuf 的引用計數;然後在淺複製實例使用結束後,再調用一次 release() 來遞減底層 ByteBuf 的引用計數。retainedSlice() 和 retainedDuplicate() 則會在創建淺複製實例的同時更新引用計數。
CompositeByteBuf 也是一個非常典型的 ByteBuf,用來將多個 ByteBuf 組合在一起,形成一個邏輯上的 ByteBuf。這點和分片 ByteBuf 非常類似,都是在邏輯層面上避免拷貝,實現所謂的「零拷貝」(Zero Copy)。
// The components that make up this composite buffer; searched by offset range.
// Resized when needed.
private Component[] components; // resized when needed

// One constituent ByteBuf plus the range of composite indices it covers.
private static final class Component {
    // The wrapped buffer that holds this component's bytes.
    final ByteBuf buf;
    // NOTE(review): presumably used by idx(index) to turn a composite index into
    // an index of buf — confirm.
    int adjustment;
    // First composite index covered by this component (inclusive).
    int offset;
    // End of the composite index range covered by this component (exclusive).
    int endOffset;
    private ByteBuf slice; // cached slice, may be null
}
@Override protected byte _getByte(int index) { // 肯定索引index所在的Component對象 Component c = findComponent0(index); // 對Component對象所包裝的ByteBuf進行讀寫 return c.buf.getByte(c.idx(index)); } private Component findComponent0(int offset) { // 先檢查最近訪問的Component是否知足條件 Component la = lastAccessed; if (la != null && offset >= la.offset && offset < la.endOffset) { return la; } // 不然二分查找 return findIt(offset); } // 二分查找 private Component findIt(int offset) { for (int low = 0, high = componentCount; low <= high;) { int mid = low + high >>> 1; Component c = components[mid]; if (offset >= c.endOffset) { low = mid + 1; } else if (offset < c.offset) { high = mid - 1; } else { lastAccessed = c; return c; } } throw new Error("should not reach here"); }
// Atomic updater for the volatile refCnt field below. It updates the field in place,
// so no separate atomic object is needed per buffer.
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
        AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

// Raw reference count, stored doubled:
// even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
// The initial value 2 therefore means one real reference.
// The field is only accessed through refCntUpdater (by name), hence "unused".
@SuppressWarnings("unused")
private volatile int refCnt = 2;
refCntUpdater 是修改 refCnt 字段的原子更新器,而 refCnt 是存儲引用計數的字段。注意,refCnt 存儲的是實際引用數的兩倍:當 refCnt 為偶數時,當前 ByteBuf 的引用數為 refCnt / 2(即 refCnt >>> 1);當 refCnt 為奇數(例如 1)時,引用數為 0,表示該 ByteBuf 已被釋放。
// Increments the reference count by 1.
@Override
public ByteBuf retain() {
    return retain0(1);
}

// Adds `increment` real references. The raw refCnt field holds twice the real
// count, so it is increased by 2 * increment.
private ByteBuf retain0(final int increment) {
    // Double the increment: all changes to the raw count are 2x the "real" change.
    int adjustedIncrement = increment << 1; // overflow OK here: it is detected below
    // Apply the change with a single atomic add (no CAS loop). While the buffer is
    // live the raw count stays even: it starts at 2 and changes in steps of 2.
    int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement);
    // An odd old value means the buffer was already released (release sets it to 1).
    // Adding an even number keeps it odd, so it stays marked as released; just fail.
    if ((oldRef & 1) != 0) {
        throw new IllegalReferenceCountException(0, increment);
    }
    // Reject when the addition wrapped around: a non-positive old value that became
    // non-negative, or a non-negative old value whose sum went down.
    if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
            || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
        // Roll back the addition, then fail.
        refCntUpdater.getAndAdd(this, -adjustedIncrement);
        throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
    }
    return this;
}
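註釋已經講得很明白,這裡再補充一下:每次調用 retain() 都會給 refCnt 加 2,因此只要 ByteBuf 尚未被釋放,refCnt 就恆為偶數,當前引用數即為 refCnt / 2。之所以設計為遞增 2 而不是遞增 1,是為了讓奇偶性本身表示「是否已釋放」:release() 在釋放對象時會把 refCnt 置為 1(奇數),此後即使還有併發的 retain() 繼續加 2,refCnt 仍然是奇數。因此 retain0() 只需一次 getAndAdd,再檢查舊值是否為奇數,就能發現對已釋放對象的誤用,而不必使用 CAS 循環。代價是可表示的引用數減半,但實際應用中 Integer.MAX_VALUE / 2 的引用數也綽綽有餘。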
// Decrements the reference count by 1; returns true if this call released the buffer.
@Override
public boolean release() {
    return release0(1);
}

// Removes `decrement` real references; deallocates and returns true when they were
// the last ones.
private boolean release0(int decrement) {
    // Fast path: read the raw count (non-volatile, per the helper's name) and convert it
    // to the real count; toLiveRealCnt throws if the buffer is already released.
    int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
    if (decrement == realCnt) {
        // Dropping the last reference(s): CAS the raw count to 1 (odd => released).
        if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
            deallocate();
            return true;
        }
        // The count changed in the meantime: fall back to the retry loop.
        return retryRelease0(decrement);
    }
    return releaseNonFinal0(decrement, rawCnt, realCnt);
}

// Handles a release that leaves references behind (decrement < realCnt).
private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
    if (decrement < realCnt
            // all changes to the raw count are 2x the "real" change
            && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
        return false;
    }
    // The CAS failed, or decrement exceeds realCnt: retry, which re-reads the count
    // and throws if decrement is still too large.
    return retryRelease0(decrement);
}

// Slow path: re-read the raw count and retry the CAS until it succeeds, or until the
// count shows that the release is invalid.
private boolean retryRelease0(int decrement) {
    for (;;) {
        int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
        if (decrement == realCnt) {
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                // Raw count is now 1 (odd): the real count is 0, so free the buffer.
                deallocate();
                return true;
            }
        } else if (decrement < realCnt) {
            // Enough references remain: subtract 2 * decrement from the raw count.
            if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                return false;
            }
        } else {
            // Trying to release more references than exist.
            throw new IllegalReferenceCountException(realCnt, -decrement);
        }
        Thread.yield(); // this benefits throughput under high contention
    }
}

/**
 * Like {@link #realRefCnt(int)} but throws if refCnt == 0.
 * An even raw count is live and its real count is {@code rawCnt >>> 1}; an odd raw
 * count means the buffer has already been released.
 */
private static int toLiveRealCnt(int rawCnt, int decrement) {
    if ((rawCnt & 1) == 0) {
        // Even: the real count is half the raw count.
        return rawCnt >>> 1;
    }
    // Odd: the object has already been released.
    throw new IllegalReferenceCountException(0, -decrement);
}
private final ByteBufAllocator alloc; // 使用字節數組保存數據 byte[] array; public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } // 分配字節數組 protected byte [] allocateArray(int initialCapacity) { return new byte[initialCapacity]; }
// 使用DirectBuffer保存數據 private ByteBuffer buffer; public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity)); } // 分配DirectBuffer protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); }
@Override protected void deallocate() { freeArray(array); array = EmptyArrays.EMPTY_BYTES; } protected void freeArray(byte[] array) { // NOOP }
@Override protected void deallocate() { ByteBuffer buffer = this.buffer; if (buffer == null) { return; } this.buffer = null; if (!doNotFree) { // 若是DirectBuffer還沒被釋放,則嘗試釋放之 freeDirect(buffer); } } /** * Free a direct {@link ByteBuffer} */ protected void freeDirect(ByteBuffer buffer) { PlatformDependent.freeDirectBuffer(buffer); }
// Object pool for PooledHeapByteBuf instances. newObject is called when the pool has
// no idle instance and creates one bound to the given recycler handle.
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
    @Override
    protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> recyclerHandle) {
        // NOTE(review): the second argument is 0 here; presumably the real maximum
        // capacity is applied later when the instance is reused — confirm.
        return new PooledHeapByteBuf(recyclerHandle, 0);
    }
};
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } // 嘗試從棧中獲取閒置對象 Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { // 若是沒有閒置對象,調用newObject新建一個新的對象。 handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
/**
 * Takes a PooledDirectByteBuf from RECYCLER and prepares it via reuse(maxCapacity).
 */
static PooledDirectByteBuf newInstance(int maxCapacity) {
    final PooledDirectByteBuf pooled = RECYCLER.get();
    pooled.reuse(maxCapacity);
    return pooled;
}

/**
 * Must be called before a recycled buffer is used again. It sets the max capacity,
 * sets the reference count to 1, resets both indices to 0 via setIndex0, and calls
 * discardMarks().
 */
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
/**
 * Returns this buffer's memory to its arena (chunk.arena.free(...)), clears the
 * references to that memory, and recycles this object. Does nothing when handle is
 * already negative.
 */
@Override
protected final void deallocate() {
    if (handle < 0) {
        return;
    }
    final long freedHandle = this.handle;
    // Invalidate the handle first so a repeated call takes the early return above.
    this.handle = -1;
    memory = null;
    chunk.arena.free(chunk, tmpNioBuf, freedHandle, maxLength, cache);
    tmpNioBuf = null;
    chunk = null;
    recycle();
}

// Hands this object back to the pool through its recycler handle.
private void recycle() {
    recyclerHandle.recycle(this);
}
static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } // 將該handler從新放入棧中 stack.push(this); } }
// Reads the byte at `index` through UnsafeByteBufUtil.
@Override
protected byte _getByte(int index) {
    // NOTE(review): addr(index) presumably yields the memory address of `index` — confirm.
    final long address = addr(index);
    return UnsafeByteBufUtil.getByte(address);
}
而 UnsafeByteBufUtil 底層調用了 sun.misc.Unsafe:
// Reads one byte at the absolute memory address `address` via Unsafe.
static byte getByte(long address) {
    final byte value = UNSAFE.getByte(address);
    return value;
}
// Reads the byte at `index` from the backing heap array `memory`.
@Override
protected byte _getByte(int index) {
    // NOTE(review): idx(index) presumably maps the buffer index to a position in
    // `memory` (e.g. adding this buffer's offset) — confirm.
    final int position = idx(index);
    return HeapByteBufUtil.getByte(memory, position);
}
// Plain array read: returns memory[index] (bounds-checked by the JVM).
static byte getByte(byte[] memory, int index) {
    final byte b = memory[index];
    return b;
}
/**
 * JMH benchmark comparing HeapByteBufUtil (plain array access) with UnsafeByteBufUtil
 * (Unsafe-based access) on a byte[]. Each invocation writes and reads back a byte and
 * a long at index 0.
 */
@BenchmarkMode({Mode.Throughput})
@Warmup(iterations = 1)
@Measurement(iterations = 2, time = 1)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 2)
@Threads(8)
// Per-thread state: each benchmark thread gets its own arrays. Otherwise all 8 threads
// would keep writing the same bytes, and the score would mostly reflect cache-line
// contention rather than the access method.
@State(Scope.Thread)
public class UnsafeBenchmark {
    // Instance fields: JMH creates one State object per thread and calls setup() on it.
    private byte[] unsafeByteArray;
    private byte[] safeByteArray;

    @Setup
    public void setup() {
        unsafeByteArray = new byte[100];
        safeByteArray = new byte[100];
    }

    /**
     * Unsafe-based write/read round trip. The values read back are returned so that JMH
     * consumes them; discarding them would let the JIT eliminate the reads as dead code
     * and inflate the score.
     */
    @Benchmark
    public long unsafeMethod() {
        int value = 1;
        UnsafeByteBufUtil.setByte(unsafeByteArray, 0, value);
        byte byteRead = UnsafeByteBufUtil.getByte(unsafeByteArray, 0);
        long longValue = 100L;
        UnsafeByteBufUtil.setLong(unsafeByteArray, 0, longValue);
        long longRead = UnsafeByteBufUtil.getLong(unsafeByteArray, 0);
        return byteRead + longRead;
    }

    /**
     * Plain-array write/read round trip; results are returned for the same
     * dead-code-elimination reason as {@link #unsafeMethod()}.
     */
    @Benchmark
    public long safeMethod() {
        int value = 1;
        HeapByteBufUtil.setByte(safeByteArray, 0, value);
        byte byteRead = HeapByteBufUtil.getByte(safeByteArray, 0);
        long longValue = 100L;
        HeapByteBufUtil.setLong(safeByteArray, 0, longValue);
        long longRead = HeapByteBufUtil.getLong(safeByteArray, 0);
        return byteRead + longRead;
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(UnsafeBenchmark.class.getSimpleName())
                .build();
        new Runner(opt).run();
    }
}
Benchmark Mode Samples Score Error Units c.t.n.u.UnsafeBenchmark.safeMethod thrpt 4 168827679.833 ± 71641561.636 ops/s c.t.n.u.UnsafeBenchmark.unsafeMethod thrpt 4 3141320463.164 ± 1204482723.948 ops/s