咱們知道,Netty使用直接內存實現Netty零拷貝以提高性能,
但直接內存的建立和釋放可能須要涉及系統調用,是比較昂貴的操做,若是每一個請求都建立和釋放一個直接內存,那性能確定是不能知足要求的。
這時就須要使用內存池。
即從系統中申請一大塊內存,再在上面分配每一個請求所需的內存。算法
Netty中的內存池主要涉及PoolArena,PoolChunk與PoolSubpage。
本文主要分析PoolArena的做用與實現。
源碼分析基於Netty 4.1.52數組
接口關係
ByteBufAllocator,內存分配器,負責爲ByteBuf分配內存, 線程安全。
PooledByteBufAllocator,池化內存分配器,默認的ByteBufAllocator,預先從操做系統中申請一大塊內存,在該內存上分配內存給ByteBuf,能夠提升性能和減少內存碎片。
UnPooledByteBufAllocator,非池化內存分配器,每次都從操做系統中申請內存。緩存
RecvByteBufAllocator,接收內存分配器,爲Channel讀入的IO數據分配一塊大小合理的buffer空間。具體功能交由內部接口Handle定義。
它主要是針對Channel讀入場景添加一些操做,如guess,incMessagesRead,lastBytesRead等等。
ByteBuf,分配好的內存塊,能夠直接使用。安全
下面只關注PooledByteBufAllocator,它是Netty中默認的內存分配器,也是理解Netty內存機制的難點。微信
前面文章《ChannelPipeline機制與讀寫過程》中分析了數據讀取過程,
NioByteUnsafe#read多線程
public final void read() { ... final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; ... byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); ... }
recvBufAllocHandle方法返回AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)併發
AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBufferjvm
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // #1 PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { // #2 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // #3 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
AbstractByteBufAllocator#ioBuffer方法會判斷當前系統是否支持unsafe。支持時使用直接內存,不支持則使用堆內存。這裏只關注直接內存的實現。
#1
從當前線程緩存中獲取對應內存池PoolArena
#2
在當前線程內存池上分配內存
#3
內存池不存在,只能使用非池化內存分配內存了高併發
PooledByteBufAllocator#threadCache是一個PoolThreadLocalCache實例,PoolThreadLocalCache繼承於FastThreadLocal,FastThreadLocal這裏簡單理解爲對ThreadLocal的優化,它爲每一個線程維護了一個PoolThreadCache,PoolThreadCache上關聯了內存池。
當PoolThreadLocalCache上某個線程的PoolThreadCache不存在時,經過initialValue方法構造。源碼分析
PoolThreadLocalCache#initialValue
protected synchronized PoolThreadCache initialValue() { // #1 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); // #2 final Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { final PoolThreadCache cache = new PoolThreadCache( heapArena, directArena, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); ... } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0); }
#1
從PooledByteBufAllocator的heapArenas,directArenas中獲取使用率最小的PoolArena。
PooledByteBufAllocator構造時默認會爲PooledByteBufAllocator#directArenas初始化8個PoolArena。
#2
構造PoolThreadCache。
PoolArena,能夠理解爲一個內存池,負責管理從操做系統中申請到的內存塊。
PoolThreadCache爲每個線程關聯一個PoolArena(PoolThreadCache#directArena),該線程的內存都在該PoolArena上分配。
Netty支持高併發系統,可能有不少線程進行同時內存分配。爲了緩解線程競爭,經過建立多個PoolArena細化鎖的粒度,從而提升併發執行的效率。
注意,一個PoolArena能夠會分給多個的線程,能夠看到PoolArena上會有一些同步操做。
前面分析SizeClasses的文章說過,Netty將內存池中的內存塊按大小劃分爲3個級別。
不一樣級別的內存塊管理算法不一樣。默認劃分規則以下:
small <= 28672(28K)
normal <= 16777216(16M)
huge > 16777216(16M)
smallSubpagePools是一個PoolSubpage數組,負責維護small級別的內存塊信息。
PoolChunk負責維護normal級別的內存,PoolChunkList管理一組PoolChunk。
PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,
PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,
qInit->內存使用率爲0~25,
q000->內存使用率爲1~50,
q025->內存使用率爲25~75,
q050->內存使用率爲50~75,
q075->內存使用率爲75~100,
q100->內存使用率爲100。
注意:PoolChunk是Netty每次向操做系統申請的內存塊。
PoolSubpage須要從PoolChunk中分配,而Tiny,Small級別的內存則是從PoolSubpage中分配。
下面來看一下分配過程
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { // #1 final int sizeIdx = size2SizeIdx(reqCapacity); // #2 if (sizeIdx <= smallMaxSizeIdx) { tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx < nSizes) { // #3 tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); } else { // #4 int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, normCapacity); } }
#1
size2SizeIdx是父類SizeClasses提供的方法,它使用特定算法,將申請的內存大小調整爲規範大小,劃分到對應位置,返回對應索引,可參考《內存對齊類SizeClasses》
#2
分配small級別的內存塊
#3
分配normal級別的內存塊
#4
分配huge級別的內存塊
private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity, final int sizeIdx) { // #1 if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { return; } // #2 final PoolSubpage<T> head = smallSubpagePools[sizeIdx]; final boolean needsNormalAllocation; synchronized (head) { // #3 final PoolSubpage<T> s = head.next; needsNormalAllocation = s == head; if (!needsNormalAllocation) { assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); } } // #4 if (needsNormalAllocation) { synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); } } incSmallAllocation(); }
#1
首先嚐試在線程緩存上分配。
除了PoolArena,PoolThreadCache#smallSubPageHeapCaches還爲每一個線程維護了Small級別的內存緩存
#2
使用前面SizeClasses#size2SizeIdx方法計算的索引,獲取對應PoolSubpage
#3
注意,head是一個佔位節點,並不存儲數據,s==head表示當前存在能夠用的PoolSubpage,由於已經耗盡的PoolSubpage是會從鏈表中移除。
接着從PoolSubpage中分配內存,後面有文章解析詳細過程
注意,這裏必要運行在同步機制中。
#4
沒有可用的PoolSubpage,須要申請一個Normal級別的內存塊,再在上面分配所需內存
normal級別的內存也是先嚐試在線程緩存中分配,分配失敗後再調用allocateNormal方法申請
PoolArena#allocate -> allocateNormal
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { return; } // Add a new chunk. PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); assert success; qInit.add(c); }
#1
依次從q050,q025,q000,qInit,q075上申請內存
爲何要是這個順序呢?
PoolArena中的PoolChunkList之間也組成一個「雙向」鏈表
qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100
PoolChunkList中還維護了minUsage,maxUsage,即當一個PoolChunk使用率大於maxUsage,它將被移動到下一個PoolChunkList,使用率小於minUsage,則被移動到前一個PoolChunkList。
注意:q000沒有前置節點,它的minUsage爲1,即上面的PoolChunk內存徹底釋放後,將被銷燬。
qInit的前置節點是它本身,但它的minUsage爲Integer.MIN_VALUE,即便上面的PoolChunk內存徹底釋放後,也不會被銷燬,而是繼續保留在內存。
不優先從q000分配,正是由於q000上的PoolChunk內存徹底釋放後要被銷燬,若是在上面分配,則會延遲內存的回收進度。
而q075上因爲內存利用率過高,致使內存分配的成功率大大下降,所以放到最後。
因此從q050是一個不錯的選擇,這樣大部分狀況下,Chunk的利用率都會保持在一個較高水平,提升整個應用的內存利用率;
在PoolChunkList上申請內存,PoolChunkList會遍歷鏈表上PoolChunk節點,直到分配成功或到達鏈表末尾。
PoolChunk分配後,若是內存使用率高於maxUsage,它將被移動到下一個PoolChunkList。
newChunk方法負責構造一個PoolChunk,這裏是內存池向操做系統申請內存。
DirectArena#newChunk
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0) { return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); } final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); return new PoolChunk<ByteBuffer>(this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); }
allocateDirect方法向操做系統申請內存,得到一個(jvm)ByteBuffer,
PoolChunk#memory維護了該ByteBuffer,PoolChunk的內存實際上都是在該ByteBuffer上分配。
最後是huge級別的內存申請
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) { PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); activeBytesHuge.add(chunk.chunkSize()); buf.initUnpooled(chunk, reqCapacity); allocationsHuge.increment(); }
比較簡單,沒有使用內存池,直接向操做系統申請內存。
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { // #1 int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { // #2 SizeClass sizeClass = sizeClass(handle); if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { // cached so not free it. return; } freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); } }
#1
非池化內存,直接銷燬內存
#2
池化內存,首先嚐試加到線程緩存中,成功則不須要其餘操做。失敗則調用freeChunk
void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) { final boolean destroyChunk; synchronized (this) { ... destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); } if (destroyChunk) { // destroyChunk not need to be called while holding the synchronized lock. destroyChunk(chunk); } }
chunk.parent即PoolChunkList,PoolChunkList#free會調用PoolChunk釋放內存,釋放內存後,若是內存使用率低於minUsage,則移動前一個PoolChunkList,若是前一個PoolChunkList不存在(q000),則返回false,由後面的步驟銷燬該PoolChunk。
可回顧前面解析ByteBuf文章中關於內存銷燬的內容。
若是您以爲本文不錯,歡迎關注個人微信公衆號,系列文章持續更新中。您的關注是我堅持的動力!