歡迎關注公衆號:【 愛編碼】
若是有須要後臺回覆 2019贈送 1T的學習資料哦!!
全部的網路通訊都涉及字節序列的移動,因此高效易用的數據結構明顯是必不可少的。Netty的ByteBuf實現知足並超越了這些需求。html
ByteBuf維護了兩個不一樣的索引:一個是用於讀取,一個用於寫入。當你從ByteBuf讀取是,它的readerIndex
將會被遞增已經被讀取的字節數。一樣地,當你寫入ByteBuf時,它的witerIndex
也會被遞增。api
做爲一個容器,源碼中的以下。有三塊區域
discardable bytes:無效空間(已經讀取過的空間),可丟棄字節的區域,由readerIndex指針控制
readable bytes:內容空間,可讀字節的區域,由readerIndex和writerIndex指針控制控制
writable bytes:空閒空間,可寫入字節的區域,由writerIndex指針和capacity容量控制數組
* <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
整體分類劃分是可根據JVM堆內存來區分的。緩存
1.堆內內存(JVM堆空間內)
2.堆外內存(本機直接內存)
3.複合緩衝區(以上2種緩衝區多個混合)
最經常使用的ByteBuf模式是將數據存儲在JVM的堆空間中。它能在沒有使用池化的狀況下提供快速的分配和釋放。安全
JDK容許JVM實現經過本地調用來分配內存。主要是爲了不每次調用本地I/O操做以前(或者以後)將緩衝區的內容複製到一箇中間緩衝區(或者從中間緩衝區把內容複製到緩衝區)。
**最大的特色:它的內容將駐留在常規的會被垃圾回收的堆以外。
最大的缺點:相對於堆緩衝區,它的分配和釋放都是較爲昂貴的。**數據結構
經常使用類:CompositeByteBuf,它爲多個ByteBuf提供一個聚合視圖,將多個緩衝區表示爲單個合併緩衝區的虛擬表示。
好比:HTTP協議:頭部和主體這兩部分由應用程序的不一樣模塊產生。這個時候把這兩部分合並的話,選擇CompositeByteBuf是比較好的。jvm
主要分爲三大類ide
Pooled和Unpooled (池化)
unsafe和非unsafe ()
Heap和Direct (堆內和堆外)
Pooled和Unpooled
Pooled:每次都從預先分配好的內存中去取出一段連續內存封裝成一個ByteBuf給應用程序使用
Unpooled:每次分配內存的時候,直接調用系統api,向操做系統申請一
塊內存學習
Heap和Direct:
Head:是調用jvm的堆內存進行分配的,須要被gc進行管理
Direct:是調用jdk的api進行內存分配,不受jvm控制,不會參與到gc的過程大數據
Unsafe和非Unsafe
jdk中有Unsafe對象能夠直接拿到對象的內存地址,而且基於這個內存地址進行讀寫操做。那麼對應的分類的區別就是是否能夠拿到jdk底層的Unsafe進行讀寫操做了。
這個接口實現負責分配緩衝區而且是線程安全的。從下面的接口方法以及註釋能夠總結出主要是圍繞上面的三種ByteBuf內存模式:堆內,堆外以及複合型的內存分配。
/** * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be * thread-safe. */ public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; /** * Allocate a {@link ByteBuf}. If it is a direct or heap buffer * depends on the actual implementation. */ ByteBuf buffer(); /** * Allocate a {@link ByteBuf} with the given initial capacity. * If it is a direct or heap buffer depends on the actual implementation. */ ByteBuf buffer(int initialCapacity); /** * Allocate a {@link ByteBuf} with the given initial capacity and the given * maximal capacity. If it is a direct or heap buffer depends on the actual * implementation. */ ByteBuf buffer(int initialCapacity, int maxCapacity); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(int initialCapacity); /** * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O. */ ByteBuf ioBuffer(int initialCapacity, int maxCapacity); /** * Allocate a heap {@link ByteBuf}. */ ByteBuf heapBuffer(); /** * Allocate a heap {@link ByteBuf} with the given initial capacity. */ ByteBuf heapBuffer(int initialCapacity); /** * Allocate a heap {@link ByteBuf} with the given initial capacity and the given * maximal capacity. */ ByteBuf heapBuffer(int initialCapacity, int maxCapacity); /** * Allocate a direct {@link ByteBuf}. */ ByteBuf directBuffer(); /** * Allocate a direct {@link ByteBuf} with the given initial capacity. */ ByteBuf directBuffer(int initialCapacity); /** * Allocate a direct {@link ByteBuf} with the given initial capacity and the given * maximal capacity. */ ByteBuf directBuffer(int initialCapacity, int maxCapacity); /** * Allocate a {@link CompositeByteBuf}. * If it is a direct or heap buffer depends on the actual implementation. */ CompositeByteBuf compositeBuffer(); /** * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. * If it is a direct or heap buffer depends on the actual implementation. */ CompositeByteBuf compositeBuffer(int maxNumComponents); /** * Allocate a heap {@link CompositeByteBuf}. */ CompositeByteBuf compositeHeapBuffer(); /** * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeHeapBuffer(int maxNumComponents); /** * Allocate a direct {@link CompositeByteBuf}. */ CompositeByteBuf compositeDirectBuffer(); /** * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeDirectBuffer(int maxNumComponents); /** * Returns {@code true} if direct {@link ByteBuf}'s are pooled */ boolean isDirectBufferPooled(); /** * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the * {@code minNewCapacity} with {@code maxCapacity} as upper-bound. */ int calculateNewCapacity(int minNewCapacity, int maxCapacity); }
其中ByteBufAllocator 的具體實現能夠查看其子類,以下圖
下面來看看各自子類的功能以及區別
heap內存分配
入口new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
:
發現分配Unpooled、Unsafe、Heap
內存,實際上是分配了一個byte數組,並保存在UnpooledHeapByteBuf#array
成員變量中。該內存的初始值容量和最大可擴展容量能夠指定。
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } protected byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; }
查看UnpooledHeapByteBuf#getByte()
方法,堆內存類型的ByteBuf獲取的時候。直接經過下標獲取byte數組中的byte
@Override public byte getByte(int index) { ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { //該array爲初始化的時候,實例化的byte[] return HeapByteBufUtil.getByte(array, index); } static byte getByte(byte[] memory, int index) { //直接拿到一個數組 return memory[index]; }
direct內存分配
入口UnpooledByteBufAllocator#newDirectBuffer() --> UnpooledUnsafeDirectByteBuf
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } checkPositiveOrZero(initialCapacity, "initialCapacity"); checkPositiveOrZero(maxCapacity, "maxCapacity"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false); } protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { if (tryFree) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } } this.buffer = buffer; memoryAddress = PlatformDependent.directBufferAddress(buffer); tmpNioBuf = null; capacity = buffer.remaining(); }
能夠發現,Unpooled、Direct類型得內存分配其實是維護了一個底層jdk的一個DirectByteBuffer。分配內存的時候就建立它,並將他保存到buffer成員變量。
跟蹤iUnpooledHeapByteBuf#_getByte()
,就比較簡單了,直接使用jdk的api獲取
@Override protected byte _getByte(int index) { //使用buffer return buffer.get(index); }
更加詳細的分析能夠查看下面這篇文章
https://www.jianshu.com/p/158...
入口PooledByteBufAllocator#newHeapBuffer()
和PooledByteBufAllocator#newDirectBuffer()
,堆內內存和堆外內存分配的模式都比較固定。
1.拿到線程局部緩存PoolThreadCache
2.拿到不一樣類型的rena
3.使用不一樣類型的arena進行內存分配
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { //拿到線程局部緩存 PoolThreadCache cache = threadCache.get(); //拿到heapArena PoolArena<byte[]> heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { //使用heapArena分配內存 buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { //拿到線程局部緩存 PoolThreadCache cache = threadCache.get(); //拿到directArena PoolArena<ByteBuffer> directArena = cache.directArena; final ByteBuf buf; if (directArena != null) { //使用directArena分配內存 buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }
跟蹤threadCache.get()
調用的是FastThreadLocal#get()
方法。那麼其實threadCache也是一個FastThreadLocal,能夠當作是jdk的ThreadLocal,get方法調用了初始化方法initializel
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //調用初始化方法 V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; }
initialValue()
方法的邏輯以下
1.從預先準備好的heapArenas
和directArenas
中獲取最少使用的arena
2.使用獲取到的arean
爲參數,實例化一個PoolThreadCache
並返回
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { private final boolean useCacheForAllThreads; PoolThreadLocalCache(boolean useCacheForAllThreads) { this.useCacheForAllThreads = useCacheForAllThreads; } @Override protected synchronized PoolThreadCache initialValue() { /** * arena翻譯成競技場,關於內存非配的邏輯都在這個競技場中進行分配 */ //獲取heapArena:從heapArenas堆內競技場中拿出使用最少的一個arena final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); //獲取directArena:從directArena堆內競技場中拿出使用最少的一個arena final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { //建立PoolThreadCache:該Cache最終被一個線程使用 //經過heapArena和directArena維護兩大塊內存:堆和堆外內存 //經過tinyCacheSize,smallCacheSize,normalCacheSize維護ByteBuf緩存列表維護反覆使用的內存塊 return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); } //省略代碼...... }
查看PoolThreadCache
其維護了兩種類型的內存分配策略,一種是上述經過持有heapArena
和directArena
,另外一種是經過維護tiny,small,normal
對應的緩存列表來維護反覆使用的內存。
final class PoolThreadCache { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); //經過arena的方式維護內存 final PoolArena<byte[]> heapArena; final PoolArena<ByteBuffer> directArena; //維護了tiny, small, normal三種類型的緩存列表 // Hold the caches for the different size classes, which are tiny, small and normal. private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private final AtomicBoolean freed = new AtomicBoolean(); private int allocations; // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity"); this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; //經過持有heapArena和directArena,arena的方式管理內存分配 this.heapArena = heapArena; this.directArena = directArena; //經過tinyCacheSize,smallCacheSize,normalCacheSize建立不一樣類型的緩存列表並保存到成員變量 if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations //建立規格化緩存隊列 tinySubPageHeapCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); //建立規格化緩存隊列 smallSubPageHeapCaches = createSubPageCaches( smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalHeap = log2(heapArena.pageSize); //建立規格化緩存隊列 normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // Only check if there are caches in use. if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) && freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); } } private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0) { //MemoryRegionCache 維護緩存的一個對象 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length //每一種MemoryRegionCache(tiny,small,normal)都表示不一樣內存大小(不一樣規格)的一個隊列 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } } private static <T> MemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int arraySize = Math.max(1, log2(max / area.pageSize) + 1); //MemoryRegionCache 維護緩存的一個對象 @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { //每一種MemoryRegionCache(tiny,small,normal)都表示不一樣內存(不一樣規格)大小的一個隊列 cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } } ...... }
更加詳細分析可參考如下文章
https://www.jianshu.com/p/1cd...
上一步拿到PoolThreadCache以後,獲取對應的Arena
。那麼以後就是Arena具體分配內存的步驟。
入口PooledByteBufAllocator#newDirectBuffer()
方法種有以下代碼
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { //拿到PooledByteBuf對象,僅僅是一個對象 PooledByteBuf<T> buf = newByteBuf(maxCapacity); //從cache種分配內存,並初始化buf種內存地址相關的屬性 allocate(cache, buf, reqCapacity); return buf; }
能夠看到分配的過程以下:拿到PooledByteBuf對象從cache中分配內存,並重置相關屬性.
1.newByteBuf(maxCapacity);
拿到PooledByteBuf對象
@Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { //獲取一個PooledByteBuf return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //從帶有回收特性的對象池RECYCLER獲取一個PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf多是從回收站拿出來的,要進行復用 buf.reuse(maxCapacity); return buf; }
2.Recycler是一個基於線程本地堆棧的對象池。Recycler維護了一個ThreadLocal成員變量,用於返回一個stack給回收處理DefaultHandle
,該處理器經過維護這個堆棧來維護PooledUnsafeDirectByteBuf
緩存。
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { //Recycler負責用回收處理器handler維護PooledUnsafeDirectByteBuf //handler底層持有一個stack做爲對象池,維護對象池,handle同時負責對象回收 //存儲handler爲成員變量,使用完該ByteBuf能夠調用回收方法回收 return new PooledUnsafeDirectByteBuf(handle, 0); } };
//維護了一個`ThreadLocal`,`initialValue`方法返回一個堆棧。 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } };
3.再看Recycler#get()方法
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } //獲取對應的堆棧,至關一個回收站 Stack<T> stack = threadLocal.get(); //從棧頂拿出一個來DefaultHandle(回收處理器) //DefaultHandle持有一個value,實際上是PooledUnsafeDirectByteBuf DefaultHandle<T> handle = stack.pop(); //沒有回收處理器,說明沒有閒置的ByteBuf if (handle == null) { //新增一個處理器 handle = stack.newHandle(); //回調,還記得麼?該回調返回一個PooledUnsafeDirectByteBuf //讓處理器持有一個新的PooledUnsafeDirectByteBuf handle.value = newObject(handle); } //若是有,則可直接重複使用 return (T) handle.value; } public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //回調initialize V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; } private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { //回調 v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; } DefaultHandle<T> newHandle() { //實例化一個處理器並而且初四話成員變量,該成員變量stack從threalocal中初始化 return new DefaultHandle<T>(this); }
DefaultHandle用stack
做爲緩存池維護PooledUnsafeDirectByteBuf
,同理PooledDirectByteBuf
也是同樣的。只不過實例化的對象的實現不同而已。同時,處理器定義了回收的方法是將兌現存回棧內,使用的時候則是從棧頂取出。
static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; //對象緩存池 private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } /** * 定義回收方法,回收對象到stack * @param object */ @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } Stack<?> stack = this.stack; if (lastRecycledId != recycleId || stack == null) { throw new IllegalStateException("recycled already"); } //回收:將本身存進棧中緩存起來 stack.push(this); } }
到這咱們剛剛看完第一步,到第二步重置緩存內指針的時候了 ,獲取到PooledUnsafeDirectByteBuf的時候,有多是從緩存中取出來的。所以須要複用.
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { //從帶有回收特性的對象池RECYCLER獲取一個PooledUnsafeDirectByteBuf PooledUnsafeDirectByteBuf buf = RECYCLER.get(); //buf多是從回收站拿出來的,要進行復用 buf.reuse(maxCapacity); return buf; } final void reuse(int maxCapacity) { //重置最大容量 maxCapacity(maxCapacity); //設置引用 setRefCnt(1); //重置指針 setIndex0(0, 0); //重置標記值 discardMarks(); }
到這纔剛剛完成分配內存的第一步(拿到PooledByteBuf對象),以上都是僅僅是獲取而且用回收站和回收處理器管理這些對象,這些對象仍然只是一個對象,尚未分配實際的內存。
跟蹤PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); //不一樣的規格大小進行內存分配 /** * 分配總體邏輯(先判斷tiny和small規格的,再判斷normal規格的) * 1. 嘗試從緩存上進行內存分配,成功則返回 * 2. 失敗則再從內存堆中進行分配內存 */ if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; boolean tiny = isTiny(normCapacity); //嘗試tiny和small規格的緩存內存分配 if (tiny) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } final PoolSubpage<T> head = table[tableIdx]; /** * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and * {@link PoolChunk#free(long)} may modify the doubly linked list as well. */ synchronized (head) { final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity); incTinySmallAllocation(tiny); return; } } //tiny和small規格的緩存內存分配嘗試失敗 //從內存堆中分配內存 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); } incTinySmallAllocation(tiny); return; } //normal規格 //若是分配處出來的內存大於一個值(chunkSize),則執行allocateHuge if (normCapacity <= chunkSize) { //從緩存上進行內存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } //緩存沒有再從內存堆中分配內存 synchronized (this) { allocateNormal(buf, reqCapacity, normCapacity); ++allocationsNormal; } } else { // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); } }
其總體分配內存的邏輯是根據不一樣規格大小的內存須要來的,顯示tiny
和small
規格的,再是normal
規格的。分配也是先嚐試從緩存中進行內存分配,若是分配失敗再從內存堆中進行內存分配。 固然,分配出來的內存回和第一步拿到的PooledByteBuf進行綁定起來。
主要學習了ByteBuf 的基本結構、使用模式、分類、基本的內存分配。
下次再學習ByteBuf 的命中邏輯以及內存回收。
https://www.cnblogs.com/xiang...
https://www.jianshu.com/u/fc9...
若是對 Java、大數據感興趣請長按二維碼關注一波,我會努力帶給大家價值。以爲對你哪怕有一丁點幫助的請幫忙點個贊或者轉發哦。
關注公衆號【愛編碼】,回覆2019有相關資料哦。