dubbo源碼解析(十一)遠程通訊——Buffer

遠程通信——Buffer

目標:介紹Buffer的相關實現邏輯、介紹dubbo-remoting-api中的buffer包內的源碼解析。

前言

緩存區在NIO框架中很是重要,它做爲字節容器,每一個NIO框架都有本身的相應的設計實現。好比Java NIO有ByteBuffer的設計,Mina有IoBuffer的設計,Netty4有ByteBuf的設計。 那麼在本文講到的內容是dubbo對於緩衝區作的一些接口定義,而且作了不一樣的框架實現緩衝區公共的邏輯。下面是本文要講到的類圖:java

buffer類圖

接下來我就按照類圖上的一個一個分析,忽略test類。git

源碼分析

(一)ChannelBuffer

該接口繼承了Comparable接口,該接口是通道緩存接口,是字節容器,在netty中也有通道緩存的設計,也就是io.netty.buffer.ByteBuf,該接口的方法定義和設計跟ByteBuf幾乎同樣,連註釋都同樣,因此我就再也不細說了。github

(二)AbstractChannelBuffer

該類實現了ChannelBuffer接口,是通道緩存的抽象類,它實現了ChannelBuffer全部方法,可是它實現的方法都是須要被重寫的方法,具體的實現都是須要子類來實現。如今咱們來惡補一下這個通道緩存的原理,固然這個原理跟netty的ByteBuf原理是分不開的。AbstractChannelBuffer維護了兩個索引,一個用於讀取,另外一個用於寫入當你從通道緩存中讀取時,readerIndex將會被遞增已經被讀取的字節數,一樣的當你寫入的時候writerIndex也會被遞增。api

/**
 * 讀索引
 */
private int readerIndex;

/**
 * 寫索引
 */
private int writerIndex;

/**
 * 標記讀索引
 */
private int markedReaderIndex;

/**
 * 標記寫索引
 */
private int markedWriterIndex;

能夠看到該類有四個屬性,讀索引和寫索引的做用就是我上述介紹的,讀索引和寫索引的起始位置都爲索引位置0。而標記讀索引和標記寫索引是爲了作備份回滾,當對緩衝區進行讀寫操做時,可能須要對以前的操做進行回滾,咱們就須要將當前的讀寫索引備份到相應的標記索引中。數組

該類的其餘方法都是利用四個屬性來操做,無非就是檢測是否有數據可讀或者仍是否有空間可寫等方法,作一些前置條件的校驗以及索引的設置,具體的實現都是須要子類來實現,因此我就不貼代碼,由於邏輯比較簡單。緩存

(三)DynamicChannelBuffer

該類繼承了AbstractChannelBuffer類,該類是動態的通道緩存區類,也就是該類是從ChannelBufferFactory工廠中動態的生成緩衝區,默認使用的工廠是HeapChannelBufferFactory。微信

1.屬性和構造方法

/**
 * 通道緩存區工廠
 */
private final ChannelBufferFactory factory;

/**
 * 通道緩存區
 */
private ChannelBuffer buffer;

public DynamicChannelBuffer(int estimatedLength) {
    // 默認是HeapChannelBufferFactory
    this(estimatedLength, HeapChannelBufferFactory.getInstance());
}

public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
    // 若是預計長度小於0 則拋出異常
    if (estimatedLength < 0) {
        throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
    }
    // 若是工廠爲空,則拋出空指針異常
    if (factory == null) {
        throw new NullPointerException("factory");
    }
    // 設置工廠
    this.factory = factory;
    // 建立緩存區
    buffer = factory.getBuffer(estimatedLength);
}

能夠看到,該類有兩個屬性,全部的實現方法都是調用了buffer的方法,不過該buffer產生是經過工廠動態生成的。而且從構造方法來看,默認使用HeapChannelBufferFactory。app

2.ensureWritableBytes

@Override
public void ensureWritableBytes(int minWritableBytes) {
    // 若是最小寫入的字節數不大於可寫的字節數,則結束
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    // 新增容量
    int newCapacity;
    // 此緩衝區可包含的字節數等於0。
    if (capacity() == 0) {
        // 新增容量設置爲1
        newCapacity = 1;
    } else {
        // 新增容量設置爲緩衝區可包含的字節數
        newCapacity = capacity();
    }
    // 最小新增容量 = 當前的寫索引+最小寫入的字節數
    int minNewCapacity = writerIndex() + minWritableBytes;
    // 當新增容量比最小新增容量小
    while (newCapacity < minNewCapacity) {
        // 新增容量左移1位,也就是加倍
        newCapacity <<= 1;
    }

    // 經過工廠建立該容量大小當緩衝區
    ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
    // 從buffer中讀取數據到newBuffer中
    newBuffer.writeBytes(buffer, 0, writerIndex());
    // 替換原來到緩衝區
    buffer = newBuffer;
}

該方法是確保數組有可寫的容量,該方法是重寫了父類的方法,經過傳入一個最小寫入的字節數,來對緩衝區進行擴容,能夠看到,當現有的緩衝區不夠大的時候,會對緩衝區進行加倍對擴容,直到buffer的大小大於傳入的最小可寫字節數。框架

3.copy

@Override
public ChannelBuffer copy(int index, int length) {
    // 建立緩衝區,預計長度最小爲64,或者更大
    DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(Math.max(length, 64), factory());
    // 複製數據
    copiedBuffer.buffer = buffer.copy(index, length);
    // 設置索引,讀索引設置爲0,寫索引設置爲copy的數據長度
    copiedBuffer.setIndex(0, length);
    // 返回緩存區
    return copiedBuffer;
}

該方法是複製數據,在建立緩衝區的時候,預計長度最小是64,,而後從新設置讀索引寫索引。ide

其餘方法都調用了buffer的方法或者調用了父類的方法,因此再也不這裏多說。

(四)ByteBufferBackedChannelBuffer

該方法繼承AbstractChannelBuffer,該類是基於 Java NIO中的ByteBuffer來實現相關的讀寫數據等操做。

/**
 * ByteBuffer實例
 */
private final ByteBuffer buffer;

/**
 * 容量
 */
private final int capacity;

public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    // 建立一個新的字節緩衝區,新緩衝區的大小將是此緩衝區的剩餘容量
    this.buffer = buffer.slice();
    // 返回buffer的剩餘容量
    capacity = buffer.remaining();
    // 設置寫索引
    writerIndex(capacity);
}

上述就是該類的屬性和構造函數,能夠看到它有一個ByteBuffer類型的實例,而且capacity是buffer的剩餘容量。

還有其餘的方法好比getByte方法是從buffer中讀取數據方法,setBytes方法是把數據寫入buffer,它們都有不少重載方法,爲就不一一講解了,它們都是調用了ByteBuffer中的一些方法,若是對於Java NIO中的ByteBuffer方法不是很熟悉的朋友,須要先了解一下Java NIO中的ByteBuffer。

(五)HeapChannelBuffer

該方法繼承了AbstractChannelBuffer,該類中buffer是基於字節數組實現

/**
 * The underlying heap byte array that this buffer is wrapping.
 * 此緩衝區包裝的基礎堆字節數組。
 */
protected final byte[] array;

/**
 * Creates a new heap buffer with a newly allocated byte array.
 * 使用新分配的字節數組建立新的堆緩衝區。
 *
 * @param length the length of the new byte array
 */
public HeapChannelBuffer(int length) {
    this(new byte[length], 0, 0);
}

/**
 * Creates a new heap buffer with an existing byte array.
 * 使用現有字節數組建立新的堆緩衝區。
 *
 * @param array the byte array to wrap
 */
public HeapChannelBuffer(byte[] array) {
    this(array, 0, array.length);
}

/**
 * Creates a new heap buffer with an existing byte array.
 * 使用現有字節數組建立新的堆緩衝區。
 *
 * @param array       the byte array to wrap
 * @param readerIndex the initial reader index of this buffer
 * @param writerIndex the initial writer index of this buffer
 */
protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
    if (array == null) {
        throw new NullPointerException("array");
    }
    this.array = array;
    setIndex(readerIndex, writerIndex);
}

該類有好幾個構造函數,都是基於字節數組的,也就是在該類中包裝了一個字節數組,把構造函數傳入的字節數組傳入到該屬性中。其餘方法邏輯比較簡單。

(六)ChannelBufferFactory

public interface ChannelBufferFactory {

    /**
     * 得到緩衝區實例
     * @param capacity
     * @return
     */
    ChannelBuffer getBuffer(int capacity);

    ChannelBuffer getBuffer(byte[] array, int offset, int length);

    ChannelBuffer getBuffer(ByteBuffer nioBuffer);

}

該接口是通道緩衝區工廠,其中就只定義了得到通道緩衝區的方法,比較好理解,它有兩個實現類,我後續會講到。

(七)HeapChannelBufferFactory

該類實現了ChannelBufferFactory,該類就是基於字節數組來建立緩衝區的工廠。

public class HeapChannelBufferFactory implements ChannelBufferFactory {

    /**
     * 單例
     */
    private static final HeapChannelBufferFactory INSTANCE = new HeapChannelBufferFactory();

    public HeapChannelBufferFactory() {
        super();
    }

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public ChannelBuffer getBuffer(int capacity) {
        // 建立一個capacity容量的緩衝區
        return ChannelBuffers.buffer(capacity);
    }

    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
        return ChannelBuffers.wrappedBuffer(array, offset, length);
    }

    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        // 判斷該緩衝區是否有字節數組支持
        if (nioBuffer.hasArray()) {
            // 使用
            return ChannelBuffers.wrappedBuffer(nioBuffer);
        }

        // 建立一個nioBuffer剩餘容量的緩衝區
        ChannelBuffer buf = getBuffer(nioBuffer.remaining());
        // 記錄下nioBuffer的位置
        int pos = nioBuffer.position();
        // 寫入數據到buf
        buf.writeBytes(nioBuffer);
        // 把nioBuffer的位置重置到pos
        nioBuffer.position(pos);
        return buf;
    }

}

該類利用了單例模式,其中的方法比較簡單,就是調用了ChannelBuffers中的方法,調用的方法實際上仍是使用了HeapChannelBuffer中建立緩衝區的方法。

(八)DirectChannelBufferFactory

該類實現了ChannelBufferFactory接口,是直接緩衝區工廠,用來建立直接緩衝區。

public class DirectChannelBufferFactory implements ChannelBufferFactory {

    /**
     * 單例
     */
    private static final DirectChannelBufferFactory INSTANCE = new DirectChannelBufferFactory();

    public DirectChannelBufferFactory() {
        super();
    }

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }

    @Override
    public ChannelBuffer getBuffer(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity: " + capacity);
        }
        if (capacity == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }
        // 生成直接緩衝區
        return ChannelBuffers.directBuffer(capacity);
    }

    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
        if (array == null) {
            throw new NullPointerException("array");
        }
        if (offset < 0) {
            throw new IndexOutOfBoundsException("offset: " + offset);
        }
        if (length == 0) {
            return ChannelBuffers.EMPTY_BUFFER;
        }
        if (offset + length > array.length) {
            throw new IndexOutOfBoundsException("length: " + length);
        }

        ChannelBuffer buf = getBuffer(length);
        buf.writeBytes(array, offset, length);
        return buf;
    }

    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        // 若是nioBuffer不是隻讀,而且它是直接緩衝區
        if (!nioBuffer.isReadOnly() && nioBuffer.isDirect()) {
            // 建立一個緩衝區
            return ChannelBuffers.wrappedBuffer(nioBuffer);
        }

        // 建立一個nioBuffer剩餘容量的緩衝區
        ChannelBuffer buf = getBuffer(nioBuffer.remaining());
        // 記錄下nioBuffer的位置
        int pos = nioBuffer.position();
        // 寫入數據到buf
        buf.writeBytes(nioBuffer);
        // 把nioBuffer的位置重置到pos
        nioBuffer.position(pos);
        return buf;
    }

該類中的實現方式與HeapChannelBufferFactory中的實現方式差很少,惟一的區別就是它建立的是一個直接緩衝區。

(九)ChannelBuffers

該類是緩衝區的工具類,提供建立、比較 ChannelBuffer 等公用方法。我在這裏舉兩個方法來說:

public static ChannelBuffer wrappedBuffer(ByteBuffer buffer) {
    // 若是緩衝區沒有剩餘容量
    if (!buffer.hasRemaining()) {
        return EMPTY_BUFFER;
    }
    // 若是是字節數組生成的緩衝區
    if (buffer.hasArray()) {
        // 使用buffer的字節數組生成一個新的緩衝區
        return wrappedBuffer(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    } else {
        // 基於ByteBuffer建立一個緩衝區(利用buffer的剩餘容量建立)
        return new ByteBufferBackedChannelBuffer(buffer);
    }
}

該方法經過buffer來建立一個新的緩衝區。能夠看出來調用的就是上述生成緩衝區的三個類中的方法,ChannelBuffers中不少方法都是這樣去實現的,邏輯比較簡單。

public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
    // 得到bufferA的可讀數據
    final int aLen = bufferA.readableBytes();
    // 若是兩個緩衝區的可讀數據大小不同,則不是同一個
    if (aLen != bufferB.readableBytes()) {
        return false;
    }

    final int byteCount = aLen & 7;

    // 得到兩個比較的緩衝區的讀索引
    int aIndex = bufferA.readerIndex();
    int bIndex = bufferB.readerIndex();

    // 最多比較緩衝區中的7個數據
    for (int i = byteCount; i > 0; i--) {
        // 一旦有一個數據不同,則不是同一個
        if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
            return false;
        }
        aIndex++;
        bIndex++;
    }

    return true;
}

該方法就是比較兩個緩衝區是否爲同一個,重寫了equals。

(十)ChannelBufferOutputStream

該類繼承了OutputStream

1.屬性和構造方法

/**
 * 緩衝區
 */
private final ChannelBuffer buffer;
/**
 * 記錄開始寫入的索引
 */
private final int startIndex;

public ChannelBufferOutputStream(ChannelBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    this.buffer = buffer;
    // 把開始寫入數據的索引記錄下來
    startIndex = buffer.writerIndex();
}

該類中包裝了一個緩衝區對象和startIndex,startIndex是記錄開始寫入的索引。

2.writtenBytes

public int writtenBytes() {
    return buffer.writerIndex() - startIndex;
}

該方法是返回寫入了多少數據。

該類裏面還有write方法,都是調用了buffer.writeBytes。

(十一)ChannelBufferInputStream

該類繼承了InputStream

1.屬性和構造函數

/**
 * 緩衝區
 */
private final ChannelBuffer buffer;
/**
 * 記錄開始讀數據的索引
 */
private final int startIndex;
/**
 * 結束讀數據的索引
 */
private final int endIndex;

public ChannelBufferInputStream(ChannelBuffer buffer) {
    this(buffer, buffer.readableBytes());
}

public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    if (length < 0) {
        throw new IllegalArgumentException("length: " + length);
    }
    if (length > buffer.readableBytes()) {
        throw new IndexOutOfBoundsException();
    }

    this.buffer = buffer;
    // 記錄開始讀數據的索引
    startIndex = buffer.readerIndex();
    // 設置結束讀數據的索引
    endIndex = startIndex + length;
    // 標記讀索引
    buffer.markReaderIndex();
}

該類裏面包裝了讀開始索引和結束索引,而且在構造方法中初始化這些屬性。

2.readBytes

public int readBytes() {
    return buffer.readerIndex() - startIndex;
}

該方法是返回讀了多少數據。

3.available

@Override
public int available() throws IOException {
    return endIndex - buffer.readerIndex();
}

該方法是返回還剩多少數據沒讀

4.read

@Override
public int read() throws IOException {
    if (!buffer.readable()) {
        return -1;
    }
    return buffer.readByte() & 0xff;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
    // 判斷是否還有數據可讀
    int available = available();
    if (available == 0) {
        return -1;
    }

    // 得到須要讀取的數據長度
    len = Math.min(available, len);
    buffer.readBytes(b, off, len);
    return len;
}

該方法是讀數據,返回讀了數據長度。

5.skip

@Override
public long skip(long n) throws IOException {
    if (n > Integer.MAX_VALUE) {
        return skipBytes(Integer.MAX_VALUE);
    } else {
        return skipBytes((int) n);
    }
}

private int skipBytes(int n) throws IOException {
    int nBytes = Math.min(available(), n);
    // 跳過一些數據
    buffer.skipBytes(nBytes);
    return nBytes;
}

該方法是跳過n長度來讀數據。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了Buffer的相關實現邏輯,我不少方法都沒有貼出源碼,由於不少都是基於Java NIO的ByteBuffer都設計實現,而且要注意AbstractChannelBuffer的三個子類,也就是生成緩衝區的三種形式,還有就是要注意兩個建立緩衝區實例的工廠。下一篇我會講解telnet部分。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。

相關文章
相關標籤/搜索