NIO中緩衝區是數據傳輸的基礎,JDK經過ByteBuffer實現,Netty框架中並未採用JDK原生的ByteBuffer,而是構造了ByteBuf。html
ByteBuf對ByteBuffer作了大量的優化,好比說內存池,零拷貝,引用計數(不依賴GC),本文主要是分析這些優化,學習這些優化思想,學以至用,在實際工程中,借鑑這些優化方案和思想。數組
首先先講一下這裏面須要用的基礎知識,在JVM中 內存可分爲兩大塊,一個是堆內存,一個是直接內存。這裏簡單介紹一下緩存
堆內存是Jvm所管理的內存,相比方法區,棧內存,堆內存是最大的一塊。全部的對象實例實例以及數組都要在堆上分配。網絡
Java的垃圾收集器是能夠在堆上回收垃圾。app
JVM使用Native函數在堆外分配內存,以後經過Java堆中的DirectByteBuffer對象做爲這塊內存的引用進行操做。直接內存不會受到Java堆的限制,只受本機內存影響。框架
Java的GC只會在老年區滿了觸發Full GC時,纔會去順便清理直接內存的廢棄對象。dom
在NIO中,全部數據都是用緩衝區處理的。讀寫數據,都是在緩衝區中進行的。緩存區實質是是一個數組,一般使用字節緩衝區——ByteBuffer。ide
屬性:函數
屬性post |
說明 |
capacity |
緩衝區的大小,一旦申請將不能改變 |
position |
位置索引,表示讀模式或者寫模式數據的位置,讀模式和寫模式切換的時候position會被重置爲0,positon最大可謂capacity-1。 |
limit |
在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。讀模式下,limit等於buffer的capacity |
mark |
標記,指一個備忘位置,調用mark()來設定mark=position,調用reset()來設定postion=mark,標記未設定前是未定義的. |
使用方式:
ByteBuffer能夠申請兩種方式的內存,分別爲堆內存和直接內存,首先看申請堆內存。
// 申請堆內存 ByteBuffer HeapbyteBuffer = ByteBuffer.allocate(1024);
很簡單,就一行代碼,再看看allocate方法。
public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
其實就是new一個HeapByteBuffer對象。這個 HeapByteBuffer繼承自ByteBuffer,構造器採用了父類的構造器,以下所示:
HeapByteBuffer(int cap, int lim) { // package-private super(-1, 0, lim, cap, new byte[cap], 0); /* hb = new byte[cap]; offset = 0; */ } //ByteBuffer構造器 ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; this.offset = offset; }
結合ByteBuffer的四個屬性,初始化的時候就能夠賦值capaticy,limit,position,mark,至於byte[] hb, int offsef這兩個屬性,JDK文檔給出的解釋是backing array,and array offset。它是一個回滾數組,offset是數組的偏移值。
申請直接內存:
// 申請直接內存 ByteBuffer DirectbyteBuffer = ByteBuffer.allocateDirect(1024);
allocateDirect()實際上就是new的一個DirectByteBuffer對象,不過這個new 一個普通對象不同。這裏使用了Native函數來申請內存,在Java中就是調用unsafe對象
public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
申請方法不一樣的內存有不一樣的用法。接下來看一看ByteBuffer的經常使用方法與如何使用
Bytebuf的讀和寫是使用put()和get()方法實現的
// 讀操做 public byte get() { return hb[ix(nextGetIndex())]; } final int nextGetIndex() { if (position >= limit) throw new BufferUnderflowException(); return position++; } // 寫操做 public ByteBuffer put(byte x) { hb[ix(nextPutIndex())] = x; return this; } final int nextPutIndex() { if (position >= limit) throw new BufferOverflowException(); return position++; }
從代碼中能夠看出,讀和寫操做都會改變ByteBuffer的position屬性,這兩個操做是共用的position屬性。這樣就會帶來一個問題,讀寫操做會致使數據出錯啊,數據位置出錯。
ByteBuffer提供了flip()方法,讀寫模式切換,切換的時候會改變position和limit的位置。看看flip()怎麼實現的:
public final Buffer flip() { // 1. 設置 limit 爲當前位置 limit = position; // 2. 設置 position 爲0 position = 0; mark = -1; return this; }
這裏就不重點介紹了,有些細節能夠本身去深究。
Netty使用的自身的ByteBuf對象來進行數據傳輸,本質上使用了外觀模式對JDK的ByteBuffer進行封裝。
相較於原生的ByteBuffer,Netty的ByteBuf作了不少優化,零拷貝,內存池加速,讀寫索引。
首先要明白一點,Netty的內存池是不依賴於JVM自己的GC的。
回顧一下直接內存的GC:
上文提到Java的GC只會在老年區滿了觸發Full GC時,纔會去順便清理直接內存的廢棄對象。
JVM中的直接內存,存在堆內存中其實就是DirectByteBuffer類,它自己其實很小,真的內存是在堆外,這裏是映射關係。
每次申請直接內存,都先看看是否超限 —— 直接內存的限額默認(可用 -XX:MaxDirectMemorySize 從新設定)。
若是超過限額,就會主動執行System.gc(),這樣會帶來一個影響,系統會中斷100ms。若是沒有成功回收直接內存,而且仍是超過直接內存的限額,就會拋出OOM——內存溢出。
繼續從GC角度分析,DirectByteBuffer熬過了幾回young gc以後,會進入老年代。當老年代滿了以後,會觸發Full GC。
由於自己很小,很難佔滿老年代,所以基本不會觸發Full GC,帶來的後果是大量堆外內存一直佔着不放,沒法進行內存回收。
還有最後一個辦法,就是依靠申請額度超限時觸發的system.gc(),可是前面提到,它會中斷進程100ms,若是在這100ms的之間,系統未完成GC,仍會拋出OOM。
因此這個最後一個辦法也不是徹底保險的。
Netty使用了引用計數的方式,主動回收內存。回收的對象包括非池直接內存,和內存池中的內存。
Netty中使用引用計數機制來管理資源,ByteBuf其實是實現了ReferenceCounted接口,當實例化ByteBuf對象時,引用計數加1。
當應用代碼保持一個對象引用時,會調用retain方法將計數增長1,對象使用完畢進行釋放,調用release將計數器減1.
當引用計數變爲0時,對象將釋放全部的資源,返回內存池。
Netty內存泄漏檢測級別: 禁用(DISABLED) - 徹底禁止泄露檢測。不推薦。 簡單(SIMPLE) - 告訴咱們取樣的1%的緩衝是否發生了泄露。默認。 高級(ADVANCED) - 告訴咱們取樣的1%的緩衝發生泄露的地方 偏執(PARANOID) - 跟高級選項相似,但此選項檢測全部緩衝,而不只僅是取樣的那1%。此選項在自動測試階段頗有用。若是構建(build)輸出包含了LEAK,可認爲構建失敗
也可使用JVM的-Dio.netty.leakDetectionLevel選項來指定泄漏檢測級別。
內存跟蹤
在內存池中分配內存,獲得的ByteBuf對象都是通過toLeakAwareBuffer()方法封裝的,該方法做用就是對ByteBuf對象進行引用計數,使用SimpleLeakAwareByteBuf或者AdvancedLeakAwareByteBuf來包裝ByteBuf。此外該方法只對非池內存中的直接內存和內存池中的內存進行內存泄露檢測。
//裝飾器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf來包裝原始的ByteBuf protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) { ResourceLeakTracker<ByteBuf> leak; //根據設置的Level來選擇使用何種裝飾器 switch (ResourceLeakDetector.getLevel()) { case SIMPLE: //建立用於跟蹤和表示內容泄露的ResourcLeak對象 leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { //只在ByteBuf.order方法中調用ResourceLeak.record buf = new SimpleLeakAwareByteBuf(buf, leak); } break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { //只在ByteBuf.order方法中調用ResourceLeak.record buf = new AdvancedLeakAwareByteBuf(buf, leak); } break; default: break; } return buf; }
實際上,內存泄露檢測是在AbstractByteBuf.leakDetector.track(buf)進行的,來看看track方法的具體實現。
/** * Creates a new {@link ResourceLeakTracker} which is expected to be closed via * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated. * * @return the {@link ResourceLeakTracker} or {@code null} */ @SuppressWarnings("unchecked") public final ResourceLeakTracker<T> track(T obj) { return track0(obj); } @SuppressWarnings("unchecked") private DefaultResourceLeak track0(T obj) { Level level = ResourceLeakDetector.level; // 不進行內存跟蹤 if (level == Level.DISABLED) { return null; } if (level.ordinal() < Level.PARANOID.ordinal()) { //若是監控級別低於PARANOID,在必定的採樣頻率下報告內存泄露 if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) { reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } return null; } //每次須要分配 ByteBuf 時,報告內存泄露狀況 reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); }
再來看看返回對象——DefaultResourceLeak,他的實現方式以下:
private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
它繼承了虛引用WeakReference,虛引用徹底不影響目標對象的垃圾回收,可是會在目標對象被VM垃圾回收時加入到引用隊列,
正常狀況下ResourceLeak對象,會將監控的資源的引用計數爲0時被清理掉。
可是當資源的引用計數失常,ResourceLeak對象也會被加入到引用隊列.
存在着這樣一種狀況:沒有成對調用ByteBuf的retain和relaease方法,致使ByteBuf沒有被正常釋放,當ResourceLeak(引用隊列) 中存在元素時,即代表有內存泄露。
Netty中的 reportLeak()方法來報告內存泄露狀況,經過檢查引用隊列來判斷是否有內存泄露,並報告跟蹤狀況.
方法代碼以下:
Netty中有handler鏈,消息有本Handler傳到下一個Handler。因此Netty引入了一個規則,誰是最後使用者,誰負責釋放。
根據誰最後使用誰負責釋放的原則,每一個Handler對消息可能有三種處理方式
假設每個Handler都把消息往下傳,Handler並也不知道誰是啓動Netty時所設定的Handler鏈的最後一員,因此Netty在Handler鏈的最末補了一個TailHandler,若是此時消息仍然是ReferenceCounted類型就會被release掉。
總結:
1.Netty在不一樣的內存泄漏檢測級別狀況下,採樣機率是不同的,在Simple狀況下出現了Leak,要設置「-Dio.netty.leakDetectionLevel=advanced」再跑一次代碼,找到建立和訪問的地方。
2.Netty中的內存泄露檢測是經過對ByteBuf對象進行裝飾,利用虛引用和引用計數來對非池中的直接內存和內存池中內存進行跟蹤,判斷是否發生內存泄露。
3.計數器基於 AtomicIntegerFieldUpdater,由於ByteBuf對象不少,若是都把int包一層AtomicInteger花銷較大,而AtomicIntegerFieldUpdater只須要一個全局的靜態變量。
Netty中將內存池分爲五種不一樣的形態:Arena,ChunkList,Chunk,Page,SubPage.
首先來看Netty最大的內存單位PoolArena——連續的內存塊。它是由多個PoolChunkList和兩個SubPagePools(一個是tinySubPagePool,一個是smallSubPagePool)組成的。以下圖所示:
1.PoolChunkList是一個雙向的鏈表,PoolChunkList負責管理多個PoolChunk的生命週期。
2.PoolChunk中包含多個Page,Page的大小默認是8192字節,也能夠設置系統變量io.netty.allocator.pageSize來改變頁的大小。自定義頁大小有以下限制:1.必須大於4096字節,2.必須是2的整次數冪。
3.塊(PoolChunk)的大小是由頁的大小和maxOrder算出來的,計算公式是:chunkSize = 2^{maxOrder} * pageSize。maxOrder的默認值是11,也能夠經過io.netty.allocator.maxOrder系統變量設置,只能是0-14的範圍,因此chunksize的默認大小爲:(2^11)*8192=16MB
Page中包含多個SubPage。
PoolChunk內部維護了一個平衡二叉樹,以下圖所示:
PoolSubPage
一般一個頁(page)的大小就達到了10^13(8192字節),一般一次申請分配內存沒有這麼大,可能很小。
因而Netty將頁(page)劃分紅更小的片斷——SubPage
Netty定義這樣的內存單元是爲了更好的分配內存,接下來看一下一個ByteBuf是如何在內存池中申請內存的。
分配原則:
內存池中的內存分配是在PoolArea中進行的。
應用中在內存池中申請內存的方法:
// 在內存池中申請 直接內存 ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 在內存池中申請 堆內存 ByteBuf heapByteBuf = ByteBufAllocator.DEFAULT.heapBuffer(1024);
接下來,一層一層的看下來,在Netty中申請內存是如何實現的。就拿申請直接內存舉例,首先看directBuffer方法。
// directBuffer方法實現 @Override public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); } // 校驗申請大小,返回申請的直接內存 @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } //PooledByteBufAllocator類中的 newDirectBuffer方法的實現 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // Netty避免每一個線程對內存池的競爭,在每一個線程都提供了PoolThreadCache線程內的內存池 PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; // 若是緩存存在,則分配內存 final ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // 緩存不存在,則分配非池內存 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } // 經過toLeakAwareBuffer包裝成內存泄漏檢測的buffer return toLeakAwareBuffer(buf); }
通常狀況下,內存都是在buf = directArena.allocate(cache, initialCapacity, maxCapacity)這行代碼進行內存分配的,也就是說在內存的連續塊PoolArena中進行的內存分配。
接下來,咱們根據內存分配原則來進行內存研讀PoolArena中的allocate方法。
1 PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 2 PooledByteBuf<T> buf = newByteBuf(maxCapacity); 3 allocate(cache, buf, reqCapacity); 4 return buf; 5 } 6 7 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { 8 final int normCapacity = normalizeCapacity(reqCapacity); 9 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 10 int tableIdx; 11 PoolSubpage<T>[] table; 12 boolean tiny = isTiny(normCapacity); 13 if (tiny) { // < 512 14 15 // 若是申請內存小於512字節,則會在tingSubPagePools中進行分配 16 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { 17 // was able to allocate out of the cache so move on 18 return; 19 } 20 tableIdx = tinyIdx(normCapacity); 21 table = tinySubpagePools; 22 } else { 23 // 若是大於512小於PageSize字節,則會在smallSubPagePools進行分配 24 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { 25 // was able to allocate out of the cache so move on 26 return; 27 } 28 tableIdx = smallIdx(normCapacity); 29 table = smallSubpagePools; 30 } 31 32 final PoolSubpage<T> head = table[tableIdx]; 33 34 /** 35 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and 36 * {@link PoolChunk#free(long)} may modify the doubly linked list as well. 37 */ 38 synchronized (head) { 39 final PoolSubpage<T> s = head.next; 40 if (s != head) { 41 assert s.doNotDestroy && s.elemSize == normCapacity; 42 long handle = s.allocate(); 43 assert handle >= 0; 44 s.chunk.initBufWithSubpage(buf, handle, reqCapacity); 45 incTinySmallAllocation(tiny); 46 return; 47 } 48 } 49 synchronized (this) { 50 allocateNormal(buf, reqCapacity, normCapacity); 51 } 52 53 incTinySmallAllocation(tiny); 54 return; 55 } 56 if (normCapacity <= chunkSize) { 57 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { 58 // was able to allocate out of the cache so move on 59 return; 60 } 61 synchronized (this) { 62 allocateNormal(buf, reqCapacity, normCapacity); 63 ++allocationsNormal; 64 } 65 } else { 66 // Huge allocations are never served via the cache so just call allocateHuge 67 allocateHuge(buf, reqCapacity); 68 } 69 }
底層IO處理線程的緩衝區使用堆外直接緩衝區,減小一次IO複製。業務消息的編解碼使用堆緩衝區,分配效率更高,並且不涉及到內核緩衝區的複製問題。
Netty默認不使用內存池,須要在建立服務端或者客戶端的時候進行配置。
//Boss線程池內存池配置. .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) //Work線程池內存池配置. .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
本人的想法是:
1.I/O處理線程使內存池中的直接內存,開啓以上配置
2.在handler處理業務的時候,使用內存池中的堆內存
還有一點值得注意的是:在使用完內存池中的ByteBuf,必定要記得釋放,即調用release():
// 在內存池中申請 直接內存 ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 歸還到內存池 directByteBuf.release();
若是handler繼承了SimpleChannelInboundHandler,那麼它將會自動釋放Bytefuf.詳情可見:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { // autoRelease默認爲true if (autoRelease && release) { // 釋放Bytebuf,歸還到內存池 ReferenceCountUtil.release(msg); } } }
該部分是重點介紹的部分,首先將它與傳統的I/O read和write操做做對比,看看有什麼不一樣,首先須要理解一下用戶態和內存態的概念
用戶態(User Mode)和內核態(Kernel Mode),也能夠叫用戶空間和內核
用戶態:受限的訪問內存,而且不容許訪問硬件設備。
內核態:本質上是一個軟件,能夠控制計算機的硬件資源(如網卡,硬盤),能夠訪問內存全部數據。
用戶程序都是運行在用戶態中的,好比JVM,就是用戶程序,因此它運行在用戶態中。
用戶態是不能直接訪問硬件設備的,若是須要一次I/O操做,那就必須利用系統調用機制切換到內核態(用戶態與內核態之間的轉換稱爲上下文切換),進行硬盤讀寫。
好比說一次傳統網絡I/O:
第一步,從用戶態切換到內核態,將用戶緩衝區的數據拷貝到內核緩衝區,執行send操做。
第二步,數據發送由底層的操做系統進行,此時從內核態切換到用戶態,將內核緩存區的數據拷貝到網卡的緩衝區
總結:也就是一次普通的網絡I/O,至少通過兩次上下文切換,和兩次內存拷貝。
什麼是零拷貝?
當須要傳輸的數據遠大於內核緩衝區的大小時,內核緩衝區就成爲I/O的性能瓶頸。零拷貝就是杜絕了內核緩衝區與用戶緩衝區的的數據拷貝。
因此零拷貝適合大數據量的傳輸。
拿傳統的網絡I/O作對比,零拷貝I/O是怎樣的一個過程:
用戶程序執行transferTo(),將用戶緩衝區待發送的數據拷貝到網卡緩衝區。
很簡單,一步完成,中間少了用戶態到內存態的拷貝。
Netty中零拷貝如何實現
Netty的中零拷貝與上述零拷貝是不同的,它並非系統層面上的零拷貝,只是相對於ByteBuf而言的。
Netty中的零拷貝:
1.CompositeByteBuf,將多個ByteBuf合併爲一個邏輯上的ByteBuf,避免了各個ByteBuf之間的拷貝。
使用方式:
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponents(true, ByteBuf1, ByteBuf1);
注意:addComponents第一個參數必須爲true,那麼writeIndex纔不爲0,才能從compositeByteBuf中讀到數據。
2.wrapedBuffer()方法,將byte[]數組包裝成ByteBuf對象。
byte[] bytes = data.getBytes(); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
Unpooled.wrappedBuffer(bytes)就是進行了byte[]數組的包裝工做,過程當中不存在內存拷貝。
即包裝出來的ByteBuf和byte[]數組指向了同一個存儲空間。由於值引用,因此bytes修改也會影響byteBuf 的值。
3.ByteBuf的分割,slice()方法。將一個ByteBuf對象切分紅多個ByteBuf對象。
ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); ByteBuf header = directByteBuf.slice(0,50); ByteBuf body = directByteBuf.slice(51,1024);
header和body兩個ByteBuf對象實際上仍是指向directByteBuf的存儲空間。
總結:
本文很長很長,博主陸陸續續寫了有一個月的時間。可是隻是窺探Netty內存池中的冰山一角,更可能是要在實際項目中進行驗證才能起到效果。
關於Netty的文章會持續更新,共勉!~~~喜歡的話,給個推薦,若是不足和錯誤之處,請予以斧正~
參考:
對於 Netty ByteBuf 的零拷貝(Zero Copy) 的理解
Netty官方wiki