Netty 源碼分析之 五 奔騰的血液: ByteBuf

本文是永順大牛寫的系列教程《源碼之下無祕密 ── 作最好的 Netty 源碼分析教程》的續寫章節。本章主要介紹Netty中用來承接數據的ByteBuf的底層實現原理。java

寫在最前

永順前輩已寫完的章節有以下:算法

筆者嘗試續寫的章節:segmentfault

本文使用的netty版本爲4.1.33.Finalapi

ByteBuf與ByteBuffer

咱們在《Java NIO 的前生今世 之三 NIO Buffer 詳解》以及《認識 Java NIO》已經詳細瞭解了NIO Buffer。這裏先回憶下NIO Buffer的一些特性:數組

  • ByteBuffer底層實現包含四個關鍵字段,並知足大小關係:mark <= position <= limit <= capacity;
  • ByteBuffer存在寫模式和讀模式兩種狀態,內部方法能夠觸發狀態切換,好比flip方法從寫狀態切換爲讀狀態;
  • 不一樣類型的ByteBuffer支持不一樣的數據類型,包括ByteBuffer、ShortBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer等類型。

Netty的ByteBuf的底層實現有些許相似,但相比ByteBuffer實現了很是多擴展,並摒棄了一些不足:緩存

  • 不區分讀寫狀態,不須要切換狀態;
  • 支持池化,避免頻繁的GC回收;
  • 支持引用計數;
  • 類型兼容(同一個ByteBuf能夠承載各類數據類型);
  • 支持Unsafe操做的ByteBuf;
  • 支持堆外和堆內兩種ByteBuf;
  • 支持零拷貝的複合類型CompositeByteBuf;
  • ...

ByteBuf繼承關係

咱們先來看看ByteBuf的類圖:
class-diagram.png安全

ByteBuf以及其子類的命名很是規整,僅從名字上咱們就能夠將各個子類劃分爲如下幾類:bash

  • 池化和非池化的ByteBuf,例如:PooledHeapByteBuf 和 UnpooledHeapByteBuf;
  • 含Unsafe操做的ByteBuf,例如:PooledUnsafeHeapByteBuf;
  • 分片類型的ByteBuf,例如:PooledSliceByteBuf和PooledDuplicatedByteBuf;
  • 組合ByteBuf,例如:CompositeBuf;
  • 實現了引用計數的ByteBuf。

以上各類類型的都會在下文展開分析。服務器

ByteBuf的讀寫指針

相似NIO ByteBuffer,ByteBuf底層實現也是字節數組,也一樣由讀寫指針來控制讀寫位置。在ByteBuf的繼承類AbstractByteBuf中定義瞭如下讀寫指針字段:ide

// 當前讀指針
    int readerIndex;
    // 當前寫指針
    int writerIndex;
    // 暫存的讀指針
    private int markedReaderIndex;
    // 暫存的寫指針
    private int markedWriterIndex;
    // 最大容量
    private int maxCapacity;

須要注意的事maxCapacity是對底層字節數組進行擴容的最大容量,並非當前容量capacity。經過這幾個指針,其實能夠將字節數組劃分爲如下幾部分:

ByteBuf讀寫指針.png

有以下性質:

  • 每讀取一個字節,readerIndex遞增1;直到readerIndex等於writerIndex,表示ByteBuf已經不可讀;
  • 每寫入一個字節,writerIndex遞增1;直到writerIndex等於capacity,表示ByteBuf已經不可寫;
  • 當writerIndex等於capacity表示底層字節數組須要擴容,且最大擴容不能超過max capacity。

有如上性質,能夠推出如下ByteBuf的一些方法實現:

  • readableBytes():可讀字節數 --> this.writerIndex - this.readerIndex
  • writableBytes():可寫字節數 --> this.capacity - this.writerIndex
  • isReadable():是否可讀 --> this.writerIndex - this.readerIndex > 0
  • isWritable():是否可寫 --> this.capacity - this.writerIndex > 0

更多方法見下。

ByteBuf接口經常使用方法

ByteBuf繼承了Comparable和RefrenceCounted,其中後者是支持引用計數的接口,它的核心方法包含兩個:

// 引用數加1
    ReferenceCounted retain();
    // 引用數減1,若是引用數減爲0,則釋放該對象。
    // 若是該對象被釋放則返回true,不然返回false。
    // 注意:子類實現實際上是減2,後文會提到。
    boolean release();

再來看ByteBuf的核心方法:

  • 容量有關方法
// 1.返回當前容量
    public abstract int capacity();
    // 2.調整當前容量
    public abstract ByteBuf capacity(int newCapacity);
    // 3.最大容量(capacity的最大上限)
    public abstract int maxCapacity();

注意 capacity() <= maxCapacity()。

  • 讀寫指針有關方法
// 讀寫指針相關方法
    // 1.獲取當前讀指針
    public abstract int readerIndex();
    // 2.設置當前讀指針
    public abstract ByteBuf readerIndex(int readerIndex);
    // 3.獲取當前寫指針
    public abstract int writerIndex();
    // 4.設置當前寫指針
    public abstract ByteBuf writerIndex(int writerIndex);
    // 5.同時設置讀寫指針
    public abstract ByteBuf setIndex(int readerIndex, int writerIndex);
    // 6.獲取可讀字節數(this.writerIndex - this.readerIndex)
    public abstract int readableBytes();
    // 7.獲取可寫字節數(this.capacity - this.writerIndex)
    public abstract int writableBytes();
    // 8.獲取最大可寫字節數 (this.maxCapacity - this.writerIndex)}
    public abstract int maxWritableBytes();
    // 9.是否可讀(this.writerIndex - this.readerIndex)
    public abstract boolean isReadable();
    // 10.是否可寫(this.capacity - this.writerIndex)
    public abstract boolean isWritable();
    // 11.清空(至關於setIndex(0, 0))
    public abstract ByteBuf clear();
    // 12.記錄讀指針
    public abstract ByteBuf markReaderIndex();
    // 13.從記錄中恢復讀指針
    public abstract ByteBuf resetReaderIndex();
    // 14.記錄寫指針 
    public abstract ByteBuf markWriterIndex();
    // 15.從記錄中恢復寫指針
    public abstract ByteBuf resetWriterIndex();
    // 16.丟棄已讀字節
    public abstract ByteBuf discardReadBytes();

上述方法都是圍繞着readerIndex、writerIndex、capital、maxcapital等四個值衍生的方法。實現都很是相似而簡單。

  • 隨機讀寫數據有關方法
// 隨機讀寫數據
    // ... 這部分相似的方法很是多,如下只列舉一部分 ...
    // 1.從指定位置讀取數據
    public abstract boolean getBoolean(int index);
    public abstract short getUnsignedByte(int index);
    public abstract short getShort(int index);
    public abstract int getUnsignedShort(int index);
    public abstract int   getInt(int index);
    public abstract long  getLong(int index);
    public abstract double getDouble(int index);
    public abstract short getShortLE(int index);(LE:Little Endian byte order,表示小端序,下同)
    public abstract int   getIntLE(int index);
    public abstract long  getLongLE(int index);
    // 略...
    // 2.在指定位置寫入數據
    public abstract ByteBuf setBoolean(int index, boolean value);
    public abstract ByteBuf setByte(int index, int value);
    public abstract ByteBuf setShortLE(int index, int value);
    public abstract ByteBuf setInt(int index, int value);
    public abstract ByteBuf setIntLE(int index, int value);
    // 略...

上述方法支持指定位置的讀寫數據,其中讀數據並不會改變指針值。

  • 順序讀寫數據有關方法。
// 1. 在readerIndex位置讀取數據並移動指針
    public abstract boolean readBoolean();
    public abstract byte  readByte();
    public abstract short readShort();
    public abstract short readShortLE();
    public abstract int   readInt();
    public abstract int   readIntLE();
    // 略...
    // 2. 在位置寫入數據並移動指針
    public abstract ByteBuf writeBoolean(boolean value);
    public abstract ByteBuf writeByte(int value);
    public abstract ByteBuf writeShort(int value);
    public abstract ByteBuf writeShortLE(int value);
    public abstract ByteBuf writeInt(int value);
    public abstract ByteBuf writeIntLE(int value);
    // 略...

上述方法從讀(或寫)指針位置順序日後讀(或寫)數據,並移動讀(或寫)指針。

  • 分片相關方法
public abstract ByteBuf slice();
    public abstract ByteBuf slice(int index, int length);
    public abstract ByteBuf duplicate();
    public abstract ByteBuf retainedSlice(); // 更新引用計數
    public abstract ByteBuf retainedDuplicate(); // 更新引用計數

ByteBuf支持分片獲取,實現快速的低成本淺複製。

  • 其餘方法
// 判斷底層是否爲NIO direct buffer
    public abstract boolean isDirect();

ByteBuf淺複製實現

上一節咱們提到了ByteBuf支持淺複製分片,其中分爲slice淺複製和duplicate淺複製。duplicate與slice的區別是,duplicate是對整個ByteBuf的淺複製,而slice只是對ByteBuf中的一部分進行淺複製。

ByteBuf的淺複製分片其實就是與原來的ByteBuf共享同一個存儲空間,而且也能夠被多個分片同時共享。以slice(int index, int length)方法爲例:

// io.netty.buffer.AbstractByteBuf.java

    @Override
    public ByteBuf slice(int index, int length) {
        ensureAccessible();
        return new UnpooledSlicedByteBuf(this, index, length);
    }

slice方法內很是簡單,只是新建了一個分片對象UnpooledSlicedByteBuf,構造函數傳入了當前ByteBuf(this)、開始索引(index)以及分片長度(length);

在父類的構造行數裏,對該分片對象進行了初始化:

// 被分片的ByteBuf
    private final ByteBuf buffer;
    // 偏移量
    private final int adjustment;

    AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
        super(length);
        checkSliceOutOfBounds(index, length, buffer);
        if (buffer instanceof AbstractUnpooledSlicedByteBuf) {
            // 若是傳入的是slice分片,則須要疊加其偏移量
            this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer;
            adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index;
        } else if (buffer instanceof DuplicatedByteBuf) {
            // 若是傳入的是dulicated分片,不須要疊加(由於其偏移量爲0)
            this.buffer = buffer.unwrap();
            adjustment = index;
        } else {
            this.buffer = buffer;
            adjustment = index;
        }
        // 初始化當前最大容量,對分片來講,最大容量不能超過length
        initLength(length);
        // 初始化寫指針
        writerIndex(length);
    }

可見,slice分片僅僅是對原ByteBuf進行了一層封裝,並無發生任何內存複製行爲,因此是很是高效快捷的操做。

與slice相似,duplicate也是如此手法。惟一不一樣是,duplicate是對整個ByteBuf進行淺複製:

public DuplicatedByteBuf(ByteBuf buffer) {
        this(buffer, buffer.readerIndex(), buffer.writerIndex());
    }

    DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) {
        super(buffer.maxCapacity());

        if (buffer instanceof DuplicatedByteBuf) {
            this.buffer = ((DuplicatedByteBuf) buffer).buffer;
        } else if (buffer instanceof AbstractPooledDerivedByteBuf) {
            this.buffer = buffer.unwrap();
        } else {
            this.buffer = buffer;
        }
        // 直接複用原ByteBuf的讀寫指針
        setIndex(readerIndex, writerIndex);
        markReaderIndex();
        markWriterIndex();
    }

值得注意的是,不管是slice仍是duplicate,都沒有調用retain()方法來改變底層ByteBuf的引用計數。 因此,若是底層ByteBuf調用release()後被釋放,那麼全部基於該ByteBuf的淺複製對象都不能進行讀寫。因此要確保淺複製實例的使用安全,須要經過調用一次retain()方法來遞增底層ByteBuf的引用計數;而後在淺複製實例使用結束後,再調用一次release()來遞減底層ByteBuf的引用計數。

CompositeByteBuf

CompositeByteBuf也是一個很是典型的ByteBuf,用來將多個ByteBuf組合在一塊兒,造成一個邏輯上的ByteBuf。這點和分片ByteBuf很是相似,都屬於在邏輯層面上避免拷貝,實現所謂的「零複製」(Zero Copy)。

CompositeByteBuf在內部維護一個可擴容的components數組,全部被組合的ByteBuf被封裝爲Component對象,對象中緩存了該ByteBuf的偏移量adjustment、開始索引offset、結束索引endOffset等。

private Component[] components; // resized when needed

    private static final class Component {
        final ByteBuf buf;
        int adjustment;
        int offset;
        int endOffset;

        private ByteBuf slice; // cached slice, may be null
    }

對CompositeByteBuf的讀寫,須要先在components數組裏二分查找對應索引所在的Component對象,而後對Component對象所包裝的ByteBuf進行讀寫。以下:

@Override
    protected byte _getByte(int index) {
        // 肯定索引index所在的Component對象
        Component c = findComponent0(index);
        // 對Component對象所包裝的ByteBuf進行讀寫
        return c.buf.getByte(c.idx(index));
    }

    private Component findComponent0(int offset) {
        // 先檢查最近訪問的Component是否知足條件
        Component la = lastAccessed;
        if (la != null && offset >= la.offset && offset < la.endOffset) {
           return la;
        }
        // 不然二分查找
        return findIt(offset);
    }

    // 二分查找
    private Component findIt(int offset) {
        for (int low = 0, high = componentCount; low <= high;) {
            int mid = low + high >>> 1;
            Component c = components[mid];
            if (offset >= c.endOffset) {
                low = mid + 1;
            } else if (offset < c.offset) {
                high = mid - 1;
            } else {
                lastAccessed = c;
                return c;
            }
        }

        throw new Error("should not reach here");
    }

CompositeByteBuf的核心思想大體如上,其餘細節不做深究。

ByteBuf引用計數實現

引用計數字段

引用計數功能是在AbstractReferenceCountedByteBuf類中實現的。核心功能使用CAS原子操做和位運算實現。關鍵字段有兩個:

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    // even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
    @SuppressWarnings("unused")
    private volatile int refCnt = 2;

refCntUpdater是修改refCnt字段的原子更新器。而refCnt是存儲引用計數的字段。注意,當前ByteBuf的引用數爲 refCnt / 2,所以當refCnt等於1時,引用數爲0。

增長引用retain

retain方法能夠增長ByteBuf的引用計數。核心代碼以下:

@Override
    public ByteBuf retain() {
        return retain0(1);
    }

    private ByteBuf retain0(final int increment) {
        // 將increment擴大兩倍爲adjustedIncrement
        int adjustedIncrement = increment << 1; // 此處容許溢出,由於後邊有判斷溢出的邏輯
        // 將adjustedIncrement更新到refCnt,所以refCnt初始值爲2,因此恆爲偶數
        int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement);
        // 若是oldRef不是偶數,直接拋異常
        if ((oldRef & 1) != 0) {
            throw new IllegalReferenceCountException(0, increment);
        }
        // 若是oldRef 和 oldRef + adjustedIncrement 正負異號,則意味着已經溢出。
        if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
                || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
            // 發生溢出須要回滾adjustedIncrement
            refCntUpdater.getAndAdd(this, -adjustedIncrement);
            // 而後拋異常
            throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
        }
        return this;
    }

註釋已經講得很明白,這裏再補充下:每次調用retain(),都會嘗試給refCnt加2,因此確保了refCnt恆爲偶數,也就是說當前引用數爲refCnt / 2。這裏爲啥設計爲遞增2而不是遞增1,估計是位運算更加高效吧,並且實際應用中Integer.MAX_VALUE / 2的引用數也是綽綽有餘。

釋放引用release

偏偏相反,release()操做每次減小引用計數2,以下:

@Override
    public boolean release() {
        return release0(1);
    }

    private boolean release0(int decrement) {
        int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
        if (decrement == realCnt) {
            // 若是decrement == realCnt,意味着須要釋放對象
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                deallocate();
                return true;
            }
            return retryRelease0(decrement);
        }
        return releaseNonFinal0(decrement, rawCnt, realCnt);
    }

    private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
        if (decrement < realCnt
                // all changes to the raw count are 2x the "real" change
                && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
            return false;
        }
        // 上述更新失敗則調用重試方法
        return retryRelease0(decrement);
    }

    private boolean retryRelease0(int decrement) {
        // 死循環不斷重試釋放引用
        for (;;) {
            int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
            if (decrement == realCnt) {
                if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                    // 若是refCnt爲1,意味着實際的引用數爲1/2=0,因此須要釋放掉
                    deallocate();
                    return true;
                }
            } else if (decrement < realCnt) {
                // 若是當前引用數realCnt大於decrement,則能夠正常更新
                if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                    return false;
                }
            } else {
                // 若是當前引用數realCnt小於decrement,則拋出引用異常
                throw new IllegalReferenceCountException(realCnt, -decrement);
            }
            Thread.yield(); // this benefits throughput under high contention
        }
    }

    /**
     * Like {@link #realRefCnt(int)} but throws if refCnt == 0
     */
    private static int toLiveRealCnt(int rawCnt, int decrement) {
        if ((rawCnt & 1) == 0) {
            // 若是是偶數,則引用數爲rawCnt >>> 1
            return rawCnt >>> 1;
        }
        // 若是是奇數,意味着該對象可能已經被釋放掉
        throw new IllegalReferenceCountException(0, -decrement);
    }

release0算法流程:

    1. 獲取當前計數rawCnt,獲取實際引用數realCnt;
    1. 判斷decrement是否等於realCnt;
    • 2.1 若是相等,意味着本次release以後,對象須要被釋放,嘗試原子操做修改引用數;
      • 2.1.1 若是修改爲功,直接釋放對象並返回true;
      • 2.2.2 若是修改失敗,調用retryRelease0進行循環重試釋放;
    • 2.2 若是不相等,意味着本次release以後,對象依然存活,嘗試調用releaseNonFinal0;
      • 2.2.1 若是decrement < realCnt,且原子修改引用計數成功,直接返回false;
      • 2.2.2 不然,調用retryRelease0進行循環重試釋放。

retryRelease0算法流程:

    1. 死循環開始;
    1. 獲取當前計數rawCnt,獲取實際引用數realCnt;
    1. 判斷decrement == realCnt;
    • 3.1 若是相等,意味着本次release以後,對象須要被釋放,嘗試原子操做修改引用數;
      • 3.1.1 若是修改爲功,直接釋放對象並返回true;
      • 3.1.2 不然跳轉6;
    1. 判斷decrement < realCnt;
    • 4.1 若是成立,意味着本次release以後,對象依然存活,嘗試原子更新引用計數;
      • 4.1.1 若是修改爲功,直接返回false;
      • 4.1.2 不然跳轉6;
    1. 其餘狀況(decrement > realCnt) 直接拋異常;
    1. Thread.yield();
    1. 跳轉到1。

池化和非池化

咱們看到ByteBuf分爲兩類池化(Pooled)和非池化(Unpooled)。非池化的ByteBuf每次新建都會申請新的內存空間,而且用完即棄,給JVM的垃圾回收帶來負擔;而池化的ByteBuf經過內部棧來保存閒置的對象空間,每次新建ByteBuf的時候,優先向內部棧申請閒置的對象空間,而且用完以後從新歸還給內部棧,從而減小了JVM的垃圾回收壓力。

非池化實現

非池化的ByteBuf實現很是簡單粗暴。下邊分別以UnpooledHeapByteBuf和UnpooledDirectByteBuf爲例:

  • 對象分配

UnpooledHeapByteBuf在構造函數裏直接新建了一個字節數組來保存數據:

private final ByteBufAllocator alloc;
    // 使用字節數組保存數據
    byte[] array;

    public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");

        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setArray(allocateArray(initialCapacity));
        setIndex(0, 0);
    }
    // 分配字節數組
    protected byte [] allocateArray(int initialCapacity) {
        return new byte[initialCapacity];
    }

而UnpooledDirectByteBuf則在構造函數中直接新建了一個DirectBuffer:

// 使用DirectBuffer保存數據
    private ByteBuffer buffer;
    public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity));
    }
    // 分配DirectBuffer
    protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }
  • 對象釋放

UnpooledHeapByteBuf的釋放全權交給JVM:

@Override
    protected void deallocate() {
        freeArray(array);
        array = EmptyArrays.EMPTY_BYTES;
    }

    protected void freeArray(byte[] array) {
        // NOOP
    }

而UnpooledDirectByteBuf則嘗試主動釋放其擁有的DirectBuffer:

@Override
    protected void deallocate() {
        ByteBuffer buffer = this.buffer;
        if (buffer == null) {
            return;
        }

        this.buffer = null;

        if (!doNotFree) {
            // 若是DirectBuffer還沒被釋放,則嘗試釋放之
            freeDirect(buffer);
        }
    }

    /**
     * Free a direct {@link ByteBuffer}
     */
    protected void freeDirect(ByteBuffer buffer) {
        PlatformDependent.freeDirectBuffer(buffer);
    }

池化

池化的ByteBuf都繼承自PooledByteBuf<T>類,包括PooledHeapByteBuf、PooledDirectByteBuf、PooledUnsafeDirectByteBuf、PooledUnsafeHeapByteBuf。

這四個子類都持有一個回收器字段,例如,在PooledHeapByteBuf中有:

private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        @Override
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

Recycler<T>是一個抽象類,全部的子類都要實現一個newObject方法,用於新建一個子類ByteBuf對象。

Recycler<T>本質上實現的是一個棧的功能,新建ByteBuf的時候,能夠向Recycler<T>申請一個閒置對象;當ByteBuf使用完畢後,能夠回收並歸還給Recycler<T>。

  • 申請ByteBuf

子類調用RECYCLER.get()來申請閒置對象,方法實現:

public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        // 嘗試從棧中獲取閒置對象
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            // 若是沒有閒置對象,調用newObject新建一個新的對象。
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

可見,新建池化的ByteBuf都是優先從棧中獲取閒置對象;當棧沒有閒置對象再新建。值得注意的是,新建對象還傳入了新建的handle,這個handle在對象回收階段會使用到。

另外爲了抹去歷史的使用痕跡,每一個新申請的ByteBuf對象,都會調用reuse方法進行初始化(以PooledDirectByteBuf爲例):

static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
    /**
     * Method must be called before reuse this {@link PooledByteBufAllocator}
     */
    final void reuse(int maxCapacity) {
        maxCapacity(maxCapacity);
        setRefCnt(1);
        setIndex0(0, 0);
        discardMarks();
    }
  • 回收ByteBuf

前面咱們提到:當ByteBuf引用數爲0的時候,會調用deallocate()方法進行釋放。實現以下:

@Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            this.handle = -1;
            memory = null;
            chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
            tmpNioBuf = null;
            chunk = null;
            recycle();
        }
    }

    private void recycle() {
        recyclerHandle.recycle(this);
    }

如上,最後調用了handler進行回收。所謂的回收動做,其實就是放回棧中:

static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;

        private Stack<?> stack;
        private Object value;

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }
            // 將該handler從新放入棧中
            stack.push(this);
        }
    }

Unsafe

Unsafe實現

還有一類使用了Unsafe操做的ByteBuf,例如:UnpooledUnsafeDirectByteBuf、UnpooledUnsafeHeapByteBuf、PooledUnsafeHeapByteBuf、PooledUnsafeDirectByteBuf。

這類ByteBuf的特色就是全部的讀寫操做都是用了sun.misc.Unsafe。以下:

@Override
    protected byte _getByte(int index) {
        return UnsafeByteBufUtil.getByte(addr(index));
    }

而UnsafeByteBufUtil底層調用了sum.misc.Unsafe :

static byte getByte(long address) {
        return UNSAFE.getByte(address);
    }

而不是用Unsafe操做的ByteBuf,通常使用的是HeapByteBufUtil:

@Override
    protected byte _getByte(int index) {
        return HeapByteBufUtil.getByte(memory, idx(index));
    }

而HeapByteBufUtil底層實際上是簡單的數組尋址:

static byte getByte(byte[] memory, int index) {
        return memory[index];
    }

Unsafe的性能提高

Unsafe操做能夠帶來很是可觀的性能提高,我寫了一個簡單的Benchmark測了下:

@BenchmarkMode({Mode.Throughput})
    @Warmup(iterations = 1)
    @Measurement(iterations = 2, time = 1)
    @OutputTimeUnit(TimeUnit.SECONDS)
    @Fork(value = 2)
    @Threads(8)
    @State(Scope.Benchmark)
    @OperationsPerInvocation
    public class UnsafeBenchmark {

        private static byte[] unsafeByteArray;

        private static byte[] safeByteArray;

        @Setup
        public void setup() {
            unsafeByteArray = new byte[100];

            safeByteArray = new byte[100];
        }

        @Benchmark
        public void unsafeMethod() {
            int value = 1;
            UnsafeByteBufUtil.setByte(unsafeByteArray, 0, value);
            UnsafeByteBufUtil.getByte(unsafeByteArray, 0);

            long longValue = 100L;
            UnsafeByteBufUtil.setLong(unsafeByteArray, 0, longValue);
            UnsafeByteBufUtil.getLong(unsafeByteArray, 0);
        }

        @Benchmark
        public void safeMethod() {
            int value = 1;
            HeapByteBufUtil.setByte(safeByteArray, 0, value);
            HeapByteBufUtil.getByte(safeByteArray, 0);

            long longValue = 100L;
            HeapByteBufUtil.setLong(safeByteArray, 0, longValue);
            HeapByteBufUtil.getLong(safeByteArray, 0);
        }

        public static void main(String[] args) throws RunnerException {
            Options opt = new OptionsBuilder()
                    .include(UnsafeBenchmark.class.getSimpleName())
                    .build();

            new Runner(opt).run();
        }
    }

測試結果顯示UnsafeByteBufUtil的性能很是優越:

Benchmark                                Mode  Samples           Score            Error  Units
c.t.n.u.UnsafeBenchmark.safeMethod      thrpt        4   168827679.833 ±   71641561.636  ops/s
c.t.n.u.UnsafeBenchmark.unsafeMethod    thrpt        4  3141320463.164 ± 1204482723.948  ops/s

總結

本文詳細介紹了ByteBuf及其子類的實現原理,包括讀寫指針、經常使用方法、淺複製、引用計數、池化、Unsafe對象等實現原理。
經過上述深刻分析,咱們瞭解到ByteBuf在Netty中承擔着運載數據的重要功能。若是將Netty比做一個完整的生物體,那麼將ByteBuf比做血液,那就再恰當不過了。

參考資料

相關文章
相關標籤/搜索