Netty 之 Zero-copy 的實現(上)

維基百科中對 Zero-copy 的解釋是java

零拷貝技術是指計算機執行操做時,CPU不須要先將數據從某處內存複製到另外一個特定區域。這種技術一般用於經過網絡傳輸文件時節省CPU週期和內存帶寬。網絡

維基百科裏提到的零拷貝是在硬件和操做系統層面的,而本文主要介紹的是Netty在應用層面的優化。不過須要注意的是,零拷貝並不是字面意義上的沒有內存拷貝,而是避免多餘的拷貝操做,即便是系統層的零拷貝也有從設備到內存,內存到設備的數據拷貝過程。jvm

Netty 的零拷貝體如今如下幾個方面ide

  • ByteBufslice 操做並不會拷貝一份新的 ByteBuf 內存空間,而是直接借用原來的 ByteBuf ,只是獨立地保存讀寫索引。
  • Netty 提供了 CompositeByteBuf 類,能夠將多個 ByteBuf 組合成一個邏輯上的 ByteBuf
  • Netty 的 FileRegion 中包裝了 NIOFileChannel.transferTo()方法,該方法在底層系統支持的狀況下會調用 sendfile 方法,從而在傳輸文件時避免了用戶態的內存拷貝。
  • Netty 的 PooledDirectByteBuf 等類中封裝了 NIODirectByteBuffer ,而 DirectByteBuffer 是直接在 jvm 堆外分配的內存,省去了堆外內存向堆內存拷貝的開銷。

下面來簡單介紹下這幾種方式。函數

slice

如下以 AbstractUnpooledSlicedByteBuf 爲例講解 slice 的零拷貝原理,至於內存池化的實現 PooledSlicedByteBuf ,由於內存池要經過引用計數來控制內存的釋放,因此代碼裏會出現不少與本文主題無關的邏輯,這裏就不拿來舉栗子了。優化

// 切片ByteBuf的構造函數,其中字段adjustment爲切片ByteBuf相對於被切片ByteBuf的偏移
// 量,兩個ByteBuf共用一塊內存空間,字段buffer爲實際存儲數據的ByteBuf
AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
    super(length);
    checkSliceOutOfBounds(index, length, buffer);//檢查slice是否越界
    
    if (buffer instanceof AbstractUnpooledSlicedByteBuf) {
        // 若是被切片ByteBuf也是AbstractUnpooledSlicedByteBuf對象
        this.buffer = ((AbstractUnpooledSlicedByteBuf) buffer).buffer;
        adjustment = ((AbstractUnpooledSlicedByteBuf) buffer).adjustment + index;
    } else if (buffer instanceof DuplicatedByteBuf) {
        // 若是被切片ByteBuf爲DuplicatedByteBuf對象,則
        // 用unwrap獲得實際存儲數據的ByteBuf賦值buffer
        this.buffer = buffer.unwrap();
        adjustment = index;
    } else {
        // 若是被切片ByteBuf爲通常ByteBuf對象,則直接賦值buffer
        this.buffer = buffer;
        adjustment = index;
    }

    initLength(length);
    writerIndex(length);
}

以上爲 AbstractUnpooledSlicedByteBuf 類的構造函數,比較簡單,就不詳細介紹了。this

下面來看看 AbstractUnpooledSlicedByteBufByteBuf 接口的實現代碼,以 getBytes 方法爲例:操作系統

@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
    checkIndex0(index, dst.remaining());//檢查是否越界
    unwrap().getBytes(idx(index), dst);
    return this;
}

@Override
public ByteBuf unwrap() {
    return buffer;
}

private int idx(int index) {
    return index + adjustment;
}

這是 AbstractUnpooledSlicedByteBuf 重載的 getBytes 方法,能夠看到 AbstractUnpooledSlicedByteBuf 是直接在封裝的 ByteBuf 上取的字節,可是從新計算了索引,加上了相對偏移量。code

CompositeByteBuf

在有些場景裏,咱們的數據會分散在多個 ByteBuf 上,可是咱們又但願將這些 ByteBuf 聚合在一個 ByteBuf 裏處理。這裏最直觀的想法是將全部 ByteBuf 的數據拷貝到一個 ByteBuf 上,可是這樣會有大量的內存拷貝操做,產生很大的CPU開銷。component

CompositeByteBuf 能夠很好地解決這個問題,正如名字同樣,這是一個複合 ByteBuf ,內部由不少的 ByteBuf 組成,但 CompositeByteBuf 給它們作了一層封裝,能夠直接以 ByteBuf 的接口操做它們。

/**
 * Precondition is that {@code buffer != null}.
 */
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    assert buffer != null;
    boolean wasAdded = false;
    try {
        // 檢查新增的component的索引是否合法
        checkComponentIndex(cIndex);

        // buffer的長度
        int readableBytes = buffer.readableBytes();

        // No need to consolidate - just add a component to the list.
        @SuppressWarnings("deprecation")
        // 統一爲大端ByteBuf
        Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
        if (cIndex == components.size()) {
            // 若是索引等於components的大小,則加在components尾部
            wasAdded = components.add(c);
            if (cIndex == 0) {
                // 若是components中只有一個元素
                c.endOffset = readableBytes;
            } else {
                // 若是components中有多個元素
                Component prev = components.get(cIndex - 1);
                c.offset = prev.endOffset;
                c.endOffset = c.offset + readableBytes;
            }
        } else {
            // 若是新的ByteBuf是插在components中間
            components.add(cIndex, c);
            wasAdded = true;
            if (readableBytes != 0) {
                // 若是components的大小不爲0,則依次更新cIndex以後的
                // 全部components的offset和endOffset
                updateComponentOffsets(cIndex);
            }
        }
        if (increaseWriterIndex) {
            // 若是要更新writerIndex
            writerIndex(writerIndex() + buffer.readableBytes());
        }
        return cIndex;
    } finally {
        if (!wasAdded) {
            // 若是沒添加成功,則釋放ByteBuf
            buffer.release();
        }
    }
}

這是添加一個新的 ByteBuf 的邏輯,核心是 offsetendOffset ,分別指代一個 ByteBufCompositeByteBuf 中開始和結束的索引,它們惟一標記了這個 ByteBufCompositeByteBuf 中的位置。

弄清楚了這個,咱們會發現上面的代碼無外乎作了兩件事:

  1. ByteBuf 封裝成 Component 加到 components 合適的位置上
  2. 使 components 裏的每一個 ComponentoffsetendOffset 值都正確

下面來看看 CompositeByteBufByteBuf 接口的實現代碼,一樣以 getBytes 方法爲例:

@Override
public CompositeByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
    // 查索引是否越界
    checkDstIndex(index, length, dstIndex, dst.capacity());
    if (length == 0) {
        return this;
    }

    // 用二分搜索查找index對應的Component在components中的索引
    int i = toComponentIndex(index);
    // 循環讀直至length爲0
    while (length > 0) {
        Component c = components.get(i);
        ByteBuf s = c.buf;
        int adjustment = c.offset;
        // 取length和ByteBuf剩餘字節數中的較小值
        int localLength = Math.min(length, s.capacity() - (index - adjustment));
        // 開始索引爲index - c.offset,而不是0
        s.getBytes(index - adjustment, dst, dstIndex, localLength);
        index += localLength;
        dstIndex += localLength;
        length -= localLength;
        i ++;
    }
    return this;
}

/**
 * Return the index for the given offset
 */
public int toComponentIndex(int offset) {
    checkIndex(offset);

    for (int low = 0, high = components.size(); low <= high;) {
        int mid = low + high >>> 1;
        Component c = components.get(mid);
        if (offset >= c.endOffset) {
            low = mid + 1;
        } else if (offset < c.offset) {
            high = mid - 1;
        } else {
            return mid;
        }
    }

    throw new Error("should not reach here");
}

能夠看到 CompositeByteBuf 在處理 index 時是先將其轉換成對應 Componentcomponents 中的索引,以及在 Component 中的偏移,而後從這個 Component 的這個偏移開始,日後循環取字節,直到讀完。

NOTE:這裏有個小trick,由於 components 是有序排列的,因此 toComponentIndex 作索引轉換時沒有直接遍歷,而是用的二分查找。

今天寫得有點累了,這裏留個坑,下一篇再填上。

相關文章
相關標籤/搜索