目標:介紹Buffer的相關實現邏輯、介紹dubbo-remoting-api中的buffer包內的源碼解析。
緩存區在NIO框架中很是重要,它做爲字節容器,每一個NIO框架都有本身的相應的設計實現。好比Java NIO有ByteBuffer的設計,Mina有IoBuffer的設計,Netty4有ByteBuf的設計。 那麼在本文講到的內容是dubbo對於緩衝區作的一些接口定義,而且作了不一樣的框架實現緩衝區公共的邏輯。下面是本文要講到的類圖:java
接下來我就按照類圖上的一個一個分析,忽略test類。git
該接口繼承了Comparable接口,該接口是通道緩存接口,是字節容器,在netty中也有通道緩存的設計,也就是io.netty.buffer.ByteBuf,該接口的方法定義和設計跟ByteBuf幾乎同樣,連註釋都同樣,因此我就再也不細說了。github
該類實現了ChannelBuffer接口,是通道緩存的抽象類,它實現了ChannelBuffer全部方法,可是它實現的方法都是須要被重寫的方法,具體的實現都是須要子類來實現。如今咱們來惡補一下這個通道緩存的原理,固然這個原理跟netty的ByteBuf原理是分不開的。AbstractChannelBuffer維護了兩個索引,一個用於讀取,另外一個用於寫入當你從通道緩存中讀取時,readerIndex將會被遞增已經被讀取的字節數,一樣的當你寫入的時候writerIndex也會被遞增。api
/** * 讀索引 */ private int readerIndex; /** * 寫索引 */ private int writerIndex; /** * 標記讀索引 */ private int markedReaderIndex; /** * 標記寫索引 */ private int markedWriterIndex;
能夠看到該類有四個屬性,讀索引和寫索引的做用就是我上述介紹的,讀索引和寫索引的起始位置都爲索引位置0。而標記讀索引和標記寫索引是爲了作備份回滾,當對緩衝區進行讀寫操做時,可能須要對以前的操做進行回滾,咱們就須要將當前的讀寫索引備份到相應的標記索引中。數組
該類的其餘方法都是利用四個屬性來操做,無非就是檢測是否有數據可讀或者仍是否有空間可寫等方法,作一些前置條件的校驗以及索引的設置,具體的實現都是須要子類來實現,因此我就不貼代碼,由於邏輯比較簡單。緩存
該類繼承了AbstractChannelBuffer類,該類是動態的通道緩存區類,也就是該類是從ChannelBufferFactory工廠中動態的生成緩衝區,默認使用的工廠是HeapChannelBufferFactory。微信
/** * 通道緩存區工廠 */ private final ChannelBufferFactory factory; /** * 通道緩存區 */ private ChannelBuffer buffer; public DynamicChannelBuffer(int estimatedLength) { // 默認是HeapChannelBufferFactory this(estimatedLength, HeapChannelBufferFactory.getInstance()); } public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) { // 若是預計長度小於0 則拋出異常 if (estimatedLength < 0) { throw new IllegalArgumentException("estimatedLength: " + estimatedLength); } // 若是工廠爲空,則拋出空指針異常 if (factory == null) { throw new NullPointerException("factory"); } // 設置工廠 this.factory = factory; // 建立緩存區 buffer = factory.getBuffer(estimatedLength); }
能夠看到,該類有兩個屬性,全部的實現方法都是調用了buffer的方法,不過該buffer產生是經過工廠動態生成的。而且從構造方法來看,默認使用HeapChannelBufferFactory。app
@Override public void ensureWritableBytes(int minWritableBytes) { // 若是最小寫入的字節數不大於可寫的字節數,則結束 if (minWritableBytes <= writableBytes()) { return; } // 新增容量 int newCapacity; // 此緩衝區可包含的字節數等於0。 if (capacity() == 0) { // 新增容量設置爲1 newCapacity = 1; } else { // 新增容量設置爲緩衝區可包含的字節數 newCapacity = capacity(); } // 最小新增容量 = 當前的寫索引+最小寫入的字節數 int minNewCapacity = writerIndex() + minWritableBytes; // 當新增容量比最小新增容量小 while (newCapacity < minNewCapacity) { // 新增容量左移1位,也就是加倍 newCapacity <<= 1; } // 經過工廠建立該容量大小當緩衝區 ChannelBuffer newBuffer = factory().getBuffer(newCapacity); // 從buffer中讀取數據到newBuffer中 newBuffer.writeBytes(buffer, 0, writerIndex()); // 替換原來到緩衝區 buffer = newBuffer; }
該方法是確保數組有可寫的容量,該方法是重寫了父類的方法,經過傳入一個最小寫入的字節數,來對緩衝區進行擴容,能夠看到,當現有的緩衝區不夠大的時候,會對緩衝區進行加倍對擴容,直到buffer的大小大於傳入的最小可寫字節數。框架
@Override public ChannelBuffer copy(int index, int length) { // 建立緩衝區,預計長度最小爲64,或者更大 DynamicChannelBuffer copiedBuffer = new DynamicChannelBuffer(Math.max(length, 64), factory()); // 複製數據 copiedBuffer.buffer = buffer.copy(index, length); // 設置索引,讀索引設置爲0,寫索引設置爲copy的數據長度 copiedBuffer.setIndex(0, length); // 返回緩存區 return copiedBuffer; }
該方法是複製數據,在建立緩衝區的時候,預計長度最小是64,,而後從新設置讀索引寫索引。ide
其餘方法都調用了buffer的方法或者調用了父類的方法,因此再也不這裏多說。
該方法繼承AbstractChannelBuffer,該類是基於 Java NIO中的ByteBuffer來實現相關的讀寫數據等操做。
/** * ByteBuffer實例 */ private final ByteBuffer buffer; /** * 容量 */ private final int capacity; public ByteBufferBackedChannelBuffer(ByteBuffer buffer) { if (buffer == null) { throw new NullPointerException("buffer"); } // 建立一個新的字節緩衝區,新緩衝區的大小將是此緩衝區的剩餘容量 this.buffer = buffer.slice(); // 返回buffer的剩餘容量 capacity = buffer.remaining(); // 設置寫索引 writerIndex(capacity); }
上述就是該類的屬性和構造函數,能夠看到它有一個ByteBuffer類型的實例,而且capacity是buffer的剩餘容量。
還有其餘的方法好比getByte方法是從buffer中讀取數據方法,setBytes方法是把數據寫入buffer,它們都有不少重載方法,爲就不一一講解了,它們都是調用了ByteBuffer中的一些方法,若是對於Java NIO中的ByteBuffer方法不是很熟悉的朋友,須要先了解一下Java NIO中的ByteBuffer。
該方法繼承了AbstractChannelBuffer,該類中buffer是基於字節數組實現
/** * The underlying heap byte array that this buffer is wrapping. * 此緩衝區包裝的基礎堆字節數組。 */ protected final byte[] array; /** * Creates a new heap buffer with a newly allocated byte array. * 使用新分配的字節數組建立新的堆緩衝區。 * * @param length the length of the new byte array */ public HeapChannelBuffer(int length) { this(new byte[length], 0, 0); } /** * Creates a new heap buffer with an existing byte array. * 使用現有字節數組建立新的堆緩衝區。 * * @param array the byte array to wrap */ public HeapChannelBuffer(byte[] array) { this(array, 0, array.length); } /** * Creates a new heap buffer with an existing byte array. * 使用現有字節數組建立新的堆緩衝區。 * * @param array the byte array to wrap * @param readerIndex the initial reader index of this buffer * @param writerIndex the initial writer index of this buffer */ protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) { if (array == null) { throw new NullPointerException("array"); } this.array = array; setIndex(readerIndex, writerIndex); }
該類有好幾個構造函數,都是基於字節數組的,也就是在該類中包裝了一個字節數組,把構造函數傳入的字節數組傳入到該屬性中。其餘方法邏輯比較簡單。
public interface ChannelBufferFactory { /** * 得到緩衝區實例 * @param capacity * @return */ ChannelBuffer getBuffer(int capacity); ChannelBuffer getBuffer(byte[] array, int offset, int length); ChannelBuffer getBuffer(ByteBuffer nioBuffer); }
該接口是通道緩衝區工廠,其中就只定義了得到通道緩衝區的方法,比較好理解,它有兩個實現類,我後續會講到。
該類實現了ChannelBufferFactory,該類就是基於字節數組來建立緩衝區的工廠。
public class HeapChannelBufferFactory implements ChannelBufferFactory { /** * 單例 */ private static final HeapChannelBufferFactory INSTANCE = new HeapChannelBufferFactory(); public HeapChannelBufferFactory() { super(); } public static ChannelBufferFactory getInstance() { return INSTANCE; } @Override public ChannelBuffer getBuffer(int capacity) { // 建立一個capacity容量的緩衝區 return ChannelBuffers.buffer(capacity); } @Override public ChannelBuffer getBuffer(byte[] array, int offset, int length) { return ChannelBuffers.wrappedBuffer(array, offset, length); } @Override public ChannelBuffer getBuffer(ByteBuffer nioBuffer) { // 判斷該緩衝區是否有字節數組支持 if (nioBuffer.hasArray()) { // 使用 return ChannelBuffers.wrappedBuffer(nioBuffer); } // 建立一個nioBuffer剩餘容量的緩衝區 ChannelBuffer buf = getBuffer(nioBuffer.remaining()); // 記錄下nioBuffer的位置 int pos = nioBuffer.position(); // 寫入數據到buf buf.writeBytes(nioBuffer); // 把nioBuffer的位置重置到pos nioBuffer.position(pos); return buf; } }
該類利用了單例模式,其中的方法比較簡單,就是調用了ChannelBuffers中的方法,調用的方法實際上仍是使用了HeapChannelBuffer中建立緩衝區的方法。
該類實現了ChannelBufferFactory接口,是直接緩衝區工廠,用來建立直接緩衝區。
public class DirectChannelBufferFactory implements ChannelBufferFactory { /** * 單例 */ private static final DirectChannelBufferFactory INSTANCE = new DirectChannelBufferFactory(); public DirectChannelBufferFactory() { super(); } public static ChannelBufferFactory getInstance() { return INSTANCE; } @Override public ChannelBuffer getBuffer(int capacity) { if (capacity < 0) { throw new IllegalArgumentException("capacity: " + capacity); } if (capacity == 0) { return ChannelBuffers.EMPTY_BUFFER; } // 生成直接緩衝區 return ChannelBuffers.directBuffer(capacity); } @Override public ChannelBuffer getBuffer(byte[] array, int offset, int length) { if (array == null) { throw new NullPointerException("array"); } if (offset < 0) { throw new IndexOutOfBoundsException("offset: " + offset); } if (length == 0) { return ChannelBuffers.EMPTY_BUFFER; } if (offset + length > array.length) { throw new IndexOutOfBoundsException("length: " + length); } ChannelBuffer buf = getBuffer(length); buf.writeBytes(array, offset, length); return buf; } @Override public ChannelBuffer getBuffer(ByteBuffer nioBuffer) { // 若是nioBuffer不是隻讀,而且它是直接緩衝區 if (!nioBuffer.isReadOnly() && nioBuffer.isDirect()) { // 建立一個緩衝區 return ChannelBuffers.wrappedBuffer(nioBuffer); } // 建立一個nioBuffer剩餘容量的緩衝區 ChannelBuffer buf = getBuffer(nioBuffer.remaining()); // 記錄下nioBuffer的位置 int pos = nioBuffer.position(); // 寫入數據到buf buf.writeBytes(nioBuffer); // 把nioBuffer的位置重置到pos nioBuffer.position(pos); return buf; }
該類中的實現方式與HeapChannelBufferFactory中的實現方式差很少,惟一的區別就是它建立的是一個直接緩衝區。
該類是緩衝區的工具類,提供建立、比較 ChannelBuffer 等公用方法。我在這裏舉兩個方法來說:
public static ChannelBuffer wrappedBuffer(ByteBuffer buffer) { // 若是緩衝區沒有剩餘容量 if (!buffer.hasRemaining()) { return EMPTY_BUFFER; } // 若是是字節數組生成的緩衝區 if (buffer.hasArray()) { // 使用buffer的字節數組生成一個新的緩衝區 return wrappedBuffer(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } else { // 基於ByteBuffer建立一個緩衝區(利用buffer的剩餘容量建立) return new ByteBufferBackedChannelBuffer(buffer); } }
該方法經過buffer來建立一個新的緩衝區。能夠看出來調用的就是上述生成緩衝區的三個類中的方法,ChannelBuffers中不少方法都是這樣去實現的,邏輯比較簡單。
public static boolean equals(ChannelBuffer bufferA, ChannelBuffer bufferB) { // 得到bufferA的可讀數據 final int aLen = bufferA.readableBytes(); // 若是兩個緩衝區的可讀數據大小不同,則不是同一個 if (aLen != bufferB.readableBytes()) { return false; } final int byteCount = aLen & 7; // 得到兩個比較的緩衝區的讀索引 int aIndex = bufferA.readerIndex(); int bIndex = bufferB.readerIndex(); // 最多比較緩衝區中的7個數據 for (int i = byteCount; i > 0; i--) { // 一旦有一個數據不同,則不是同一個 if (bufferA.getByte(aIndex) != bufferB.getByte(bIndex)) { return false; } aIndex++; bIndex++; } return true; }
該方法就是比較兩個緩衝區是否爲同一個,重寫了equals。
該類繼承了OutputStream
/** * 緩衝區 */ private final ChannelBuffer buffer; /** * 記錄開始寫入的索引 */ private final int startIndex; public ChannelBufferOutputStream(ChannelBuffer buffer) { if (buffer == null) { throw new NullPointerException("buffer"); } this.buffer = buffer; // 把開始寫入數據的索引記錄下來 startIndex = buffer.writerIndex(); }
該類中包裝了一個緩衝區對象和startIndex,startIndex是記錄開始寫入的索引。
public int writtenBytes() { return buffer.writerIndex() - startIndex; }
該方法是返回寫入了多少數據。
該類裏面還有write方法,都是調用了buffer.writeBytes。
該類繼承了InputStream
/** * 緩衝區 */ private final ChannelBuffer buffer; /** * 記錄開始讀數據的索引 */ private final int startIndex; /** * 結束讀數據的索引 */ private final int endIndex; public ChannelBufferInputStream(ChannelBuffer buffer) { this(buffer, buffer.readableBytes()); } public ChannelBufferInputStream(ChannelBuffer buffer, int length) { if (buffer == null) { throw new NullPointerException("buffer"); } if (length < 0) { throw new IllegalArgumentException("length: " + length); } if (length > buffer.readableBytes()) { throw new IndexOutOfBoundsException(); } this.buffer = buffer; // 記錄開始讀數據的索引 startIndex = buffer.readerIndex(); // 設置結束讀數據的索引 endIndex = startIndex + length; // 標記讀索引 buffer.markReaderIndex(); }
該類裏面包裝了讀開始索引和結束索引,而且在構造方法中初始化這些屬性。
public int readBytes() { return buffer.readerIndex() - startIndex; }
該方法是返回讀了多少數據。
@Override public int available() throws IOException { return endIndex - buffer.readerIndex(); }
該方法是返回還剩多少數據沒讀
@Override public int read() throws IOException { if (!buffer.readable()) { return -1; } return buffer.readByte() & 0xff; } @Override public int read(byte[] b, int off, int len) throws IOException { // 判斷是否還有數據可讀 int available = available(); if (available == 0) { return -1; } // 得到須要讀取的數據長度 len = Math.min(available, len); buffer.readBytes(b, off, len); return len; }
該方法是讀數據,返回讀了數據長度。
@Override public long skip(long n) throws IOException { if (n > Integer.MAX_VALUE) { return skipBytes(Integer.MAX_VALUE); } else { return skipBytes((int) n); } } private int skipBytes(int n) throws IOException { int nBytes = Math.min(available(), n); // 跳過一些數據 buffer.skipBytes(nBytes); return nBytes; }
該方法是跳過n長度來讀數據。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了Buffer的相關實現邏輯,我不少方法都沒有貼出源碼,由於不少都是基於Java NIO的ByteBuffer都設計實現,而且要注意AbstractChannelBuffer的三個子類,也就是生成緩衝區的三種形式,還有就是要注意兩個建立緩衝區實例的工廠。下一篇我會講解telnet部分。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。