本文地址: https://juejin.im/post/5db8ea...
Netty 高性能的緣由除了前面提到的 NIO 的 Reactor 線程模型, 零拷貝也是其高性能的一個重要緣由.(其實源碼解讀放到了最後)
java
Netty 默認都會優先使用 unsafe 的實現
算法
Netty 先申請一塊連續的空間做爲 ByteBuf 池, 須要用到的時候直接去池裏面取, 用完以後返還給 ByteBuf 池, 而不須要每次要用 ByteBuf 的時候都去申請. 堆外對象的建立比堆內的耗時. 數組
總結: 池化的做用就是加快程序獲取到操做的對象 ~~~~緩存
堆內指的在 JVM 中的數據,申請、操做都是在jvm裏.多線程
堆外的直接緩衝區指的是申請內存的時候用的 native 方法申請的 非jvm堆 的內存, 這一部份內存 OS 是能夠直接使用的 , 不像堆內的內存OS要使用的話還須要複製一次到直接緩衝區.申請的是堆外的內存, 這時候 java 中的對象(DirectByteBuf)只是一些reader/writer Index(memory(內存地址), offset(偏移量) 等)的處理, 寫數據/讀數據都是經過 native 對堆外的數據在進行操做.jvm
總結:用堆外內存是爲了防止對象的拷貝, 提高效率 ~~~~函數
unsafe 這個東西是 sun.misc 中提供的一個類, 用這個類能夠直接經過 native 方法操做內存, 固然也會有效率提高, 上面說的申請和操做堆外內存就是用這個叫作 unsafe 的東西來完成的. 可是用這個 unsafe 必須對內存操做很是熟悉, 否則很是容易出錯, 因此官方爲何把它叫作 unsafe 也是有道理的. post
總結: 直接操做內存, 效率提高, 可是使用容易出錯 性能
簡單說明: PoolThreadLocalCache 和 Recycle 都使用了 ThreadLocal 變量, 減小多線程的爭搶,提高操做效率.
測試
maxOrder 默認11 : 徹底二叉樹的深度(根節點是第0層, 因此客觀來講的一共有 maxOrder+1 層)
pageSize 默認8192 (8k) : 上面徹底二叉樹的最底層的葉子結點 page 的默認大小
pageShifts 默認13: 這個是 pageSize 的對數, 2^pageShifts = pageSize , pageSize 默認爲 8192, 因此這個默認值爲 13
chunkSize 默認 16m(pageSize * maxOrder): 這個是每一個 chunk 的大小, 就是下面 chunk圖 的每一層的大小
一個page裏面的最小劃分單位爲16byte, 16這個數字很重要, 後續有幾個關鍵計算的地方使用到
chunk 的結構
~~~~每一層的總和都是16m, 一直細分到最底層,每一個 page 爲 8192(8k),因此最底層有2k個節點,這裏固然沒有所有畫出來, subPage 都在page上進行操做.
作一個簡單測試, 測試堆外內存的申請和堆內內存申請的耗時:
static void nioAllocTest(){ int num = 10; int cnt = 100; int size = 256; ByteBuffer buf; long start1,end1,start2,end2; long sum1,sum2; for(int i = 0;i<num;i++){ sum1=sum2=0; int j; for(j = 0;j<cnt;j++) { start1 = System.nanoTime(); buf = ByteBuffer.allocateDirect(size); end1 = System.nanoTime(); sum1+=(end1-start1); // System.out.println("direct 申請時間: "+(end1-start1)); start2 = System.nanoTime(); buf = ByteBuffer.allocate(size); end2 = System.nanoTime(); // System.out.println("heap 申請時間: "+(end2-start2)); // System.out.println("-----"); sum2+=(end2-start2); } System.out.println(String.format("第 %s 輪申請 %s 次 %s 字節平均耗時 [direct: %s , heap: %s].",i,j,size,sum1/cnt, sum2/cnt)); } }
輸出結果爲:
第 0 輪申請 100 次 256 字節平均耗時 [direct: 4864 , heap: 1616].
第 1 輪申請 100 次 256 字節平均耗時 [direct: 5763 , heap: 1641].
第 2 輪申請 100 次 256 字節平均耗時 [direct: 4771 , heap: 1672].
第 3 輪申請 100 次 256 字節平均耗時 [direct: 4961 , heap: 883].
第 4 輪申請 100 次 256 字節平均耗時 [direct: 3556 , heap: 870].
第 5 輪申請 100 次 256 字節平均耗時 [direct: 5159 , heap: 726].
第 6 輪申請 100 次 256 字節平均耗時 [direct: 3739 , heap: 843].
第 7 輪申請 100 次 256 字節平均耗時 [direct: 3910 , heap: 221].
第 8 輪申請 100 次 256 字節平均耗時 [direct: 2191 , heap: 590].
第 9 輪申請 100 次 256 字節平均耗時 [direct: 1624 , heap: 615].
能夠看到 direct 堆外內存的申請耗時明顯多於 jvm堆的申請耗時, 這裏的耗時是幾倍(測試次數的很少可能不太準確, 感興趣的同窗能夠測試更大/更小的size, 可能會發現一些「有趣」的事).
作一個簡單測試,測試池化的效果
static void nettyPooledTest(){ try { int num = 10; int cnt = 100; int size = 8192; ByteBuf direct1, direct2, heap1, heap2; long start1, end1, start2, end2, start3, end3, start4, end4; long sum1, sum2, sum3, sum4; for (int i = 0; i<num; i++) { sum1 = sum2 = sum3 = sum4 = 0; int j; for (j = 0; j<cnt; j++) { start1 = System.nanoTime(); direct1 = PooledByteBufAllocator.DEFAULT.directBuffer(size); end1 = System.nanoTime(); sum1 += (end1-start1); start2 = System.nanoTime(); direct2 = UnpooledByteBufAllocator.DEFAULT.directBuffer(size); end2 = System.nanoTime(); sum2 += (end2-start2); start3 = System.nanoTime(); heap1 = PooledByteBufAllocator.DEFAULT.heapBuffer(size); end3 = System.nanoTime(); sum3 += (end3-start3); start4 = System.nanoTime(); heap2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size); end4 = System.nanoTime(); sum4 += (end4-start4); direct1.release(); direct2.release(); heap1.release(); heap2.release(); } System.out.println(String.format("Netty 第 %s 輪申請 %s 次 [%s] 字節平均耗時 [direct.pooled: [%s] , direct.unpooled: [%s] , heap.pooled: [%s] , heap.unpooled: [%s]].", i, j, size, sum1/cnt, sum2/cnt, sum3/cnt, sum4/cnt)); } }catch(Exception e){ e.printStackTrace(); }finally { } }
最終輸出的結果:
Netty 第 0 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [1784931] , direct.unpooled: [105310] , heap.pooled: [202306] , heap.unpooled: [23317]].
Netty 第 1 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [12849] , direct.unpooled: [15457] , heap.pooled: [12671] , heap.unpooled: [12693]].
Netty 第 2 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [13589] , direct.unpooled: [14459] , heap.pooled: [18783] , heap.unpooled: [13803]].
Netty 第 3 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [10185] , direct.unpooled: [11644] , heap.pooled: [9809] , heap.unpooled: [12770]].
Netty 第 4 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [15980] , direct.unpooled: [53980] , heap.pooled: [5641] , heap.unpooled: [12467]].
Netty 第 5 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [4903] , direct.unpooled: [34215] , heap.pooled: [6659] , heap.unpooled: [12311]].
Netty 第 6 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [2445] , direct.unpooled: [7197] , heap.pooled: [2849] , heap.unpooled: [11010]].
Netty 第 7 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [2578] , direct.unpooled: [4750] , heap.pooled: [3904] , heap.unpooled: [255689]].
Netty 第 8 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [1855] , direct.unpooled: [3492] , heap.pooled: [37822] , heap.unpooled: [3983]].
Netty 第 9 輪申請 100 次 [8192] 字節平均耗時 [direct.pooled: [1932] , direct.unpooled: [2961] , heap.pooled: [1825] , heap.unpooled: [6098]].
這裏看 DirectByteBuffer, 頻繁的申請堆外內存的話, 會下降服務端的性能, 這時候池化的做用就顯現出來了.池化只需開始的時候申請一塊足夠大的內存, 後續獲取對象只是從池裏取出, 用完返還Pool, 並不是每次都單獨去申請, 省去了後續使用從堆外申請空間的耗時.
這裏就講我的感受最重要的一個, 也是 netty 默認使用的類型: PooledUnsafeDirectByteBuf
, 咱們也從它的申請 PooledByteBufAllocator.DEFAULT.directBuffer() 開始講起.
下面從PooledByteBufAllocator.DEFAULT.directBuffer()進入
// 到第一個要分析的方法 protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // 從 threadlLocal 獲取一個線程本地緩存池 PoolThreadCache cache = (PoolThreadCache)this.threadCache.get(); // 這個緩存池包含 heap 和 direct 兩種, 獲取直接緩存池 PoolArena<ByteBuffer> directArena = cache.directArena; Object buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); // 這裏往下 -- 1 } else { // 若是沒有堆外緩存池, 直接申請堆外的 ByteBuf, 優先使用 unsafe buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer((ByteBuf)buf); } // 1 directArena.allocate(cache, initialCapacity, maxCapacity); PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { // newByteBuf(maxCapacity); 有兩種實現, directArena 和 heapArena // Pool 的爲在 recycle 中重用一個 ByteBuf PooledByteBuf<T> buf = newByteBuf(maxCapacity); // -- 2 allocate(cache, buf, reqCapacity); // -- 7 return buf; } // 2 newByteBuf(maxCapacity) protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { // 優先使用 PooledUnsafeDirect if (HAS_UNSAFE) { // PooledUnsafeDirect return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); // -- 3 } else { // PooledDirect return PooledDirectByteBuf.newInstance(maxCapacity); } } // 3 PooledUnsafeDirectByteBuf.newInstance static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { // 從用於回收的 ThreadLocal 中獲取一個 ByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); // -- 4 // 重置 ByteBuf 的下標等 buf.reuse(maxCapacity); // -- 6 return buf; } // 4 Recycler.get() public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } // 每一個線程都有一個棧 Stack<T> stack = threadLocal.get(); // 彈出一個 handle DefaultHandle<T> handle = stack.pop(); // 若是 stack 中沒有 handle 則新建一個 if (handle == null) { handle = stack.newHandle(); // newObject 由調用者實現, 不一樣的 ByteBuf 建立各自不一樣的 ByteBuf, 須要由建立者實現 // handle.value is ByteBuf, 從上面跟下來, 因此這裏是 PooledUnsafeDirectByteBuf handle.value = newObject(handle); // -- 5 } // 返回一個 ByteBuf return (T) handle.value; } // 5 Stack.pop() , 從棧中取出一個 handle DefaultHandle<T> pop() { int size = this.size; if (size == 0) { if (!scavenge()) { return null; } size = this.size; } size --; // 取出棧最上面的 handle DefaultHandle ret = elements[size]; elements[size] = null; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } // 重置這個 handle 的信息 ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; } // 6 重用 ByteBuf 以前須要重置一下以前的下標等 final void reuse(int maxCapacity) { maxCapacity(maxCapacity); setRefCnt(1); setIndex0(0, 0); discardMarks(); }
上面的1到6步, 從 PoolThreadLocalCache 中獲取堆外的Arena, 而且根據出須要的大小從 RECYCLE 中獲取一個線程本地的 ByteBuf 棧, 從棧中彈出一個 ByteBuf 而且重置 ByteBuf 的讀寫下標等.
講到這裏, 代碼中第二步的就算跟蹤完了, 接下來就是第七步開始了.
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { // newByteBuf(maxCapacity); 有兩種實現, directArena 和 heapArena // Pool 的爲在 recycle 中重用一個 ByteBuf PooledByteBuf<T> buf = newByteBuf(maxCapacity); // -- 2 allocate(cache, buf, reqCapacity); // -- 7 return buf; }
上面講到從 RECYCLE 的 線程本地棧 中獲取到了一個 ByteBuf ,而且重置了讀寫下標等. 接下來的纔算是重點.咱們繼續跟着代碼走下去
// allocate(cache, buf, reqCapacity); -- 7 // 這一段都很重要,代碼複製比較多, normal(>8192) 和 huge(>16m) 的暫時不作分析 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { // 計算應該申請的大小 final int normCapacity = normalizeCapacity(reqCapacity); // -- 8 // 申請的大小是否小於一頁 (默認8192) 的大小 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; // reqCapacity < 512 boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 is tiny // 申請 tiny 容量的空間 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } // 計算屬於哪一個子頁, tiny 以 16B 爲單位 tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { //8192 > reqCapacity >= 512 is small // small 以 1024爲單位 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } // head 指向本身在 table 中的位置的頭 final PoolSubpage<T> head = table[tableIdx]; /** * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and * {@link PoolChunk#free(long)} may modify the doubly linked list as well. */ synchronized (head) { final PoolSubpage<T> s = head.next; // 這裏判斷是否已經添加過 subPage // 添加過的話, 直接在該 subPage 上面進行操做, 記錄標識位等 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; // 在 subPage 的 bitmap 中的下標 long handle = s.allocate(); assert handle >= 0; // 用 已經初始化過的 bytebuf 初始化 subPage 中的信息 s.chunk.initBufWithSubpage(buf, handle, reqCapacity); // 計數 incTinySmallAllocation(tiny); return; } } // 第一次建立該類型大小的 ByteBuf, 須要建立一個subPage synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); } // 增長計數 incTinySmallAllocation(tiny); return; } }
計算應該申請的 ByteBuf 的大小
// 8 如下代碼是在 normalizeCapacity(reqCapacity) 中 // 若是 reqCapacity >= 512 ,則使用 跟hashMap 相同的擴容算法 // reqCapacity < 512(tiny類型) 則將 reqCapacity 變成 16 的倍數 if (!isTiny(reqCapacity)) { // 是否是很熟悉, 有沒有印象 HashMap 的擴容, 找一個不小於原數的2的指數次冪大小的數 int normalizedCapacity = reqCapacity; normalizedCapacity --; normalizedCapacity |= normalizedCapacity >>> 1; normalizedCapacity |= normalizedCapacity >>> 2; normalizedCapacity |= normalizedCapacity >>> 4; normalizedCapacity |= normalizedCapacity >>> 8; normalizedCapacity |= normalizedCapacity >>> 16; normalizedCapacity ++; // if (normalizedCapacity < 0) { normalizedCapacity >>>= 1; } assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0; return normalizedCapacity; } // reqCapacity < 512 // 已是16的倍數,不作操做 if ((reqCapacity & 15) == 0) { return reqCapacity; } // 不是16的倍數,轉化爲16的倍數 return (reqCapacity & ~15) + 16;
由於 small 和 tiny
仍是有比較多類似的, 因此咱們選 tiny
來說
// 申請 tiny 容量的空間 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } // 計算屬於哪一個子頁, tiny 以 16b 爲單位 tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; // head 指向本身在 table 中的位置的頭 final PoolSubpage<T> head = table[tableIdx];
這裏看到 tinySubpagePools, 看名字應該是存儲 tinySubPage 的地方, 跟蹤一下能夠看到, tinySubPage 在構造方法裏進行了初始化
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); // 初始化 32 種類型的 subPage 的 head , 這裏是記錄 head for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); } // 512 / 16 = 32 static final int numTinySubpagePools = 512 >>> 4;
numTinySubpagePools , 這是一個靜態變量, 512是small和tiny的邊界點, 512 >>> 4 = 32
,爲何是無符號右移4位, 還記得上面說的 subPage 分配的基本單位嗎, subPage 分配的基本單位就是 16byte, 因此這裏是計算從16 到 512 以 16爲單位 一共有多少種類型大小的 ByteBuf, tinySubpagePools->[16,32,48....512]
, 上面的 tinyIdx(int normCapacity) 就是計算屬於哪一種類型的ByteBuf 並獲取該類型 ByteBuf 在 tinySubpagePools 中的下標, 後續就能夠根據下標獲取到 pool 中對應下標的 head, 構造函數中初始化了全部的 head, 實際申請的話 ,不是用這個head來申請, 而是會另外 new 一個 subPage ,而後跟這個 head 造成雙向鏈表. 按照上面的代碼順序, 接下來就到了 poolSubPage(init or allocate)
final PoolChunk<T> chunk; // 當前 subPage 所處的 Page 節點下標 private final int memoryMapIdx; // 當前子頁的 head 在 該 chunk 中的偏移值, 單位爲 pageSize(default 8192) private final int runOffset; // default 8192 private final int pageSize; // 默認 8 個 long 的字節長度, long是64位, 8*64 = 512, 512 * 16(subPage最低按照16字節分配) = 8192(one default page) // 意思是將 一個page分爲 512 個 16byte, 每個 16byte 用一位(bit)來標記是否使用, 一個long有64bit, 因此一共須要 512 / 64 = 8個long類型來做爲標記位 private final long[] bitmap; // 這個是指一個 Page 中最多能夠存儲多少個 elemSize 大小 ByteBuf // maxNumElems = pageSize / elemSize private int maxNumElems; // 已經容納多少個 elemSize 大小的 ByteBuf private int numAvail; // 這個是記錄真正能使用到的 bit 的length, 由於你不可能每一個 page 中的 elemSize 都是16,確定是有其餘大小的, 在 PoolSubPage 的 init 方法中能夠看到: bitmapLength = maxNumElems >>> 6; private int bitmapLength; // 因此初始化方法 init(), 只初始化 bitmapLength 個 long 類型 /** * for (int i = 0; i < bitmapLength; i ++) { * bitmap[i] = 0; * } */
總結下來就是, 一個 8192 大小的 page, 先根據傳入的大小計算最多能容納多少個該大小的字節數組(堆外都是用字節數組) maxNumElems, 再根據最大能容納的數量計算最多能用到多少個 long類型的數字做爲標記位 bitmapLength , 最後初始化bitmap, 可見bitmap 是標記page中已經使用過的位置(以16byte爲單位).
PoolSubPage 中還有一個很重要的方法: toHandle(); 這個方法的做用是將節點下標 memoryMapIdx 和 bitmapIdx 放到一塊兒,用一個 long 類型來記錄.經過這個handle值, 能夠獲取到對應節點(根據 memoryMapIdx)和該節點(page)下對應的偏移位置(就是bitmapIdx * 16)
private long toHandle(int bitmapIdx) { // 後續會用 (int)handle 將這個 handle 值變回爲 memoryMapIdx , 即所屬節點下標 return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; }
介紹完了 subPage 的字段含義後, 繼續跟蹤上面的代碼:
這一段代碼是在根據申請的大小獲取到對應下標的 head 節點後作的處理, s!=head 是判斷是否有申請過相同大小subPage, 有的話直接 initBufWithSubpage在原有的 subPage 上進行操做, 而不用調用後面的 allocateNormal(buf, reqCapacity, normCapacity); 去allocate 一個新的 subPage
synchronized (head) { final PoolSubpage<T> s = head.next; // 這裏判斷是否已經添加過 subPage // 添加過的話, 直接在該 subPage 上面進行操做, 記錄標識位等 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; // 在 subPage 的 bitmap 中的下標 && 節點下標 long handle = s.allocate(); assert handle >= 0; // 用已經初始化過的 bytebuf 更新 subPage 中的信息 s.chunk.initBufWithSubpage(buf, handle, reqCapacity); // 計數 incTinySmallAllocation(tiny); return; } }
initBufWithSubpage 方法跟蹤下去能夠看到:
buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset, reqCapacity, subpage.elemSize, arena.parent.threadCache());
runOffset(memoryMapIdx): memoryMapIdx 爲節點下標, runOffset 表示該節點在chunk中的偏移量, 以 8192 爲單位 節點偏移(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize: 這個偏移量表示 bitmapIdx 下標在 subPage 中的偏移量
offset: 表示chunk自身的偏移.
這個3個offset 總和就是 bitmapIdx表示的下標在整個緩存池中的具體偏移值
上面就是申請一個池化的ByteBuf的具體流程
本文屬做者我的理解, 有什麼寫錯的地方望各位能指出.
最後的最後,很是感謝大家能看到這裏!!大家的閱讀都是對做者的一次確定!!!
以爲文章有幫助的看官順手點個贊再走唄(終於暴露了我就是來騙讚的(◒。◒)),大家的每一個贊對做者來講都很是重要(異常真實),都是對做者寫做的一次確定(double)!!!
這一篇的內容到這就結束了,期待下一篇 還能有幸碰見你!
See you later!