PooledByteBufAllocator負責初始化PoolArena(PA)和PoolThreadCache(PTC)。它提供了一系列的接口,用來建立使用堆內存或直接內存的PooledByteBuf對象,這些接口只是一張皮,內部徹底使用了PA和PTC的能力。初始化過程分兩個步驟,首先初始化一系列的默認參數,而後初始化PTC對象和PA數組。html
DEFAULT_PAGE_SIZE: PoolChunk中的page的大小-pageSize, 使用-Dio.netty.allocator.pageSize設置, 默認值:8192。java
DEFAULT_MAX_ORDER: PoolChunk中二叉樹的高度: maxOrder, 使用-Dio.netty.allocator.maxOrder設置,默認值:11。算法
DEFAULT_NUM_HEAP_ARENA: 使用堆內存的PA數組的長度,使用-Dio.netty.allocator.numHeapArenas設置,默認值: CPU核心數 * 2。數組
DEFAULT_NUM_DIRECT_ARENA: 使用直接內存的PA數組的長度,使用-Dio.netty.allocator.numHeapArenas設置,默認值: CPU核心數 * 2。緩存
DEFAULT_TINY_CACHE_SIZE: PTC對象中每一個用來緩存Tiny內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.tinyCacheSize設置,默認值:512。ide
DEFAULT_SMALL_CACHE_SIZE: PTC對象中每一個用來緩存Small內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.smallCacheSize設置,默認值:256。性能
DEFAULT_NORMAL_CACHE_SIZE: PTC對象中每一個用來緩存Normal內存的MemoryRegionCache對象中queue的長度,使用-Dio.netty.allocator.normalCacheSize設置,默認值:64。優化
DEFAULT_MAX_CACHED_BUFFER_CAPACITY: PTC對象中緩存Normal內存的大小上限。使用-Dio.netty.allocator.maxCachedBufferCapacity設置,默認值32 * 1024。this
DEFAULT_CACHE_TRIM_INTERVAL: PTC對象中釋放緩存的內存閾值。當PTC分配內存次數大於這個值時會釋放緩存的內存。使用-Dio.netty.allocator.cacheTrimInterval設置,默認值:8192。spa
DEFAULT_USE_CACHE_FOR_ALL_THREADS: 是否對全部的線程使用緩存。使用-Dio.netty.allocator.useCacheForAllThreads設置,默認值:true。
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT: 直接內存的對齊參數,分配直接內存的大小必須是它的整數倍。使用-Dio.netty.allocator.directMemoryCacheAlignment設置,默認值:0, 表示不對齊。
PooledByteBufAllocator維護了兩個數組:
PoolArena<byte[]>[] heapArenas; PoolArena<ByteBuffer>[] directArenas;
heapArenas用來管理堆內存,directArenas用來管理直接內存。這兩個數組在構造方法中初始化,構造方法的定義是:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment)
prefreDirect: 建立PooledByteBuf時,是否優先使用直接內存。
nHeapArena: 默認使用DEFAULT_NUM_HEAP_ARENA。
nDirectArena: 默認使用DEFAULT_NUM_DIRECT_ARENA。
pageSize: 默認使用的DEFAULT_PAGE_SIZE。
maxOrder: 默認使用DEFAULT_MAX_ORDER。
tinyCacheSize: 默認使用DEFAULT_TINY_CACHE_SIZE。
smallCacheSize: 默認使用DEFAULT_SMALL_CACHE_SIZE。
normalCacheSize: 默認使用DEFAULT_NORMAL_CACHE_SIZE。
useCacheForAllThreads: 默認使用DEFAULT_USE_CACHE_FOR_ALL_THREADS。
directMemoryCacheAlignment: 默認使用DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT。
這兩數組的初始化代碼以下:
1 int pageShifts = validateAndCalculatePageShifts(pageSize); 2 3 if (nHeapArena > 0) { 4 heapArenas = newArenaArray(nHeapArena); 5 List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length); 6 for (int i = 0; i < heapArenas.length; i ++) { 7 PoolArena.HeapArena arena = new PoolArena.HeapArena(this, 8 pageSize, maxOrder, pageShifts, chunkSize, 9 directMemoryCacheAlignment); 10 heapArenas[i] = arena; 11 metrics.add(arena); 12 } 13 heapArenaMetrics = Collections.unmodifiableList(metrics); 14 } else { 15 heapArenas = null; 16 heapArenaMetrics = Collections.emptyList(); 17 } 18 19 if (nDirectArena > 0) { 20 directArenas = newArenaArray(nDirectArena); 21 List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); 22 for (int i = 0; i < directArenas.length; i ++) { 23 PoolArena.DirectArena arena = new PoolArena.DirectArena( 24 this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); 25 directArenas[i] = arena; 26 metrics.add(arena); 27 } 28 directArenaMetrics = Collections.unmodifiableList(metrics); 29 } else { 30 directArenas = null; 31 directArenaMetrics = Collections.emptyList(); 32 }
1行,計算pageShifts,算法是pageShifts = Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize) = 31 - Integer.numberOfLeadingZeros(pageSize)。 Integer.numberOfLeadingZeros(pageSize)是pageSize(32位整數)從最高位起連續是0的位數,所以pageShifts能夠簡化爲pageShifts = log2(pageSize)。
4,20行,建立數組,new PoolArena[size]。
6-12,22-17行, 初始化數組中的PoolArena對象,分別使用PooArena的兩個內部類: HeapArena, DirectArena。
PoolThreadCache使用PoolThreadLocalCache(PTLC)間接初始化,PTLC是PooledByteBufAllocator的內部內,它的定義以下:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>
這個類派生自io.netty.util.concurrent.FastThreadLocal<T>, 和java.lang.ThreadLocal<T>功能同樣,實現了線程本地存儲(TLS)的功能,不一樣的是FastThreadLocal<T>優化了訪問性能。PTLC覆蓋了父類的initialValue方法,這個方法負責初始化線程本地的PoolThreadCache對象。當第一次調用PTLC對象的get方法時,這個方法會被調用。
1 @Override 2 protected synchronized PoolThreadCache initialValue() { 3 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); 4 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); 5 6 if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) { 7 return new PoolThreadCache( 8 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, 9 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); 10 } 11 // No caching for non FastThreadLocalThreads. 12 return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); 13 }
3,4行,分別從headArenas,directArenas中取出一個使用次數最少的PoolArena對象。PoolArena有一個numThreadCaches屬性,這個屬性是AtomicInteger類型的原子變量。它的做用是在用來記錄被PoolThreadCache對象使用的次數。PoolThreadCache對象建立時會在構造方法中會調用它的getAndIncrement方法,釋放時在free0方法中調用他的getAndDecrement方法。
6行, 若是運行每一個線程都使用緩存(userCacheForAllThreads==true),或者當成線程對象是FastThreadLocalThread時, 在第8行建立一個線程專用的PTC對象。
PoolChunkList<T> nextList
PoolChunkList<T> prevList
這兩個屬性代表PCKL對象是一個雙向鏈表的節點。
PoolChunk<T> head
這個屬性代表PCKL對象還維護的一個PCK類型的鏈表,head指向這個鏈表的頭。
int minUsage;
int maxUsage;
int maxCapacity;
minUsage是PCK鏈表中每一個PCK對象內存的最小使用率,maxUseage是PCK的最大使用率。這兩個值是百分比,例如:minUsage=10, maxUse=50,表示PCK鏈表中只能保存使用率在[10%,50%)的PCK對象。 maxCapacity表示PCK最大可分配的內存數,算法是: maxCapacity = (int)(chunkSize * (100L - minUseage) / 100L)。
PCKL鏈表有PoolArena負責維護,在PoolArena的構造方法中初始化:
1 // io.netty.buffer.PoolArena#PoolArena(PooledByteBufAllocator parent, int pageSize, 2 // int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) 3 4 q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize); 5 q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize); 6 q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize); 7 q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize); 8 q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize); 9 qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize); 10 11 q100.prevList(q075); 12 q075.prevList(q050); 13 q050.prevList(q025); 14 q025.prevList(q000); 15 q000.prevList(null); 16 qInit.prevList(qInit);
4-9行,初始化PCKL節點。每一個節點的名字q{num},其中num表示這個節點的最小使用率minUsage,如q075節點的minUsage=%75。
11-16行,把PCKL節點組裝成一個鏈表。
使用q(minUsage, maxUsage)表示一個節點,那麼:
qInit = q(Integer.MIN_VALUE, 25%)
q000 = q(1%, 50%)
q025 = q(25%, 75%)
q075 = q(75%, 100%)
q100 = q(100%, Integer.MAX_VALUE)
這個鏈表的結構以下圖所示:
一個新建立的PCK對象,它的內存使用率是usage=%0,被放進qInit節節點。每次從這個PCK對象中分配內存,都會致使它的使用率增長,當usage>=25%,即大於等於qInit的maxUsage時,會把它移動到q000中。繼續從PCK對象中分配內存,它的usage繼續增長,當usage大於等於它所屬PCKL的maxUsage時,把它移動到PKCL鏈表中的下一個節點,直到q100爲止。下面是內存分配致使PCK移動的代碼:
1 //io.netty.buffer.PoolChunkList#allocate 2 boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { 3 if (head == null || normCapacity > maxCapacity) { 4 // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can 5 // be handled by the PoolChunks that are contained in this PoolChunkList. 6 return false; 7 } 8 9 for (PoolChunk<T> cur = head;;) { 10 long handle = cur.allocate(normCapacity); 11 if (handle < 0) { 12 cur = cur.next; 13 if (cur == null) { 14 return false; 15 } 16 } else { 17 cur.initBuf(buf, handle, reqCapacity); 18 if (cur.usage() >= maxUsage) { 19 remove(cur); 20 nextList.add(cur); 21 } 22 return true; 23 } 24 } 25 }
9-12行,嘗試從PCK鏈表中的全部PCK節點分配所需的內存。
14行,沒有找到能分配內存的PCK節點。
17行,從cur節點分配到所需的內存,並初始化PooledByteBuf對象。
18-21行,如cur節點的使用率大於等於當前PCKL節點maxUsage,調用remove方法把cur從head鏈表中刪除,而後調用PCKL鏈表中的下一個節點的add方法,把cur移動到下一個節點中。
若是持續地釋放內存,把內存還給PCK對象,會致使usage持續減少,當usage小於它所屬的PCKL的minUsage時,把它移動到PCKL鏈表中的前一個節點,直到q000位爲止。當釋放內存致使PCK對象的usage等於%0,會銷燬這個PCK對象,釋放整個chunk的內存。下面是釋放內存致使PCK對象移動的代碼:
1 //io.netty.buffer.PoolChunkList#free 2 boolean free(PoolChunk<T> chunk, long handle) { 3 chunk.free(handle); 4 if (chunk.usage() < minUsage) { 5 remove(chunk); 6 // Move the PoolChunk down the PoolChunkList linked-list. 7 return move0(chunk); 8 } 9 return true; 10 } 11 12 //io.netty.buffer.PoolChunkList#move0 13 private boolean move0(PoolChunk<T> chunk) { 14 if (prevList == null) { 15 // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and 16 // all memory associated with the PoolChunk will be released. 17 assert chunk.usage() == 0; 18 return false; 19 } 20 return prevList.move(chunk); 21 }
第3行,釋放內存,把內存返還給PCK對象。
4-7行,如PCK的使用率小於當前PCKL的minUsage,調用remove方法把PCK對象從當前PCKL對象中刪除,而後調用move0方法把它移動到前一個PCKL節點。
13-31行,移動PCK到前一個PCKL。
入口方法:
io.netty.buffer.AbstractByteBufAllocator#heapBuffer(int, int),建立使用堆內存的ByteBuf, 調用newHeapBuffer方法。
io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int), 建立使用直接內存的ByteBuf, 調用newDirectBuffer方法。
具體實現:
io.netty.buffer.PooledByteBufAllocator#newHeapBuffer(int initialCapacity, int maxCapacity)。
io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int initialCapacity, int maxCapacity)。
這兩個方法都是從PoolThreadCache對象中獲得線程專用的PoolArena對象,而後調用PoolArena的allocate方法建立PoolByteBuf對象。
PoolArena入口方法:
io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int),這個方法是PoolArena分配內存,建立PoolByteBuf對象的入口方法。它先調用子類實現的newByteBuf建立一個PoolByteBuf對象,這個方法有兩個實現:
io.netty.buffer.PoolArena.HeapArena#newByteBuf(int maxCapacity),建立使用堆內存的PooledByteBuf對象。
io.netty.buffer.PoolArena.DirectArena#newByteBuf(int maxCapacity),建立使用直接內存PooledByteBuf對象。
而後調用io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)方法爲PoolByteBuf對象分配內存,這個方法是分配內存的核心方法,下面來重點分析一下它的代碼:
1 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { 2 final int normCapacity = normalizeCapacity(reqCapacity); 3 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 4 int tableIdx; 5 PoolSubpage<T>[] table; 6 boolean tiny = isTiny(normCapacity); 7 if (tiny) { // < 512 8 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { 9 // was able to allocate out of the cache so move on 10 return; 11 } 12 tableIdx = tinyIdx(normCapacity); 13 table = tinySubpagePools; 14 } else { 15 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { 16 // was able to allocate out of the cache so move on 17 return; 18 } 19 tableIdx = smallIdx(normCapacity); 20 table = smallSubpagePools; 21 } 22 23 final PoolSubpage<T> head = table[tableIdx]; 24 25 /** 26 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and 27 * {@link PoolChunk#free(long)} may modify the doubly linked list as well. 28 */ 29 synchronized (head) { 30 final PoolSubpage<T> s = head.next; 31 if (s != head) { 32 assert s.doNotDestroy && s.elemSize == normCapacity; 33 long handle = s.allocate(); 34 assert handle >= 0; 35 s.chunk.initBufWithSubpage(buf, handle, reqCapacity); 36 incTinySmallAllocation(tiny); 37 return; 38 } 39 } 40 synchronized (this) { 41 allocateNormal(buf, reqCapacity, normCapacity); 42 } 43 44 incTinySmallAllocation(tiny); 45 return; 46 } 47 if (normCapacity <= chunkSize) { 48 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { 49 // was able to allocate out of the cache so move on 50 return; 51 } 52 synchronized (this) { 53 allocateNormal(buf, reqCapacity, normCapacity); 54 ++allocationsNormal; 55 } 56 } else { 57 // Huge allocations are never served via the cache so just call allocateHuge 58 allocateHuge(buf, reqCapacity); 59 } 60 }
第2行,根據須要的內存大小reqCapacity,計算能夠分配的標準內存大小normCapacity。必須知足(1)normCapacity>=reqCapacity, (2)normCapacity是directMemoryCacheAlignment的整數倍,此外,還要根據reqCapacity的大小分3中狀況:
reqCapacity>=chunkSize:normCapacity取同時知足(1),(2)的最小值。
reqCapacity>=512且reqCapacity<chunkSize: (3)normCapacity>=512*2k, (4)normCapacity<=chunkSize,normCapacit取同時知足(1),(2),(3),(4)的最小值。
reqCapacity<412: (5)normCapacity<512, (6)normCapacity是16的整數倍,normCapacity取同時知足(1),(2),(5),(6)的最小值。
8-13行,分配Tiny類型的內存(<512)。 8-10行,若是PoolThreadCache緩存對象中分配到內存,分配內流程結束。12-13行,若是緩存中沒有,就從Tiny內存池中分配一塊內存。
15-20行,分配Small類型的內存(>=512且<pageSize)。和分配Tiny內存的邏輯相同。
29-27行, 使用從前兩個步驟中獲得的Tiny或Small內存的索引,從子頁面池中分配一塊內存。33行,從子頁面中分配內存。35行,使用分配到的內存初始化PoolByteBuf對象,若是能到這裏,分配內存流程結束。
41行,若是子頁面池中尚未內存可用,調用allocateNormal方法從PoolChunk對象中分配一個子頁面,再從子頁面中分配所需的內存。
47-55行,分配Normal類型的內存(>=pageSize且<chunkSize)。48,49行,從緩存中分配內存,若是成功,分配內存流程結束。53行,緩存中沒有可用的內存,調用allocateNormal方法從PoolChunk中分配內存。
58行,若是分配的是>chunkSize的內存。這塊內存不會進入PCKL鏈表中。
上面代碼中的allocateNormal方法封裝了建立PCK對象,從PCK對象中分配內存,再把PCK對象放入到PCKL鏈表中的邏輯,也是十分重要的代碼。
1 private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { 2 if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || 3 q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || 4 q075.allocate(buf, reqCapacity, normCapacity)) { 5 return; 6 } 7 8 // Add a new chunk. 9 PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); 10 long handle = c.allocate(normCapacity); 11 assert handle > 0; 12 c.initBuf(buf, handle, reqCapacity); 13 qInit.add(c); 14 }
2-5行,依次嘗試從每一個PCKL節點中分配內存,若是成功,分配內存流程結束。
9-13行,先建立一個新的PCK對象,而後從中分配內存,使用內存初始化PooledByteBuf對象,最後把PCK對象添加PCKL鏈表頭節點qInit中。PKCL對象的add方法會和allocate同樣,根據PCK對象的內存使用率,把它移動到鏈表中合適的位置。
io.netty.buffer.PooledByteBuf#deallocate方法調用io.netty.buffer.PoolArena#free方法,這個free方法負責整個內存釋放過程。
1 void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) { 2 if (chunk.unpooled) { 3 int size = chunk.chunkSize(); 4 destroyChunk(chunk); 5 activeBytesHuge.add(-size); 6 deallocationsHuge.increment(); 7 } else { 8 SizeClass sizeClass = sizeClass(normCapacity); 9 if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) { 10 // cached so not free it. 11 return; 12 } 13 14 freeChunk(chunk, handle, sizeClass); 15 } 16 }
這段代碼重點在8-14行。第8,9行,優先把內存放到緩存中,這樣下次就能快速地從緩存中直接取用。第14行,在不能放進緩存的狀況下把內存返回給PCK對象。
1 void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) { 2 final boolean destroyChunk; 3 synchronized (this) { 4 switch (sizeClass) { 5 case Normal: 6 ++deallocationsNormal; 7 break; 8 case Small: 9 ++deallocationsSmall; 10 break; 11 case Tiny: 12 ++deallocationsTiny; 13 break; 14 default: 15 throw new Error(); 16 } 17 destroyChunk = !chunk.parent.free(chunk, handle); 18 } 19 if (destroyChunk) { 20 // destroyChunk not need to be called while holding the synchronized lock. 21 destroyChunk(chunk); 22 } 23 }
第17行,掉用PCKL對象的free方法把內存還給PCK對象,移動PCK對象在PCKL鏈表中位置。若是此時這個PCK對象的使用率變成0,destroyChunk=true。
第21行,調用destroyChunk方法銷燬掉PCK對象。
原文出處:https://www.cnblogs.com/brandonli/p/11649263.html