目標:介紹Buffer的相關實現邏輯、介紹dubbo-remoting-api中的buffer包內的源碼解析。java
緩存區在NIO框架中很是重要,它做爲字節容器,每一個NIO框架都有本身的相應的設計實現。好比Java NIO有ByteBuffer的設計,Mina有IoBuffer的設計,Netty4有ByteBuf的設計。 那麼在本文講到的內容是dubbo對於緩衝區作的一些接口定義,而且作了不一樣的框架實現緩衝區公共的邏輯。下面是本文要講到的類圖:git
接下來我就按照類圖上的一個一個分析,忽略test類。github
該接口繼承了Comparable接口,該接口是通道緩存接口,是字節容器,在netty中也有通道緩存的設計,也就是io.netty.buffer.ByteBuf,該接口的方法定義和設計跟ByteBuf幾乎同樣,連註釋都同樣,因此我就再也不細說了。api
該類實現了ChannelBuffer接口,是通道緩存的抽象類,它實現了ChannelBuffer全部方法,可是它實現的方法都是須要被重寫的方法,具體的實現都是須要子類來實現。如今咱們來惡補一下這個通道緩存的原理,固然這個原理跟netty的ByteBuf原理是分不開的。AbstractChannelBuffer維護了兩個索引,一個用於讀取,另外一個用於寫入當你從通道緩存中讀取時,readerIndex將會被遞增已經被讀取的字節數,一樣的當你寫入的時候writerIndex也會被遞增。數組
/** * 讀索引 */
private int readerIndex;
/** * 寫索引 */
private int writerIndex;
/** * 標記讀索引 */
private int markedReaderIndex;
/** * 標記寫索引 */
private int markedWriterIndex;
複製代碼
能夠看到該類有四個屬性,讀索引和寫索引的做用就是我上述介紹的,讀索引和寫索引的起始位置都爲索引位置0。而標記讀索引和標記寫索引是爲了作備份回滾,當對緩衝區進行讀寫操做時,可能須要對以前的操做進行回滾,咱們就須要將當前的讀寫索引備份到相應的標記索引中。緩存
該類的其餘方法都是利用四個屬性來操做,無非就是檢測是否有數據可讀或者仍是否有空間可寫等方法,作一些前置條件的校驗以及索引的設置,具體的實現都是須要子類來實現,因此我就不貼代碼,由於邏輯比較簡單。app
該類繼承了AbstractChannelBuffer類,該類是動態的通道緩存區類,也就是該類是從ChannelBufferFactory工廠中動態的生成緩衝區,默認使用的工廠是HeapChannelBufferFactory。框架
/** * 通道緩存區工廠 */
private final ChannelBufferFactory factory;
/** * 通道緩存區 */
private ChannelBuffer buffer;
public DynamicChannelBuffer(int estimatedLength) {
// 默認是HeapChannelBufferFactory
this(estimatedLength, HeapChannelBufferFactory.getInstance());
}
public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
// 若是預計長度小於0 則拋出異常
if (estimatedLength < 0) {
throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
}
// 若是工廠爲空,則拋出空指針異常
if (factory == null) {
throw new NullPointerException("factory");
}
// 設置工廠
this.factory = factory;
// 建立緩存區
buffer = factory.getBuffer(estimatedLength);
}
複製代碼
能夠看到,該類有兩個屬性,全部的實現方法都是調用了buffer的方法,不過該buffer產生是經過工廠動態生成的。而且從構造方法來看,默認使用HeapChannelBufferFactory。ide
@Override
public void ensureWritableBytes(int minWritableBytes) {
// 若是最小寫入的字節數不大於可寫的字節數,則結束
if (minWritableBytes <= writableBytes()) {
return;
}
// 新增容量
int newCapacity;
// 此緩衝區可包含的字節數等於0。
if (capacity() == 0) {
// 新增容量設置爲1
newCapacity = 1;
} else {
// 新增容量設置爲緩衝區可包含的字節數
newCapacity = capacity();
}
// 最小新增容量 = 當前的寫索引+最小寫入的字節數
int minNewCapacity = writerIndex() + minWritableBytes;
// 當新增容量比最小新增容量小
while (newCapacity < minNewCapacity) {
// 新增容量左移1位,也就是加倍
newCapacity <<= 1;
}
// 經過工廠建立該容量大小當緩衝區
ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
// 從buffer中讀取數據到newBuffer中
newBuffer.writeBytes(buffer, 0, writerIndex());
// 替換原來到緩衝區
buffer = newBuffer;
}
複製代碼
該方法是確保數組有可寫的容量,該方法是重寫了父類的方法,經過傳入一個最小寫入的字節數,來對緩衝區進行擴容,能夠看到,當現有的緩衝區不夠大的時候,會對緩衝區進行加倍對擴容,直到buffer的大小大於傳入的最小可寫字節數。函數
@Override
public ChannelBuffer copy(int index, int length) {
// 建立緩衝區,預計長度最小爲64,或者更大
DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(Math.max(length, 64), factory());
// 複製數據
copiedBuffer.buffer = buffer.copy(index, length);
// 設置索引,讀索引設置爲0,寫索引設置爲copy的數據長度
copiedBuffer.setIndex(0, length);
// 返回緩存區
return copiedBuffer;
}
複製代碼
該方法是複製數據,在建立緩衝區的時候,預計長度最小是64,,而後從新設置讀索引寫索引。
其餘方法都調用了buffer的方法或者調用了父類的方法,因此再也不這裏多說。
該方法繼承AbstractChannelBuffer,該類是基於 Java NIO中的ByteBuffer來實現相關的讀寫數據等操做。
/** * ByteBuffer實例 */
private final ByteBuffer buffer;
/** * 容量 */
private final int capacity;
public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
// 建立一個新的字節緩衝區,新緩衝區的大小將是此緩衝區的剩餘容量
this.buffer = buffer.slice();
// 返回buffer的剩餘容量
capacity = buffer.remaining();
// 設置寫索引
writerIndex(capacity);
}
複製代碼
上述就是該類的屬性和構造函數,能夠看到它有一個ByteBuffer類型的實例,而且capacity是buffer的剩餘容量。
還有其餘的方法好比getByte方法是從buffer中讀取數據方法,setBytes方法是把數據寫入buffer,它們都有不少重載方法,爲就不一一講解了,它們都是調用了ByteBuffer中的一些方法,若是對於Java NIO中的ByteBuffer方法不是很熟悉的朋友,須要先了解一下Java NIO中的ByteBuffer。
該方法繼承了AbstractChannelBuffer,該類中buffer是基於字節數組實現
/** * The underlying heap byte array that this buffer is wrapping. * 此緩衝區包裝的基礎堆字節數組。 */
protected final byte[] array;
/** * Creates a new heap buffer with a newly allocated byte array. * 使用新分配的字節數組建立新的堆緩衝區。 * * @param length the length of the new byte array */
public HeapChannelBuffer(int length) {
this(new byte[length], 0, 0);
}
/** * Creates a new heap buffer with an existing byte array. * 使用現有字節數組建立新的堆緩衝區。 * * @param array the byte array to wrap */
public HeapChannelBuffer(byte[] array) {
this(array, 0, array.length);
}
/** * Creates a new heap buffer with an existing byte array. * 使用現有字節數組建立新的堆緩衝區。 * * @param array the byte array to wrap * @param readerIndex the initial reader index of this buffer * @param writerIndex the initial writer index of this buffer */
protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
if (array == null) {
throw new NullPointerException("array");
}
this.array = array;
setIndex(readerIndex, writerIndex);
}
複製代碼
該類有好幾個構造函數,都是基於字節數組的,也就是在該類中包裝了一個字節數組,把構造函數傳入的字節數組傳入到該屬性中。其餘方法邏輯比較簡單。
public interface ChannelBufferFactory {
/** * 得到緩衝區實例 * @param capacity * @return */
ChannelBuffer getBuffer(int capacity);
ChannelBuffer getBuffer(byte[] array, int offset, int length);
ChannelBuffer getBuffer(ByteBuffer nioBuffer);
}
複製代碼
該接口是通道緩衝區工廠,其中就只定義了得到通道緩衝區的方法,比較好理解,它有兩個實現類,我後續會講到。
該類實現了ChannelBufferFactory,該類就是基於字節數組來建立緩衝區的工廠。
public class HeapChannelBufferFactory implements ChannelBufferFactory {
/** * 單例 */
private static final HeapChannelBufferFactory INSTANCE = new HeapChannelBufferFactory();
public HeapChannelBufferFactory() {
super();
}
public static ChannelBufferFactory getInstance() {
return INSTANCE;
}
@Override
public ChannelBuffer getBuffer(int capacity) {
// 建立一個capacity容量的緩衝區
return ChannelBuffers.buffer(capacity);
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
return ChannelBuffers.wrappedBuffer(array, offset, length);
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
// 判斷該緩衝區是否有字節數組支持
if (nioBuffer.hasArray()) {
// 使用
return ChannelBuffers.wrappedBuffer(nioBuffer);
}
// 建立一個nioBuffer剩餘容量的緩衝區
ChannelBuffer buf = getBuffer(nioBuffer.remaining());
// 記錄下nioBuffer的位置
int pos = nioBuffer.position();
// 寫入數據到buf
buf.writeBytes(nioBuffer);
// 把nioBuffer的位置重置到pos
nioBuffer.position(pos);
return buf;
}
}
複製代碼
該類利用了單例模式,其中的方法比較簡單,就是調用了ChannelBuffers中的方法,調用的方法實際上仍是使用了HeapChannelBuffer中建立緩衝區的方法。
該類實現了ChannelBufferFactory接口,是直接緩衝區工廠,用來建立直接緩衝區。
public class DirectChannelBufferFactory implements ChannelBufferFactory {
/** * 單例 */
private static final DirectChannelBufferFactory INSTANCE = new DirectChannelBufferFactory();
public DirectChannelBufferFactory() {
super();
}
public static ChannelBufferFactory getInstance() {
return INSTANCE;
}
@Override
public ChannelBuffer getBuffer(int capacity) {
if (capacity < 0) {
throw new IllegalArgumentException("capacity: " + capacity);
}
if (capacity == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
// 生成直接緩衝區
return ChannelBuffers.directBuffer(capacity);
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
if (array == null) {
throw new NullPointerException("array");
}
if (offset < 0) {
throw new IndexOutOfBoundsException("offset: " + offset);
}
if (length == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
if (offset + length > array.length) {
throw new IndexOutOfBoundsException("length: " + length);
}
ChannelBuffer buf = getBuffer(length);
buf.writeBytes(array, offset, length);
return buf;
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
// 若是nioBuffer不是隻讀,而且它是直接緩衝區
if (!nioBuffer.isReadOnly() && nioBuffer.isDirect()) {
// 建立一個緩衝區
return ChannelBuffers.wrappedBuffer(nioBuffer);
}
// 建立一個nioBuffer剩餘容量的緩衝區
ChannelBuffer buf = getBuffer(nioBuffer.remaining());
// 記錄下nioBuffer的位置
int pos = nioBuffer.position();
// 寫入數據到buf
buf.writeBytes(nioBuffer);
// 把nioBuffer的位置重置到pos
nioBuffer.position(pos);
return buf;
}
複製代碼
該類中的實現方式與HeapChannelBufferFactory中的實現方式差很少,惟一的區別就是它建立的是一個直接緩衝區。
該類是緩衝區的工具類,提供建立、比較 ChannelBuffer 等公用方法。我在這裏舉兩個方法來說:
public static ChannelBuffer wrappedBuffer(ByteBuffer buffer) {
// 若是緩衝區沒有剩餘容量
if (!buffer.hasRemaining()) {
return EMPTY_BUFFER;
}
// 若是是字節數組生成的緩衝區
if (buffer.hasArray()) {
// 使用buffer的字節數組生成一個新的緩衝區
return wrappedBuffer(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
// 基於ByteBuffer建立一個緩衝區(利用buffer的剩餘容量建立)
return new ByteBufferBackedChannelBuffer(buffer);
}
}
複製代碼
該方法經過buffer來建立一個新的緩衝區。能夠看出來調用的就是上述生成緩衝區的三個類中的方法,ChannelBuffers中不少方法都是這樣去實現的,邏輯比較簡單。
public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) {
// 得到bufferA的可讀數據
final int aLen = bufferA.readableBytes();
// 若是兩個緩衝區的可讀數據大小不同,則不是同一個
if (aLen != bufferB.readableBytes()) {
return false;
}
final int byteCount = aLen & 7;
// 得到兩個比較的緩衝區的讀索引
int aIndex = bufferA.readerIndex();
int bIndex = bufferB.readerIndex();
// 最多比較緩衝區中的7個數據
for (int i = byteCount; i > 0; i--) {
// 一旦有一個數據不同,則不是同一個
if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) {
return false;
}
aIndex++;
bIndex++;
}
return true;
}
複製代碼
該方法就是比較兩個緩衝區是否爲同一個,重寫了equals。
該類繼承了OutputStream
/** * 緩衝區 */
private final ChannelBuffer buffer;
/** * 記錄開始寫入的索引 */
private final int startIndex;
public ChannelBufferOutputStream(ChannelBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
this.buffer = buffer;
// 把開始寫入數據的索引記錄下來
startIndex = buffer.writerIndex();
}
複製代碼
該類中包裝了一個緩衝區對象和startIndex,startIndex是記錄開始寫入的索引。
public int writtenBytes() {
return buffer.writerIndex() - startIndex;
}
複製代碼
該方法是返回寫入了多少數據。
該類裏面還有write方法,都是調用了buffer.writeBytes。
該類繼承了InputStream
/** * 緩衝區 */
private final ChannelBuffer buffer;
/** * 記錄開始讀數據的索引 */
private final int startIndex;
/** * 結束讀數據的索引 */
private final int endIndex;
public ChannelBufferInputStream(ChannelBuffer buffer) {
this(buffer, buffer.readableBytes());
}
public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (length < 0) {
throw new IllegalArgumentException("length: " + length);
}
if (length > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
this.buffer = buffer;
// 記錄開始讀數據的索引
startIndex = buffer.readerIndex();
// 設置結束讀數據的索引
endIndex = startIndex + length;
// 標記讀索引
buffer.markReaderIndex();
}
複製代碼
該類裏面包裝了讀開始索引和結束索引,而且在構造方法中初始化這些屬性。
public int readBytes() {
return buffer.readerIndex() - startIndex;
}
複製代碼
該方法是返回讀了多少數據。
@Override
public int available() throws IOException {
return endIndex - buffer.readerIndex();
}
複製代碼
該方法是返回還剩多少數據沒讀
@Override
public int read() throws IOException {
if (!buffer.readable()) {
return -1;
}
return buffer.readByte() & 0xff;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
// 判斷是否還有數據可讀
int available = available();
if (available == 0) {
return -1;
}
// 得到須要讀取的數據長度
len = Math.min(available, len);
buffer.readBytes(b, off, len);
return len;
}
複製代碼
該方法是讀數據,返回讀了數據長度。
@Override
public long skip(long n) throws IOException {
if (n > Integer.MAX_VALUE) {
return skipBytes(Integer.MAX_VALUE);
} else {
return skipBytes((int) n);
}
}
private int skipBytes(int n) throws IOException {
int nBytes = Math.min(available(), n);
// 跳過一些數據
buffer.skipBytes(nBytes);
return nBytes;
}
複製代碼
該方法是跳過n長度來讀數據。
該部分相關的源碼解析地址:github.com/CrazyHZM/in…
該文章講解了Buffer的相關實現邏輯,我不少方法都沒有貼出源碼,由於不少都是基於Java NIO的ByteBuffer都設計實現,而且要注意AbstractChannelBuffer的三個子類,也就是生成緩衝區的三種形式,還有就是要注意兩個建立緩衝區實例的工廠。下一篇我會講解telnet部分。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。