【Netty】ByteBuf (Part 1)


Introduction

All network communication involves moving sequences of bytes, so an efficient and easy-to-use data structure for them is essential. Netty's ByteBuf meets and exceeds these requirements.

ByteBuf Structure

ByteBuf maintains two distinct indices: one for reading and one for writing. When you read from a ByteBuf, its readerIndex is incremented by the number of bytes read. Likewise, when you write to a ByteBuf, its writerIndex is incremented.

As a container, it is described in the source code as follows, with three regions:
discardable bytes: invalid space (bytes already read) that can be discarded, bounded by the readerIndex pointer
readable bytes: the content, bounded by the readerIndex and writerIndex pointers
writable bytes: free space that can still be written, bounded by the writerIndex pointer and the capacity

* <pre>
 *      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0      <=      readerIndex   <=   writerIndex    <=    capacity
 * </pre>
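
To make the index movement concrete, here is a minimal sketch (assuming Netty 4.x; Unpooled.buffer() allocates an unpooled heap buffer):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class IndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);          // readerIndex=0, writerIndex=0, capacity=16
        buf.writeBytes(new byte[]{1, 2, 3, 4});     // writerIndex -> 4
        byte first = buf.readByte();                // readerIndex -> 1
        System.out.println(buf.readerIndex());      // 1
        System.out.println(buf.writerIndex());      // 4
        System.out.println(buf.readableBytes());    // 3 (writerIndex - readerIndex)

        buf.discardReadBytes();                     // drops the discardable region, readerIndex back to 0
        System.out.println(buf.writerIndex());      // 3
        buf.release();
    }
}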

ByteBuf Usage Patterns

At the top level, the patterns can be distinguished by their relationship to the JVM heap:

1. Heap memory (inside the JVM heap)
2. Direct memory (native, off-heap memory)
3. Composite buffer (a mix of several buffers of the two kinds above)

1. Heap memory

The most commonly used ByteBuf pattern stores the data in the JVM heap. It provides fast allocation and deallocation even without pooling.

2. Direct memory

The JDK allows JVM implementations to allocate memory through native calls, mainly to avoid copying the buffer's contents to (or from) an intermediate buffer before (or after) each native I/O operation.
Its biggest advantage: the contents reside outside the normal garbage-collected heap.
Its biggest drawback: allocation and release are more expensive than for heap buffers.

3. Composite buffer

The commonly used class is CompositeByteBuf, which provides an aggregated view over multiple ByteBufs, presenting several buffers as a single merged virtual buffer.
For example, with HTTP the header and the body may be produced by different modules of the application; when the two parts need to be combined, CompositeByteBuf is a good choice, as the sketch below shows.
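
A minimal sketch of aggregating a header and a body (assuming Netty 4.x; the contents are made up for illustration):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class CompositeDemo {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.copiedBuffer("HTTP-HEADER", StandardCharsets.UTF_8);
        ByteBuf body   = Unpooled.copiedBuffer("HTTP-BODY",   StandardCharsets.UTF_8);

        // Aggregate the two buffers without copying their contents.
        // The boolean argument tells the composite to advance its writerIndex
        // so that the added components become readable.
        CompositeByteBuf message = Unpooled.compositeBuffer();
        message.addComponents(true, header, body);

        System.out.println(message.toString(StandardCharsets.UTF_8)); // HTTP-HEADERHTTP-BODY
        message.release(); // also releases the underlying components
    }
}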

ByteBuf Classification

ByteBufs fall into three main classification dimensions:

Pooled vs. Unpooled (whether the memory is pooled)
Unsafe vs. non-Unsafe (whether the JDK's Unsafe is used for access)
Heap vs. Direct (on-heap vs. off-heap)

Pooled vs. Unpooled
Pooled: each allocation takes a contiguous segment out of pre-allocated memory and wraps it in a ByteBuf for the application.
Unpooled: each allocation calls the system API directly to request a block of memory from the operating system.

Heap vs. Direct
Heap: allocated from the JVM heap and managed by the GC.
Direct: allocated through JDK APIs; it is not controlled by the JVM and does not take part in GC.

Unsafe vs. non-Unsafe
The JDK's Unsafe object can obtain an object's memory address directly and read and write based on that address. The distinction here is simply whether the low-level JDK Unsafe is used for the reads and writes. A short sketch of how these dimensions show up in the API follows the link below.

Why Java introduced Unsafe and how to use it
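
Here is a minimal sketch of how the Pooled/Unpooled and Heap/Direct dimensions surface in the public API (assuming Netty 4.x; whether an Unsafe variant is chosen is decided internally, e.g. via PlatformDependent.hasUnsafe(), not by the caller):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

public class ClassificationDemo {
    public static void main(String[] args) {
        // Unpooled + Heap: backed by a byte[] on the JVM heap
        ByteBuf unpooledHeap = Unpooled.buffer(64);
        System.out.println(unpooledHeap.hasArray());   // true

        // Unpooled + Direct: backed by off-heap memory, not managed by the GC
        ByteBuf unpooledDirect = Unpooled.directBuffer(64);
        System.out.println(unpooledDirect.isDirect()); // true

        // Pooled: taken from pre-allocated arena memory
        ByteBuf pooledDirect = PooledByteBufAllocator.DEFAULT.directBuffer(64);
        ByteBuf pooledHeap   = PooledByteBufAllocator.DEFAULT.heapBuffer(64);

        // Direct and pooled buffers in particular must be released explicitly
        unpooledHeap.release();
        unpooledDirect.release();
        pooledDirect.release();
        pooledHeap.release();
    }
}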

Memory Allocation: ByteBufAllocator

Implementations of this interface are responsible for allocating buffers and are expected to be thread-safe. From the interface methods and their comments below, you can see that they revolve around the three ByteBuf memory patterns described above: heap, direct, and composite.

/**
 * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be
 * thread-safe.
 */
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
     * depends on the actual implementation.
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    ByteBuf buffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity. If it is a direct or heap buffer depends on the actual
     * implementation.
     */
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer();

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf heapBuffer(int initialCapacity);

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf directBuffer(int initialCapacity);

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link CompositeByteBuf}.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer();

    /**
     * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    /**
     * Allocate a heap {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeHeapBuffer();

    /**
     * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    /**
     * Allocate a direct {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeDirectBuffer();

    /**
     * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    /**
     * Returns {@code true} if direct {@link ByteBuf}'s are pooled
     */
    boolean isDirectBufferPooled();

    /**
     * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the
     * {@code minNewCapacity} with {@code maxCapacity} as upper-bound.
     */
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
 }
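
As a quick orientation before diving into the subclasses, here is a minimal usage sketch of this interface (assuming Netty 4.x; ByteBufAllocator.DEFAULT is typically the pooled allocator unless configured otherwise, e.g. via the io.netty.allocator.type system property):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

public class AllocatorDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;

        ByteBuf heap   = alloc.heapBuffer(256);          // on the JVM heap
        ByteBuf direct = alloc.directBuffer(256, 1024);  // off-heap, initial 256, max 1024
        ByteBuf io     = alloc.ioBuffer();               // prefers a direct buffer when the platform supports it
        CompositeByteBuf composite = alloc.compositeBuffer();

        System.out.println(alloc.isDirectBufferPooled());

        heap.release();
        direct.release();
        io.release();
        composite.release();
    }
}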

The concrete implementations of ByteBufAllocator can be seen in its subclasses; the two main ones are UnpooledByteBufAllocator and PooledByteBufAllocator.

Let's look at what each subclass does and how they differ.

UnpooledByteBufAllocator

Heap memory allocation
Entry point: new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity).
Allocating Unpooled, Unsafe, Heap memory actually allocates a byte array and stores it in the UnpooledHeapByteBuf#array field. The initial capacity and the maximum capacity it can grow to can both be specified.

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        checkNotNull(alloc, "alloc");

        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setArray(allocateArray(initialCapacity));
        setIndex(0, 0);
    }


    protected byte[] allocateArray(int initialCapacity) {
        return new byte[initialCapacity];
    }

Looking at the UnpooledHeapByteBuf#getByte() method: when reading from a heap ByteBuf, the byte is fetched directly from the backing byte array by index.

@Override
    public byte getByte(int index) {
        ensureAccessible();
        return _getByte(index);
    }

   @Override
    protected byte _getByte(int index) {
        // array is the byte[] instantiated at construction time
        return HeapByteBufUtil.getByte(array, index);
    }

    static byte getByte(byte[] memory, int index) {
        // read the byte straight out of the array
        return memory[index];
    }
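
Since the heap variant is just a wrapped byte[], the backing array is also directly accessible. A minimal sketch (assuming Netty 4.x):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class HeapArrayDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);   // unpooled heap buffer
        buf.writeByte(42);

        if (buf.hasArray()) {                // true for heap buffers
            byte[] array = buf.array();      // the byte[] created by allocateArray()
            int offset = buf.arrayOffset() + buf.readerIndex();
            System.out.println(array[offset]); // 42
        }
        buf.release();
    }
}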

Direct memory allocation

Entry point: UnpooledByteBufAllocator#newDirectBuffer() --> UnpooledUnsafeDirectByteBuf

public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        checkPositiveOrZero(maxCapacity, "maxCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity), false);
    }


  protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }

  final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }

As you can see, Unpooled Direct allocation actually maintains an underlying JDK DirectByteBuffer. It is created when the memory is allocated and stored in the buffer field.

Tracing the direct buffer's _getByte() is simpler still: it reads directly through the JDK API.

@Override
    protected byte _getByte(int index) {
        // read through the underlying ByteBuffer
        return buffer.get(index);
    }
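
A direct buffer therefore has no backing array; reads go through the underlying java.nio.ByteBuffer (or Unsafe). A minimal sketch (assuming Netty 4.x):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DirectDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.directBuffer(16);  // wraps a JDK DirectByteBuffer
        buf.writeByte(7);

        System.out.println(buf.isDirect());   // true
        System.out.println(buf.hasArray());   // false: no byte[] behind it
        System.out.println(buf.getByte(0));   // 7, read through the underlying buffer

        // Direct memory is not reclaimed by ordinary GC of the buffer object,
        // so releasing it explicitly matters even more here.
        buf.release();
    }
}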

A more detailed analysis can be found in the following article:
https://www.jianshu.com/p/158...

PooledByteBufAllocator

The entry points are PooledByteBufAllocator#newHeapBuffer() and PooledByteBufAllocator#newDirectBuffer(); heap and direct allocation follow the same fixed pattern:

1. Get the thread-local cache PoolThreadCache
2. Get the arena of the corresponding type
3. Use that arena to perform the memory allocation
@Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        // get the thread-local cache
        PoolThreadCache cache = threadCache.get();
        // get the heapArena
        PoolArena<byte[]> heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {
            // allocate memory from the heapArena
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // get the thread-local cache
        PoolThreadCache cache = threadCache.get();
        // get the directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            // allocate memory from the directArena
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

Tracing threadCache.get() shows it calls FastThreadLocal#get(). So threadCache is itself a FastThreadLocal, which can be thought of as the JDK's ThreadLocal; its get() method calls the initialization method initialize().

public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        // call the initialization method
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }
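
For context, FastThreadLocal is used much like the JDK's ThreadLocal. A minimal sketch (assuming Netty 4.x; it is fastest on FastThreadLocalThread, which Netty's event-loop threads are, but also works on plain threads):

import io.netty.util.concurrent.FastThreadLocal;

public class FastThreadLocalDemo {
    private static final FastThreadLocal<StringBuilder> BUILDER = new FastThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            return new StringBuilder(128);  // called on the first get() of each thread
        }
    };

    public static void main(String[] args) {
        StringBuilder sb = BUILDER.get();
        sb.append("hello");
        System.out.println(sb);
    }
}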

The logic of initialValue() is as follows:

1. From the pre-created heapArenas and directArenas, take the least-used arena.
2. Using the obtained arenas as parameters, instantiate and return a PoolThreadCache.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        private final boolean useCacheForAllThreads;

        PoolThreadLocalCache(boolean useCacheForAllThreads) {
            this.useCacheForAllThreads = useCacheForAllThreads;
        }

        @Override
        protected synchronized PoolThreadCache initialValue() {
            /**
             * An arena ("competition ground") is where all the memory-allocation logic takes place.
             */
            // get heapArena: take the least-used arena from the heapArenas (heap arenas)
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            // get directArena: take the least-used arena from the directArenas (direct arenas)
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                // create the PoolThreadCache; this cache is ultimately used by a single thread
                // it manages the two big memory regions, heap and direct, through heapArena and directArena
                // and maintains ByteBuf cache lists of frequently reused memory blocks via tinyCacheSize, smallCacheSize and normalCacheSize
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

      // code omitted ......

      }

Looking at PoolThreadCache, it maintains two kinds of memory-allocation strategy: one through the heapArena and directArena it holds, as described above, and the other through tiny, small, and normal cache lists that keep memory blocks for repeated reuse.

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // memory managed through arenas
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // cache lists for the three size classes: tiny, small, and normal
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;

        // manage memory allocation through the held heapArena and directArena
        this.heapArena = heapArena;
        this.directArena = directArena;

        // create the different cache lists from tinyCacheSize, smallCacheSize and normalCacheSize and store them in fields
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            // create the size-class cache queues
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            // create the size-class cache queues
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            // create the size-class cache queues
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            // MemoryRegionCache is the object that holds the cached memory regions
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // each MemoryRegionCache (tiny, small, normal) represents a queue for one memory size (one size class)
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            // MemoryRegionCache is the object that holds the cached memory regions
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                // each MemoryRegionCache (tiny, small, normal) represents a queue for one memory size (one size class)
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

......
}

A more detailed analysis can be found in the following article:
https://www.jianshu.com/p/1cd...

How directArena allocates direct memory

After the previous step obtains the PoolThreadCache and the corresponding arena from it, the next step is the arena's actual memory allocation.

Starting from the entry point PooledByteBufAllocator#newDirectBuffer(), the directArena.allocate(...) call leads to the following code:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // get a PooledByteBuf object; at this point it is just a plain object
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // allocate memory from the cache and initialize the buf's memory-address related fields
        allocate(cache, buf, reqCapacity);
        return buf;
    }

So the allocation process is: obtain a PooledByteBuf object, allocate memory for it from the cache, and reset its related properties.

1. newByteBuf(maxCapacity): obtain a PooledByteBuf object

@Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                // obtain a PooledByteBuf
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // get a PooledUnsafeDirectByteBuf from the recycling object pool RECYCLER
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        // the buf may have been taken out of the recycle bin, so it must be prepared for reuse
        buf.reuse(maxCapacity);
        return buf;
    }

2. Recycler is an object pool based on thread-local stacks. Recycler maintains a ThreadLocal field that returns a Stack to the recycle handler DefaultHandle, and the handler maintains the cache of PooledUnsafeDirectByteBuf instances through that stack.

private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            // the Recycler maintains PooledUnsafeDirectByteBuf through the recycle handler
            // the handle holds a stack underneath as the object pool, and is also responsible for recycling objects
            // the handle is stored as a field so that, once the ByteBuf has been used, its recycle method can be called
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };
    // maintains a ThreadLocal whose initialValue() method returns a stack
    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    ratioMask, maxDelayedQueuesPerThread);
        }

        @Override
        protected void onRemoval(Stack<T> value) {
            // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
            if (value.threadRef.get() == Thread.currentThread()) {
               if (DELAYED_RECYCLED.isSet()) {
                   DELAYED_RECYCLED.get().remove(value);
               }
            }
        }
    };

3. Now look at the Recycler#get() method.

public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        // get the corresponding stack, which acts as a recycle bin
        Stack<T> stack = threadLocal.get();

        // pop a DefaultHandle (recycle handler) off the top of the stack
        // the DefaultHandle holds a value, which is actually a PooledUnsafeDirectByteBuf
        DefaultHandle<T> handle = stack.pop();
        // no handle means there is no idle ByteBuf available
        if (handle == null) {
            // create a new handle
            handle = stack.newHandle();
            
            // callback (remember?) that returns a new PooledUnsafeDirectByteBuf
            // and lets the handle hold it
            handle.value = newObject(handle);
        }
        // if a handle was available, its value is simply reused
        return (T) handle.value;
    }

    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        // calls back into initialize
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }

        private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            // callback
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }

        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }


        DefaultHandle<T> newHandle() {
            // instantiate a handle and initialize its stack field, which comes from the thread-local
            return new DefaultHandle<T>(this);
        }

DefaultHandle uses the stack as the cache pool to maintain PooledUnsafeDirectByteBuf; PooledDirectByteBuf works the same way, only the implementation of the instantiated object differs. The handler also defines the recycle method, which stores the object back into the stack, while reuse takes it from the top of the stack.

static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;
        // the object cache pool
        private Stack<?> stack;
        private Object value;

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        /**
         * The recycle method: return the object to the stack.
         * @param object
         */
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }
            // recycle: push this handle back onto the stack so it is cached
            stack.push(this);
        }
    }
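
To make the Recycler pattern concrete, here is a minimal sketch of pooling a custom object with io.netty.util.Recycler (assuming Netty 4.x; the PooledThing class and its fields are made up for illustration):

import io.netty.util.Recycler;

public class RecyclerDemo {
    // A pooled object holds its Handle so it can return itself to the stack.
    static final class PooledThing {
        private final Recycler.Handle<PooledThing> handle;
        int value;

        PooledThing(Recycler.Handle<PooledThing> handle) {
            this.handle = handle;
        }

        void recycle() {
            value = 0;            // reset state before going back to the pool
            handle.recycle(this); // push this object back onto the thread-local stack
        }
    }

    private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            return new PooledThing(handle); // only called when no pooled instance is available
        }
    };

    public static void main(String[] args) {
        PooledThing first = RECYCLER.get();  // pool empty -> newObject()
        first.value = 42;
        first.recycle();                     // back onto the stack

        PooledThing second = RECYCLER.get(); // typically the recycled instance is reused
        System.out.println(first == second); // usually true on the same thread (recycling is ratio-limited in newer versions)
        second.recycle();
    }
}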

That completes the first step; now comes the second step, resetting the buffer's internal pointers. The PooledUnsafeDirectByteBuf we obtained may have been taken from the cache, so it has to be prepared for reuse.

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // get a PooledUnsafeDirectByteBuf from the recycling object pool RECYCLER
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        // the buf may have been taken out of the recycle bin, so it must be prepared for reuse
        buf.reuse(maxCapacity);
        return buf;
    }

    final void reuse(int maxCapacity) {
        // reset the maximum capacity
        maxCapacity(maxCapacity);
        // set the reference count
        setRefCnt(1);
        // reset the reader and writer indices
        setIndex0(0, 0);
        // reset the marked indices
        discardMarks();
    }

At this point we have only completed the first step of allocation (obtaining a PooledByteBuf object). Everything so far has merely obtained these objects and managed them with the recycling stack and its handlers; they are still plain objects, with no actual memory allocated for them yet.

Now trace PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity):

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);

        // allocate according to the normalized size class
        /**
         * Overall allocation logic (tiny and small size classes are checked first, then normal):
         * 1. Try to allocate from the cache; on success, return.
         * 2. On failure, allocate from the arena's memory instead.
         */
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);

            // try cached allocation for the tiny and small size classes
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            // cached allocation for tiny/small failed
            // allocate from the arena's memory
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        // normal size class
        // if the requested memory is larger than chunkSize, allocateHuge is used instead
        if (normCapacity <= chunkSize) {
            // try to allocate from the cache
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // cache miss, so allocate from the arena's memory
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

The overall allocation logic is driven by the size class of the requested memory: tiny and small sizes are handled first, then normal. In each case allocation first tries the cache, and only falls back to the arena's memory if the cache attempt fails. The allocated memory is then bound to the PooledByteBuf obtained in the first step.
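
Putting it all together, from the application's point of view a pooled allocation looks like this (a minimal sketch assuming Netty 4.x; release() hands the object back to the Recycler and its memory region back to the arena / thread-local cache):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.nio.charset.StandardCharsets;

public class PooledAllocationDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        // a small request like this is normalized and served from the tiny/small size classes above
        ByteBuf buf = alloc.directBuffer(100);
        buf.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(buf.readableBytes()); // 5

        // return the PooledByteBuf object and its memory region to the pool
        buf.release();
    }
}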

Summary

This article covered ByteBuf's basic structure, usage patterns, classification, and basic memory allocation.

Next time we will look at ByteBuf's cache-hit logic and memory reclamation.

References

https://www.cnblogs.com/xiang...
https://www.jianshu.com/u/fc9...

