網絡數據的基本單位老是字節。Java NIO 提供了 ByteBuffer 做爲它 的字節容器,可是這個類使用起來過於複雜,並且也有些繁瑣。java
Netty 的 ByteBuffer 替代品是 ByteBuf,一個強大的實現,既解決了 JDK API 的侷限性, 又爲網絡應用程序的開發者提供了更好的 API。算法
Netty 的數據處理 API 經過兩個組件暴露——abstract class ByteBuf 和 interface ByteBufHolder。後端
下面是一些 ByteBuf API 的優勢:數組
其餘類可用於管理 ByteBuf 實例的分配,以及執行各類針對於數據容器自己和它所持有的 數據的操做。咱們將在仔細研究 ByteBuf 和 ByteBufHolder 時探討這些特性。安全
由於全部的網絡通訊都涉及字節序列的移動,因此高效易用的數據結構明顯是必不可少的。 Netty 的 ByteBuf 實現知足並超越了這些需求。讓咱們首先來看看它是如何經過使用不一樣的索引 來簡化對它所包含的數據的訪問的吧。網絡
ByteBuf 維護了兩個不一樣的索引:一個用於讀取,一個用於寫入。當你從 ByteBuf 讀取時, 它的 readerIndex 將會被遞增已經被讀取的字節數。一樣地,當你寫入 ByteBuf 時,它的 writerIndex 也會被遞增。圖 5-1 展現了一個空 ByteBuf 的佈局結構和狀態。數據結構
要了解這些索引兩兩之間的關係,請考慮一下,若是打算讀取字節直到 readerIndex 達到 和 writerIndex 一樣的值時會發生什麼。在那時,你將會到達「能夠讀取的」數據的末尾。就 如同試圖讀取超出數組末尾的數據同樣,試圖讀取超出該點的數據將會觸發一個 IndexOutOfBoundsException。多線程
ByteBuf是一個抽象類,內部所有是抽象的函數接口,AbstractByteBuf這個抽象類基本實現了ByteBuf,下面咱們經過分析AbstractByteBuf裏面的實現來分析ByteBuf的工做原理。併發
ByteBuf都是基於字節序列的,相似於一個字節數組。在AbstractByteBuf裏面定義了下面5個變量:ide
//源碼 int readerIndex; //讀索引 int writerIndex; //寫索引 private int markedReaderIndex;//標記讀索引 private int markedWriterIndex;//標記寫索引 private int maxCapacity;//緩衝區的最大容量
ByteBuf 與JDK中的 ByteBuffer 的最大區別之一就是:
(1)netty的ByteBuf採用了讀/寫索引分離,一個初始化的ByteBuf的readerIndex和writerIndex都處於0位置。
(2)當讀索引和寫索引處於同一位置時,若是咱們繼續讀取,就會拋出異常IndexOutOfBoundsException。
(3)對於ByteBuf的任何讀寫操做都會分別單獨的維護讀索引和寫索引。maxCapacity最大容量默認的限制就是Integer.MAX_VALUE。
JDK中的Buffer的類型 有heapBuffer和directBuffer兩種類型,可是在netty中除了heap和direct類型外,還有composite Buffer(複合緩衝區類型)。
這是最經常使用的類型,ByteBuf將數據存儲在JVM的堆空間,經過將數據存儲在數組中實現的。
1)堆緩衝的優勢是:因爲數據存儲在JVM的堆中能夠快速建立和快速釋放,而且提供了數組的直接快速訪問的方法。
2)堆緩衝缺點是:每次讀寫數據都要先將數據拷貝到直接緩衝區再進行傳遞。
這種模式被稱爲支撐數組 (backing array),它能在沒有使用池化的狀況下提供快速的分配和釋放。這種方式,如代碼清單 5-1 所示,很是適合於有遺留的數據須要處理的狀況。
ByteBuf heapBuf = ...; if (heapBuf.hasArray()) { byte[] array = heapBuf.array(); int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); handleArray(array, offset, length); }
NIO 在 JDK 1.4 中引入的 ByteBuffer 類容許 JVM 實現經過本地調用來分配內存。這主要是爲了不在每次調用本地 I/O 操做以前(或者以後)將緩衝區的內容復 制到一箇中間緩衝區(或者從中間緩衝區把內容複製到緩衝區)。
Direct Buffer在堆以外直接分配內存,直接緩衝區不會佔用堆的容量。事實上,在經過套接字發送它以前,JVM將會在內部把你的緩衝 區複製到一個直接緩衝區中。因此若是使用直接緩衝區能夠節約一次拷貝。
(1)Direct Buffer的優勢是:在使用Socket傳遞數據時性能很好,因爲數據直接在內存中,不存在從JVM拷貝數據到直接緩衝區的過程,性能好。
(2)缺點是:相對於基於堆的緩衝區,它們的分配和釋放都較爲昂貴。若是你 正在處理遺留代碼,你也可能會遇到另一個缺點:由於數據不是在堆上,因此你不得不進行一 次複製。
雖然netty的Direct Buffer有這個缺點,可是netty經過內存池來解決這個問題。直接緩衝池不支持數組訪問數據,但能夠經過間接的方式訪問數據數組:
ByteBuf directBuf = ...; if (!directBuf.hasArray()) { int length = directBuf.readableBytes(); byte[] array = new byte[length]; directBuf.getBytes(directBuf.readerIndex(), array); handleArray(array, 0, length); }
不過對於一些IO通訊線程中讀寫緩衝時建議使用DirectByteBuffer,由於這涉及到大量的IO數據讀寫。對於後端的業務消息的編解碼模塊使用HeapByteBuffer。
第三種也是最後一種模式使用的是複合緩衝區,它爲多個 ByteBuf 提供一個聚合視圖。在 這裏你能夠根據須要添加或者刪除 ByteBuf 實例,這是一個 JDK 的 ByteBuffer 實現徹底缺 失的特性。
Netty 經過一個 ByteBuf 子類——CompositeByteBuf——實現了這個模式,它提供了一 個將多個緩衝區表示爲單個合併緩衝區的虛擬表示
Netty提供了Composite ByteBuf來處理複合緩衝區。例如:一條消息由Header和Body組成,將header和body組裝成一條消息發送出去。下圖顯示了Composite ByteBuf組成header和body:
若是使用的是JDK的ByteBuffer就不能簡單的實現,只能經過建立數組或則新的ByteBuffer,再將裏面的內容複製到新的ByteBuffer中,下面給出了一個CompositeByteBuf的使用示例:
//組合緩衝區 CompositeByteBuf compBuf = Unpooled.compositeBuffer(); //堆緩衝區 ByteBuf heapBuf = Unpooled.buffer(8); //直接緩衝區 ByteBuf directBuf = Unpooled.directBuffer(16); //添加ByteBuf到CompositeByteBuf compBuf.addComponents(heapBuf, directBuf); //刪除第一個ByteBuf compBuf.removeComponent(0); Iterator<ByteBuf> iter = compBuf.iterator(); while(iter.hasNext()){ System.out.println(iter.next().toString()); } //使用數組訪問數據 if(!compBuf.hasArray()){ int len = compBuf.readableBytes(); byte[] arr = new byte[len]; compBuf.getBytes(0, arr); }
Netty使用了CompositeByteBuf來優化套接字的I/O操做,儘量地消除了 由JDK的緩衝區實現所致使的性能以及內存使用率的懲罰。( 這尤爲適用於 JDK 所使用的一種稱爲分散/收集 I/O(Scatter/Gather I/O)的技術,定義爲「一種輸入和 輸出的方法,其中,單個系統調用從單個數據流寫到一組緩衝區中,或者,從單個數據源讀到一組緩衝 區中」。《Linux System Programming》,做者 Robert Love(O’Reilly, 2007)) 這種優化發生在Netty的核心代碼中, 所以不會被暴露出來,可是你應該知道它所帶來的影響。
ByteBuf提供讀/寫索引,從0開始的索引,第一個字節索引是0,最後一個字節的索引是capacity-1,下面給出一個示例遍歷ByteBuf的字節:
public static void main(String[] args) { //建立一個16字節的buffer,這裏默認是建立heap buffer ByteBuf buf = Unpooled.buffer(16); //寫數據到buffer for(int i=0; i<16; i++){ buf.writeByte(i+1); } //讀數據 for(int i=0; i<buf.capacity(); i++){ System.out.print(buf.getByte(i)+", "); } } /***output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, */
這裏有一點須要注意的是:經過那些須要一個索引值參數的方法(getByte(i))之一索引訪問byte時不會改變真實的讀索引和寫索引,咱們能夠經過ByteBuf的readerIndex()或則writerIndex()函數來分別推動讀索引和寫索引。
@Override public ByteBuf writeByte(int value) { ensureAccessible();//檢驗是否能夠寫入 ensureWritable0(1); _setByte(writerIndex++, value);//這裏寫索引自增了 return this; } @Override public byte readByte() { checkReadableBytes0(1); int i = readerIndex; byte b = _getByte(i); readerIndex = i + 1;//這裏讀索引自增了 return b; }
雖然 ByteBuf 同時具備讀索引和寫索引,可是 JDK 的 ByteBuffer 卻只有一個索引,這 也就是爲何必須調用 flip()方法來在讀模式和寫模式之間進行切換的緣由。
首先圖 5-3 展現了 ByteBuf 是如何被它的兩個索引劃分紅 3 個區域的
對於已經讀過的字節,咱們須要回收,經過調用ByteBuf.discardReadBytes()來回收已經讀取過的字節,discardReadBytes()將回收從索引0到readerIndex之間的字節。調用discardReadBytes()方法以後會變成以下圖所示;
雖然你可能會傾向於頻繁地調用 discardReadBytes()方法以確保可寫分段的最大化,可是 請注意,很明顯discardReadBytes()函數極可能會致使內存的複製,它須要移動ByteBuf中可讀字節到開始位置,因此該操做會致使時間開銷。說白了也就是時間換空間。
ByteBuf 的可讀字節分段存儲了實際數據。新分配的、包裝的或者複製的緩衝區的默認的 readerIndex 值爲 0。任何名稱以 read 或者 skip 開頭的操做都將檢索或者跳過位於當前 readerIndex 的數據,而且將它增長已讀字節數。
當咱們讀取字節的時候,通常要先判斷buffer中是否有字節可讀,這時候能夠調用isReadable()函數來判斷:源碼以下:
@Override public boolean isReadable() { return writerIndex > readerIndex; }
可寫字節分段是指一個擁有未定義內容的、寫入就緒的內存區域。新分配的緩衝區的 writerIndex 的默認值爲 0。任何名稱以 write 開頭的操做都將從當前的 writerIndex 處 開始寫數據,並將它增長已經寫入的字節數。若是寫操做的目標也是 ByteBuf,而且沒有指定 源索引的值,則源緩衝區的 readerIndex 也一樣會被增長相同的大小。
其實也就是判斷 讀索引是否小於寫索引 來判斷是否還能夠讀取字節。在判斷是否可寫時也是判斷寫索引是否小於最大容量來判斷。
@Override public boolean isWritable() { return capacity() > writerIndex; }
清除ByteBuf來講,有兩種形式,第一種是clear()函數:源碼以下:
@Override public ByteBuf clear() { readerIndex = writerIndex = 0; return this; }
很明顯這種方式並無真實的清除緩衝區中的數據,而只是把讀/寫索引值從新都置爲0了,這與discardReadBytes()方法有很大的區別。
從源碼可知,每一個ByteBuf有兩個標註索引,
private int markedReaderIndex;//標記讀索引 private int markedWriterIndex;//標記寫索引
能夠經過重置方法返回上次標記的索引的位置。
調用duplicate()、slice()、slice(int index, int length)等方法能夠建立一個現有緩衝區的視圖(現有緩衝區與原有緩衝區是指向相同內存)。衍生的緩衝區有獨立的readerIndex和writerIndex和標記索引。若是須要現有的緩衝區的全新副本,可使用copy()得到。
前面咱們也講過了,ByteBuf主要有三種類型,heap、direct和composite類型,下面介紹建立這三種Buffer的方法:
(1)經過ByteBufAllocator這個接口來建立ByteBuf,這個接口能夠建立上面的三種Buffer,通常都是經過channel的alloc()接口獲取。
(2)經過Unpooled類裏面的靜態方法,建立Buffer
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8); ByteBuf directBuf = Unpooled.directBuffer(16);
還有一點就是,ByteBuf裏面的數據都是保存在字節數組裏面的:
byte[] array;
先來講說ByteBuffer的缺點:
(1)下面是NIO中ByteBuffer存儲字節的字節數組的定義,咱們能夠知道ByteBuffer的字節數組是被定義成final的,也就是長度固定。一旦分配完成就不能擴容和收縮,靈活性低,並且當待存儲的對象字節很大可能出現數組越界,用戶使用起來稍不當心就可能出現異常。若是要避免越界,在存儲以前就要只要需求字節大小,若是buffer的空間不夠就建立一個更大的新的ByteBuffer,再將以前的Buffer中數據複製過去,這樣的效率是奇低的。
final byte[] hb;// Non-null only for heap buffers
(2)ByteBuffer只用了一個position指針來標識位置,讀寫模式切換時須要調用flip()函數和rewind()函數,使用起來須要很是當心,否則很容易出錯誤。
下面說說對應的ByteBuf的優勢:
(1)ByteBuf是吸收ByteBuffer的缺點以後從新設計,存儲字節的數組是動態的,最大是Integer.MAX_VALUE。這裏的動態性存在write操做中,write時得知buffer不夠時,會自動擴容。
(2) ByteBuf的讀寫索引分離,使用起來十分方便。此外ByteBuf還新增了不少方便實用的功能。
看類名咱們就能夠知道,該類主要是對引用進行計數,有點相似於JVM中判斷對象是否可回收的引用計數算法。這個類主要是根據ByteBuf的引用次數判斷ByteBuf是否可被自動回收。下面來看看源碼:
成員變量
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater; //靜態代碼段初始化refCntUpdater static { AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; } private volatile int refCnt = 1;
首先咱們能看到refCntUpdater這個變量,這是一個原子變量類AtomicIntegerFieldUpdater,她是一個靜態變量,並且是在static代碼段裏面實例化的,這說明這個類是單例的。這個類的主要做用是以原子的方式對成員變量進行更新操做以實現線程安全(這裏線程安全的保證也就是CAS+volatile)。
而後是定義了refCnt變量,用於跟蹤對象的引用次數,使用volatile修飾解決原子變量可視性問題。
對象引用計數器
那麼,對對象的引用計數與釋放是怎麼實現的呢?核心就是兩個函數:
//計數加1 retain(); //計數減一 release();
下面分析這兩個函數源碼:
每調用一次retain()函數一次,引用計數器就會加一,因爲可能存在多線程併發使用的情景,因此必須保證累加操做是線程安全的,那麼是怎麼保證的呢?咱們來看一下源碼:
public ByteBuf retain() { return retain0(1); } public ByteBuf retain(int increment) { return retain0(checkPositive(increment, "increment")); } /** 最後都是調用這個函數。 */ private ByteBuf retain0(int increment) { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow. if (nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; }
在retain0()函數中, 經過for(;;)來實現了自旋鎖。經過自旋來對引用計數器refCnt執行加1操做。這裏的加一操做是經過原子變量refCntUpdater的compareAndSet(this, refCnt, nextCnt)方法實現的,這個經過硬件級別的CAS保證了原子性,若是修改失敗了就會不停的自旋,直到修改爲功爲止。
下面再看看釋放的過程:release()函數:
private boolean release0(int decrement) { for (;;) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } }
這裏基本和retain()函數同樣,也是經過自旋和CAS保證執行的正確的將計數器減一。這裏須要注意的是當refCnt == decrement
也就是引用對象不可達時,就須要調用deallocate();方法來釋放ByteBuf對象。
從類名就能夠知道UnpooledHeapByteBuf 是基於堆內存的字節緩衝區,沒有基於對象池實現,這意味着每次的IO讀寫都會建立一個UnpooledHeapByteBuf對象,會形成必定的性能影響,可是也不容易出現內存管理的問題。
成員變量
有三個成員變量,各自的含義見註釋。
//緩衝區分配器,用於UnpooledHeapByteBuf的內存分配。在UnpooledHeapByteBuf構造器中實例化 private final ByteBufAllocator alloc; //字節數組做爲緩衝區 byte[] array; //實現ByteBuf與NIO中ByteBuffer的轉換 private ByteBuffer tmpNioBuf;
動態擴展緩衝區
在說道AbstractByteBuf的時候,ByteBuf是能夠自動擴展緩衝區大小的,這裏咱們分析一下在UnpooledHeapByteBuf中是怎麼實現的。
public ByteBuf capacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); } int oldCapacity = array.length; if (newCapacity > oldCapacity) { byte[] newArray = new byte[newCapacity]; System.arraycopy(array, 0, newArray, 0, array.length); setArray(newArray); } else if (newCapacity < oldCapacity) { byte[] newArray = new byte[newCapacity]; int readerIndex = readerIndex(); if (readerIndex < newCapacity) { int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { setIndex(newCapacity, newCapacity); } setArray(newArray); } return this; }
裏面的實現並不複雜:
(1)首先獲取本來的容量oldCapacity;
(2)若是新需求容量大於oldCapacity,以新的容量newCapacity建立字節數組,將原來的字節數組內容經過調用System.arraycopy(array, 0, newArray, 0, array.length);複製過去,並將新的字節數組設爲ByteBuf的字節數組。
(3)若是新需求容量小於oldCapacity就不須要動態擴展,可是須要截取出一段新緩衝區。
PooledDirectByteBuf基於內存池實現的,具體的內存池的實現原理,比較複雜,我沒分析清楚,具體的只知道,內存池就是一片提早申請的內存,當須要ByteBuf的時候,就從內存池中申請一片內存,這樣效率比較高。
PooledDirectByteBuf和UnPooledDirectByteBuf基本同樣,惟一不一樣的就是內存分配策略。
建立字節緩衝區實例
因爲PooledDirectByteBuf基於內存池實現的,因此不能經過new關鍵字直接實例化一個對象,而是直接從內存池中獲取,而後設置引用計數器的值。看下源碼:
static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; }
經過RECYCLER對象的get()函數從內存池獲取PooledDirectByteBuf對象。而後在buf.reuse(maxCapacity);
函數裏面設置引用計數器爲1。