Netty 零拷貝(一)NIO 對零拷貝的支持

Netty 零拷貝(二)NIO 對零拷貝的支持

Netty 系列目錄 (http://www.javashuo.com/article/p-hskusway-em.html)html

Buffer類結構

  • 非直接緩衝區(HeapByteBuffer):在 JVM 內存上分配一個字節數組 byte[] hb
  • 直接緩衝區(DirectByteBuffer):保存一個指向系統內核的地址 long address

1、非直接緩衝區和直接緩衝區

(1) Buffer 分配java

// 分配非直接緩衝區
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

// 分配直接緩衝區
public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

(2) ByteBuffer 內存存儲數組

public abstract class Buffer {
    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;

    // 直接緩衝區指向系統內核的一個地址,之因此放到父類中是爲了加快 JNI 的訪問速度
    long address;
}

public abstract class ByteBuffer extends Buffer {
    // 非直接緩衝區在 JVM 內存上分配一個字節數組
    final byte[] hb;
}

(3) DirectByteBufferapp

DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null
}

public byte get() {
    return ((unsafe.getByte(ix(nextGetIndex()))));
}

public ByteBuffer put(byte x) {
    unsafe.putByte(ix(nextPutIndex()), ((x)));
    return this;
}

DirectByteBuffer 對直接緩衝區的操做都委託給了類 sun.misc.Unsafe,Unsafe 都是一些本地方法 native。函數

public final class Unsafe {
    public native long allocateMemory(long bytes);
    public native byte    getByte(long address);
    public native void    putByte(long address, byte x);
}

2、直接緩衝區應用

使用直接緩衝區能夠避免用戶空間和系統空間之間的拷貝過程,即零拷貝。this

FileChannel inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("3.png"),
        StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

// 方式一:內存映射文件,直接緩衝區
MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
//只有 READ_WRITE,沒有 WRITE,所以 outChannel 也要加上 READ
MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

byte[] bytes = new byte[inMappedBuf.limit()];
inMappedBuf.get(bytes);
outMappedBuf.put(bytes);

// 方式二:transferTo 也是使用直接緩衝區
//inChannel.transferTo(0, inChannel.size(), outChannel);
//outChannel.transferFrom(inChannel, 0, inChannel.size());

直接緩衝區

3、DirectByteBuffer

Java NIO中的 direct buffer(主要是 DirectByteBuffer)實際上是分兩部分的:spa

Java        |      native
                   |
 DirectByteBuffer  |     malloc'd
 [    address   ] -+-> [   data    ]

其中 DirectByteBuffer 自身是一個 Java 對象,在 Java 堆中;而這個對象中有個 long 類型字段 address,記錄着一塊調用 malloc() 申請到的 native memory。netty

FileChannel 的 read(ByteBuffer dst) 函數,write(ByteBuffer src) 函數中,若是傳入的參數是 HeapBuffer 類型,則會臨時申請一塊 DirectBuffer,進行數據拷貝,而不是直接進行數據傳輸,這是出於什麼緣由?code

// OpenJDK 的 sun.nio.ch.IOUtil
static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd) throws IOException {
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);

        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

這裏實際上是在遷就 OpenJDK 裏的 HotSpot VM 的一點實現細節。htm

HotSpot VM 裏的 GC 除了 CMS 以外都是要移動對象的,是所謂 「compacting GC」。

若是要把一個 Java 裏的 byte[] 對象的引用傳給 native 代碼,讓 native 代碼直接訪問數組的內容的話,就必需要保證 native 代碼在訪問的時候這個 byte[] 對象不能被移動,也就是要被「pin」(釘)住。

惋惜 HotSpot VM 出於一些取捨而決定不實現單個對象層面的 object pinning,要 pin 的話就得暫時禁用 GC ——也就等於把整個 Java 堆都給 pin 住。HotSpot VM 對 JNI 的 Critical 系 API 就是這樣實現的。這用起來就不那麼順手。

因此 Oracle/Sun JDK / OpenJDK 的這個地方就用了點繞彎的作法。 它假設把 HeapByteBuffer 背後的 byte[] 裏的內容拷貝一次是一個時間開銷能夠接受的操做,同時假設真正的 I/O 多是一個很慢的操做。

因而它就先把 HeapByteBuffer 背後的 byte[] 的內容拷貝到一個 DirectByteBuffer 背後的 native memory 去,這個拷貝會涉及 sun.misc.Unsafe.copyMemory() 的調用,背後是相似 memcpy() 的實現。這個操做本質上是會在整個拷貝過程當中暫時不容許發生 GC 的,雖然實現方式跟 JNI 的 Critical 系 API 不太同樣。(具體來講是 Unsafe.copyMemory() 是 HotSpot VM 的一個 intrinsic 方法,中間沒有 safepoint 因此 GC 沒法發生)。

而後數據被拷貝到 native memory 以後就好辦了,就去作真正的 I/O,把 DirectByteBuffer 背後的 native memory 地址傳給真正作 I/O 的函數。這邊就不須要再去訪問 Java 對象去讀寫要作 I/O 的數據了。

參考:

  1. 《Java NIO中,關於DirectBuffer,HeapBuffer的疑問?》:https://www.zhihu.com/question/57374068/answer/152691891

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索