netty4.x ByteBuf 基本機制及其骨架實現

概述

netty 是一個 NIO 框架,在 JDK API 已提供相對直接的 NIO Library 的狀況下,幾乎不多的軟件系統會直接用 NIO 進行編程,也不多有開發者會直接使用 NIO 技術開發網絡相關的程序。由於 native nio library 已飽受詬病,API 難用,容易出錯,存在一些聲稱解決但還沒解決的 bug(bug id = 6403933,JDK 1.7 聲稱解決了該 Bug,但實際上只是下降了該 bug 發生的機率),使用 native nio library 來開發可靠性、魯棒性高的網絡程序,工做量以及出錯率都要更高,使用 netty 框架,就是爲了解決這些問題。java

ByteBuffer 的懺悔

基於 NIO 非阻塞模型的編程,基本上是面向數據容器編程,BIO 與 NIO 除了它們在阻塞 IO 線程方面有所不一樣外,它們在操做數據方面是有一些共性的,那就是從網絡流中讀數據,並放入一個容器中。
對於 BIO 來講,大多數時候,這個容器就是一個字節數組編程

byte[] buf = new byte[8196];
int cnt = 0;
while ((cnt = input.read(buf)) != -1) {
    //......
}

在這裏,容器就是指 buf 這個字節數組。而在 NIO 中,容器是指 ByteBuffer,因爲 NIO 編程的複雜性,須要解決相似於 TCP 半包問題等,所以對這個容器的要求不只僅是「存儲數據」那麼簡單,還但願這個容器能提供另外的功能,這是 ByteBuffer 存在的緣由,它提供了一些方便的 API,讓開發者操做底層的字節數組。
然而 ByteBuffer 存在幾個不得人心的缺點:設計模式

  1. API 功能有限
  2. 長度固定
  3. 讀寫時的手工操做

從 ByteBuffer 的源碼裏能夠看到,它用 4 個下標來輔助管理本身身上的數據,參見它的父類 java.nio.Buffer
ByteBuffer 原理數組

capacity 是 ByteBuffer 的總容量,一旦設定不能改變,就像一個水缸同樣,水缸的大小永遠是你看到它的時候那麼大,它不會變大也不會變小,最多能裝多少水是肯定的;
在這裏假設一種先向 ByteBuffer 寫數據後再讀出來的場景。往 ByteBuffer 裏寫入數據時,寫入多少數據,position 這個下標就會增長多少,換言之,在往 ByteBuffer 寫數據時,position 指向的是下一個能夠寫入的位置,而 limit 此時會和 capacity 同樣大。開始讀數據時,第一個,要知道從哪裏讀,第二個,要知道讀到哪裏爲止,爲此 ByteBuffer 提供了一個 flip() 方法,這個 flip() 方法將 limit 置爲 position 位置,此時 limit 表明要讀到哪裏爲止,再將 position 位置置爲 0,此時 position 表明要從哪裏開始讀。微信

flip 演示

所以,在讀的時候,讀取 position 到 limit 之間的數據,就能讀到上一次寫入的數據。但不得不說,這種方法顯得有點笨拙,不太人性化,這意味着在編寫代碼的時候,要時刻謹記寫完數據後,讀數據以前,要先調用 flip 方法,這種「不著名」的潛規則,容易讓開發者趟坑。網絡

ByteBuf 讓人耳目一新

netty 中的 ByteBuf 採用了新的作法,只用兩個下標來輔助管理數據,分別是 readerIndex 和 writerIndexapp

ByteBuf 初始下標位置

readerIndex 表明當前讀取的位置,writerIndex 表明下一個能夠寫入的位置,寫入一部分數據後,writerIndex 往右移動,而 readerIndex 和 writeIndex 之間的數據就變爲可讀的了。框架

寫入一部分數據後

若是原先寫入了 N 個長度的數據,接下來讀取 M (M < N)個長度的數據,那麼讀取後 ByteBuf 就變成下面的樣子ide

讀取一部分數據後

咱們再也不須要那笨拙的 flip 方法了,只須要關注 readerIndex 與 writerIndex。工具

ByteBuf 譜系

在 netty4.x 中,ByteBuf 是一個抽象類,但它也是在十分抽象,由於它定義的全部方法都是抽象方法,若是換我來想,我會想怎麼不定義爲一個 Interface 呢,ByteBuf 類也加了一個註解

@SuppressWarnings("ClassMayBeInterface")

但這麼作其實無傷大雅,留着抽象類的身份,猜想是考慮到了之後可能增長工具類方法或者公共方法。ByteBuf 下的子類以下圖所示:

ByteBuf 子類

除了 AbstractByteBuf 類,其它直接的子類都給人一種有「特殊做用」的感受,好比說 EmptyByteBuf。最主要的類仍是 AbstractByteBuf 類,它定義了大多數 ByteBuf 功能的公共邏輯代碼,在 netty 應用程序的開發中,用到的 ByteBuf 的功能,以及 ByteBuf 的具體實例,都跟它有關。

AbstractByteBuf 譜系

特殊做用的類

與 AbstractByteBuf 同級的類有 EmptyByteBuf,UnreleasableByteBuf,SwappedByteBuf 以及 ReplayingDecoderBuffer。想要解讀這些有特殊做用的類,須要先了解字節序引用計數

字節序

兩個計算機系統之間通訊,經過網絡發送字節數據,雙方必須爲字節數據的順序達成一致的協議,不然將沒法對數據進行正確的解析,不一樣的計算機體系結構有不一樣的字節序,字節序可分爲大端字節序(big-endian)和小端字節序(little-endian)。

  • 小端就是低位字節排放在內存的低地址端,高位字節排放在內存的高地址端
  • 大端就是高位字節排放在內存的低地址端,低位字節排放在內存的高地址端

以數字 0x12 34 56 78爲例,在大端模式下,其存儲的形式爲:

低地址 -----------------> 高地址
0x12  |  0x34  |  0x56  |  0x78

小端模式下,其存儲形式爲:

低地址 ------------------> 高地址
0x78  |  0x56  |  0x34  |  0x12

通常狀況下,基於 TCP 的網絡通訊約定採用大端字節序,而機器 CPU 的字節序則各有各的不一樣。

SwappedByteBuf 與字節序

SwappedByteBuf 這個類的命名並無直接地反映出類的做用,在 ByteBuf 類中定義了一個方法,用於設置該 ByteBuf 中的數據採用的是哪一種字節序存儲數據:

public abstract ByteBuf order(ByteOrder endianness);

netty 中的 ByteBuf 默認是使用 big-endian 的,若是須要修改字節序,意味着讀寫數據的時候要進行順序的轉換,通常狀況下咱們會直接在 ByteBuf 的讀寫方法裏去作修改,但那樣意味着要修改不少個方法,netty 的作法是爲每一個 ByteBuf 集成一個 SwappedByteBuf,做爲自身的字節序包裝器。以 AbstractByteBuf 的 order 方法爲例:

@Override
    public ByteBuf order(ByteOrder endianness) {
        if (endianness == null) {
            throw new NullPointerException("endianness");
        }
        if (endianness == order()) {
            return this;
        }

        SwappedByteBuf swappedBuf = this.swappedBuf;
        if (swappedBuf == null) {
            this.swappedBuf = swappedBuf = new SwappedByteBuf(this);
        }
        return swappedBuf;
    }

AbstractByteBuf 組合了一個 SwappedByteBuf 實例,當它的 order 方法被調用來設置字節序時,若是設置的字節序與自身的字節序不一樣,那麼就將本身披上 SwappedByteBuf 外套,返回自身。接下來看 SwappedByteBuf 的具體實現,能夠發現,SwappedByteBuf 裏維護了被它包裝的 ByteBuf,以及新的 ByteOrder。

public final class SwappedByteBuf extends ByteBuf {

    private final ByteBuf buf;
    private final ByteOrder order;

    public SwappedByteBuf(ByteBuf buf) {
        if (buf == null) {
            throw new NullPointerException("buf");
        }
        this.buf = buf;
        if (buf.order() == ByteOrder.BIG_ENDIAN) {
            order = ByteOrder.LITTLE_ENDIAN;
        } else {
            order = ByteOrder.BIG_ENDIAN;
        }
    }

    ......  
}

與字節序無關的操做,都 delegate 給原來的 buf,例如:

@Override
    public int capacity() {
        return buf.capacity();
    }

而與字節序有關的操做,則根據當前的字節序,對數據進行反排序處理,例如 writeInt 方法:

@Override
    public ByteBuf writeInt(int value) {
        buf.writeInt(ByteBufUtil.swapInt(value));
        return this;
    }
/**
     * Toggles the endianness of the specified 32-bit integer.
     */
    public static int swapInt(int value) {
        return Integer.reverseBytes(value);
    }

一樣,除了寫數據相關的方法,讀數據相關的方法也是這麼處理的。

引用計數

netty 中 ByteBuf 用來做爲數據的容器,是一種頻繁被建立和銷燬的對象,ByteBuf 須要的內存空間,能夠在 JVM Heap 中申請分配,也能夠在 Direct Memory 中申請,其中在 Direct Memory 中分配的 ByteBuf,其建立和銷燬的代價比在 JVM Heap 中的更高,但拋開哪一個代價高哪一個代價低不說,光是頻繁建立和頻繁銷燬這一點,就已奠基了效率不高的基調。
netty 中支持 ByteBuf 的池化,而引用計數就是實現池化的關鍵技術點,不過並不是只有池化的 ByteBuf 纔有引用計數,非池化的也會有引用計數。
ByteBuf 類實現了 ReferenceCounted 接口,該接口標記一個類是一個引用計數管理對象。

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

ReferenceCounted 接口定義了這幾個方法:

public interface ReferenceCounted {
    int refCnt();

    ReferenceCounted retain();

    ReferenceCounted retain(int increment);

    boolean release();

    boolean release(int decrement);
}

每個引用計數對象,都維護了自身的引用計數,當第一次被建立時,引用計數爲1,經過 refCnt() 方法能夠獲得當前的引用計數,retain() retain(int increment) 增長自身的引用計數,而 release() 和 release(int increment) 則減小當前的引用計數,若是引用計數達到 0,而且當前的 ByteBuf 被釋放成功,那這兩個方法的返回值爲 true。須要注意的是,各類不一樣類型的 ByteBuf 本身決定機子的釋放方式,若是是池化的 ByteBuf,那麼就會進池子,若是不是池化的,則銷燬底層的字節數組引用或者釋放對應的堆外內存。
經過 AbstractReferenceCountedByteBuf 這個類的 release 方法實現,能夠看出大概的執行邏輯:

@Override
    public final boolean release() {
        for (;;) {
            int refCnt = this.refCnt;
            if (refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }

            if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                if (refCnt == 1) {
                    deallocate();
                    return true;
                }
                return false;
            }
        }
    }

釋放對象的方法定義在 deallocate() 方法裏,而它是個抽象方法。
對於非池化的 heap ByteBuf 來講,釋放對象實際上就是釋放底層字節數組的引用:

@Override
    protected void deallocate() {
        array = null;
    }

對於非池化的 direct ByteBuf 來講,釋放對象實際上就是釋放堆外內存:

@Override
    protected void deallocate() {
        ByteBuffer buffer = this.buffer;
        if (buffer == null) {
            return;
        }

        this.buffer = null;

        if (!doNotFree) {
            PlatformDependent.freeDirectBuffer(buffer);
        }

        if (leak != null) {
            leak.close();
        }
    }

對於池化的 ByteBuf 來講,就是把本身歸還到對象池裏:

@Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            this.handle = -1;
            memory = null;
            chunk.arena.free(chunk, handle);
            if (leak != null) {
                leak.close();
            } else {
                recycle();
            }
        }
    }

UnreleasableByteBuf 與引用計數

顧名思義,這個類就是不可釋放的 ByteBuf,它也是一個包裝器模式的引用,被它包裝的 ByteBuf 不會受引用計數的影響,不會被釋放,它對 ReferenceCounted 接口的實現以下所示:

@Override
    public ByteBuf retain(int increment) {
        return this;
    }

    @Override
    public ByteBuf retain() {
        return this;
    }

    @Override
    public boolean isReadable(int size) {
        return buf.isReadable(size);
    }

    @Override
    public boolean isWritable(int size) {
        return buf.isWritable(size);
    }

    @Override
    public int refCnt() {
        return buf.refCnt();
    }

可見它直接忽略了對 retain 和 release 方法的調用效果,這種「不可釋放的 ByteBuf」在什麼狀況下會用到呢,在一些靜態的具備固定內容而且內容不改變的 ByteBuf 時候會用到,由於很是經常使用,因此不須要釋放,會更有效率。例如在處理 HTTP 協議時候,常常須要返回帶有回車換行的數據,這裏回車換行就能夠定義爲一個靜態的 ByteBuf,而且不容許釋放。這有點相似於設計模式中單例模式的那個「單例」。

EmptyByteBuf

EmptyByteBuf 是一個沒有任何內容,也不容許讀或者寫的 ByteBuf,它存在的目的是爲了在調用 ByteBufAllocator 建立新 ByteBuf 的時候,若是指定容量大小爲0,則返回一個 EmptyByteBuf,這裏僅僅是單例模式的一個運用

ReplayingDecoderBuffer

這個 ByteBuf 專用於 ReplayingDecoder,這個 decoder 主要是爲了完成對一段已知長度報文進行全包獲取,由於這個場景在網絡編程中太經常使用了,所以 netty 單獨實現了一個 ReplayingDecoder 來應對這種場景。這裏暫時不深刻講解 ReplayingDecoder。

ByteBuf 骨架實現

AbstractByteBuf 是 ByteBuf 的骨架實現,它實現了大部分與 ByteBuf 有關的功能方法,把不肯定的行爲留爲抽象方法,交給它的實現者去實現。

setter 與 getter

爲了實踐面向對象封裝的特性,見過太多類在定義其變量的 setter 和 getter 方法時,清一色地使用 setXXX(int xxx)getXXX()。不過 netty 的編碼風格中,它的 setter 和 getter 方法是這樣的:

public ByteBuf readerIndex(int readerIndex); // setter
public int readerIndex(); //getter

方法名同名,但參數列表和返回值不同。而且對於 setter 類方法來講,它支持更加 modern 的作法,那就是方法的鏈式調用,setter 後返回自身,立馬能夠進行下一次方法調用。
但在 AbstractByteBuf 中仍是有以 set 開頭的的方法的,好比說:

@Override
    public ByteBuf setIndex(int readerIndex, int writerIndex) {
        if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                    readerIndex, writerIndex, capacity()));
        }
        this.readerIndex = readerIndex;
        this.writerIndex = writerIndex;
        return this;
    }

而其它的 set 開頭的方法,則不能說它是 setter 了,由於這些方法其實是在操做數據,爲某個下標位置填入數據,例如:

public ByteBuf setByte(int index, int value);

讀取數據

AbstractByteBuf 中有兩類讀取數據的方法,一類以 get 開頭,例如 getInt(),另外一類以 read 開頭,例如readInt()。這二者的區別是,get 不會致使 readerIndex 的增長,而 read 會致使 readerIndex 的增長;另外一個區別是,read 只能讀取已經被寫入的數據,也就是說,讀取的位置不能超過 writeIndex,而 get 卻能夠在任意位置讀取,只要不超過 capacity 就能夠。經過如下代碼能夠看出這兩點區別:

@Override
    public int getInt(int index) {
        checkIndex(index, 4);
        return _getInt(index);
    }

    protected final void checkIndex(int index, int fieldLength) {
        ensureAccessible();
        if (fieldLength < 0) {
            throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)");
        }
        if (index < 0 || index > capacity() - fieldLength) {
            throw new IndexOutOfBoundsException(String.format(
                    "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
        }
    }

    @Override
    public int readInt() {
        checkReadableBytes(4);
        int v = _getInt(readerIndex);
        readerIndex += 4;
        return v;
    }

    protected final void checkReadableBytes(int minimumReadableBytes) {
        ensureAccessible();
        if (minimumReadableBytes < 0) {
            throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
        }
        if (readerIndex > writerIndex - minimumReadableBytes) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                    readerIndex, minimumReadableBytes, writerIndex, this));
        }
    }

在這裏也體現了出了先前提到的引用計數的做用,在讀取的時候,會調用 ensureAccessible() 方法來肯定當前本身的引用計數是多少。若是是 0,則這次讀取時非法的。

protected final void ensureAccessible() {
        if (refCnt() == 0) {
            throw new IllegalReferenceCountException(0);
        }
    }

一樣,並不是只有讀取數據纔會判斷引用計數,寫入數據的時候也會判斷引用計數。
真正讀取數據的方法,定義成了抽象方法,供不一樣的實現者去實現,例如 _getInt() 方法,Heap ByteBuf 的實現是直接讀取底層的數組:

@Override
    protected int _getInt(int index) {
        return  (array[index]     & 0xff) << 24 |
                (array[index + 1] & 0xff) << 16 |
                (array[index + 2] & 0xff) <<  8 |
                 array[index + 3] & 0xff;
    }

而 Direct ByteBuf,則是委託給了 ByteBuffer :

@Override
    protected int _getInt(int index) {
        return buffer.getInt(index);
    }

寫入數據

與讀取數據同樣,寫入數據也分爲改變 writerIndex 和不改變 writerIndex 的方法,分別是 write 開頭和 set 開頭。其中 set 開頭的方法和讀取數據時的 get 開頭的方法同樣,都只是檢查一下有沒有超過 capacity,並不會去檢查 writerIndex 或者是 readerIndex,至關於說這些方法能夠在任意一個地方寫入數據,只要不超過 capacity,以下所示:

@Override
    public ByteBuf setInt(int index, int value) {
        checkIndex(index, 4);
        _setInt(index, value);
        return this;
    }

而 write 開頭的方法的調用,則會對應着 writerIndex 的增加:

@Override
    public ByteBuf writeInt(int value) {
        ensureWritable(4);
        _setInt(writerIndex, value);
        writerIndex += 4;
        return this;
    }

注意在這裏,寫入操做還伴隨着對是否有足夠的空間寫入的肯定,繼而伴隨着 ByteBuf 的動態擴容。

ByteBuf 動態擴容機制

若是當前沒有足夠的空間寫入數據了,ByteBuffer 會直接報錯,而 ByteBuf 則會進行動態擴容,其擴容的主要邏輯在如下的方法裏:

@Override
    public ByteBuf ensureWritable(int minWritableBytes) {
        if (minWritableBytes < 0) {
            throw new IllegalArgumentException(String.format(
                    "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
        }

        if (minWritableBytes <= writableBytes()) {
            return this;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

        // Adjust to the new capacity.
        capacity(newCapacity);
        return this;
    }

首先的前提是,擴容雖好,但並不意味着能夠無限擴容,所以有一個 maxCapaciy 變量限制着:你能夠擴容,但不能夠無限擴容,我容許你走進個人世界,但不容許你在個人世界裏走來走去。
擴容的邏輯主要分爲兩塊:

  1. 計算新的容量
  2. 擴展至新容量

計算新容量的方法以下所示:

private int calculateNewCapacity(int minNewCapacity) {
        final int maxCapacity = this.maxCapacity;
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }

計算新容量的邏輯很簡單,若是指望的新容量不超過 4MB,則從 64 字節開始,一直翻倍,直到超過時望的新容量,此時新的容量不大於 4MB,而且是 64 的倍數。若是指望的新容量已經超過了 4MB,那麼就再增長 4 MB 的倍數,至因而1倍仍是2倍仍是N倍,由指望的容量決定。
計算完新的容量,接下來就須要把 ByteBuf 的容量擴展至新的容量,擴展容量對於不一樣類型的 ByteBuf 來講,其實現方式也不同,例如對於 Heap ByteBuf 來講,擴容就意味着數組拷貝,以下所示:

@Override
    public ByteBuf capacity(int newCapacity) {
        ensureAccessible();
        if (newCapacity < 0 || newCapacity > maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = array.length;
        if (newCapacity > oldCapacity) {
            byte[] newArray = new byte[newCapacity];
            System.arraycopy(array, readerIndex(), newArray, readerIndex(), readableBytes());
            setArray(newArray);
        } else if (newCapacity < oldCapacity) {
            byte[] newArray = new byte[newCapacity];
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
        }
        return this;
    }

這是 ByteBuf 比 ByteBuffer 更好的一個地方,既有 maxCapacity 防止無限擴容,又能在容許的範圍內動態擴展容量,開發者無須關心。至於擴展的梯段爲何是 4MB,還沒辦法知道這個值是怎麼來的,應該是通過大量的測試或者以經驗來判斷的。

丟棄一部分數據

前面提到一張圖,當寫入數據後讀取一部分數據,被讀取後的那一部分,實際上就變成了能夠丟棄的數據了,不然就會有一種「佔着茅坑不拉shi」的感受了,白白佔用了大量的空間

能夠丟棄的數據

AbstractByteBuf 提供了方法來對這些數據進行丟棄,原理其實就是將有效的數據移位,重置 readerIndex 和 writerIndex,對於 Heap ByteBuf 來講,這一般也意味着數組拷貝。

@Override
    public ByteBuf discardReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex != writerIndex) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        } else {
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
        return this;
    }

一般,數組拷貝是一個關於性能的敏感詞,過多的數組拷貝,意味着效率低,所以除非能確承認以丟棄的數據佔整個 ByteBuf 的大部分,不然不要輕易去顯式丟棄那些已經讀取的數據。

掃一掃關注個人微信公衆號

相關文章
相關標籤/搜索