Netty源碼分析第5章(ByteBuf)---->第4節: PooledByteBufAllocator簡述

 

Netty源碼分析第五章: ByteBufhtml

 

第四節: PooledByteBufAllocator簡述數組

 

 

上一小節簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩衝區分類的邏輯, 這一小節開始帶你們剖析更爲複雜的PooledByteBufAllocator, 咱們知道PooledByteBufAllocator是經過本身取一塊連續的內存進行ByteBuf的封裝, 因此這裏更爲複雜, 在這一小節簡單講解有關PooledByteBufAllocator分配邏輯緩存

友情提示:  從這一節開始難度開始加大, 請各位戰友作好心理準備ide

PooledByteBufAllocator一樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個抽象方法, 咱們這一小節以newDirectBuffer爲例, 先簡述一下其邏輯源碼分析

首先看UnPooledByteBufAllocator中newDirectBuffer這個方法:性能

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }

首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個線程局部緩存對象, 線程局部緩存, 顧明思議, 就是同一個線程共享的一個緩存this

threadCache是PooledByteBufAllocator類的一個成員變量, 類型是PoolThreadLocalCache(這兩個很是容易混淆, 切記):spa

private final PoolThreadLocalCache threadCache;

再看其類型PoolThreadLocalCache的定義:.net

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
} 

這裏繼承了一個FastThreadLocal類, 這個類至關於jdk的ThreadLocal, 只是性能更快, 有關FastThreadLocal, 咱們在後面的章節會詳細剖析, 這裏咱們只要知道, 繼承FastThreadLocal類而且重寫了initialValue方法, 則經過其get方法就能得到initialValue返回的對象, 而且這個對象是線程共享的線程

 

在這裏咱們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個屬性以後, 經過new PoolThreadCache()這種方式建立了PoolThreadCache對象

這裏注意, PoolThreadLocalCache是一個FastThreadLocal, 而PoolThreadCache纔是線程局部緩存, 這兩個類名很是很是像, 千萬別搞混了(我當初讀這段代碼時由於搞混因此懵逼了)

其中heapArena和directArena是分別是用來分配堆和堆外內存用的兩個對象, 以directArena爲例, 咱們看到是經過leastUsedArena(directArenas)這種方式得到的, directArenas是一個directArena類型的數組, leastUsedArena(directArenas)這個方法是用來獲取數組中一個使用最少的directArena對象

 

directArenas是PooledByteBufAllocator的成員變量, 是在其構造方法中初始化的:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { //代碼省略
    if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } }

咱們看到這裏經過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 默認是cpu核心數的2倍, 這點咱們能夠跟蹤構造方法的調用鏈能夠分析到

這樣保證了每個線程會有一個獨享的arena

咱們看newArenaArray(nDirectArena)這個方法:

private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }

這裏只是建立了一個數組, 默認長度爲nDirectArena

繼續跟PooledByteBufAllocator的構造方法, 建立完了數組, 後面在for循環中爲數組賦值:

首先經過new PoolArena.DirectArena建立一個DirectArena實例, 而後再爲新建立的directArenas數組賦值

再回到PoolThreadLocalCache的構造方法中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
} 

方法最後, 建立PoolThreadCache的一個對象, 咱們跟進構造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略 //保存成兩個成員變量
    this.heapArena = heapArena; this.directArena = directArena; //代碼省略
}

這裏省略了大段代碼, 只須要關注這裏將兩個值保存在PoolThreadCache的成員變量中

咱們回到newDirectBuffer中:

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }

簡單分析的線程局部緩存初始化相關邏輯, 咱們再往下跟:

PoolArena<ByteBuffer> directArena = cache.directArena;

經過上面的分析, 這步咱們應該不陌生, 在PoolThreadCache構造方法中將directArena和heapArena中保存在成員變量中, 這樣就能夠直接經過cache.directArena這種方式拿到其成員變量的內容

 

從以上邏輯, 咱們能夠大概的分析一下流程, 一般會建立和線程數量相等的arena, 並以數組的形式存儲在PooledByteBufAllocator的成員變量中, 每個PoolThreadCache建立的時候, 都會在當前線程拿到一個arena, 並保存在自身的成員變量中

5-4-1

PoolThreadCache除了維護了一個arena以外, 還維護了一個緩存列表, 咱們在重複分配ByteBuf的時候, 並不須要每次都經過arena進行分配, 能夠直接從緩存列表中拿一個ByteBuf

 

有關緩存列表, 咱們按部就班的往下看:

在PooledByteBufAllocator中維護了三個值:

1.  tinyCacheSize

2.  smallCacheSize

3.  normalCacheSize

tinyCacheSize表明tiny類型的ByteBuf能緩存多少個

smallCacheSize表明small類型的ByteBuf能緩存多少個

normalCacheSize表明normal類型的ByteBuf能緩存多少個

具體tiny類型, small類型, normal是什麼意思, 咱們會在後面講解

 

咱們回到PoolThreadLocalCache類中看其構造方法:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
} 

咱們看到這三個屬性是在PoolThreadCache的構造方法中傳入的

這三個屬性是經過PooledByteBufAllocator的構造方法中初始化的, 跟隨構造方法的調用鏈會走到這個構造方法:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); }

這裏仍然調用了一個重載的構造方法, 這裏咱們關注這幾個參數:

DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE

這裏對應着幾個靜態的成員變量:

private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;

咱們在static塊中看其初始化過程:

static{ //代碼省略
    DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); //代碼省略
}

在這裏咱們看到, 這三個屬性分別初始化的大小是512, 256, 64, 這三個屬性就對應了PooledByteBufAllocator另外的幾個成員變量, tinyCacheSize, smallCacheSize, normalCacheSize

也就是說, tiny類型的ByteBuf在每一個緩存中默認緩存的數量是512個, small類型的ByteBuf在每一個緩存中默認緩存的數量是256個, normal類型的ByteBuf在每一個緩存中默認緩存的數量是64個

咱們再到PooledByteBufAllocator中重載的構造方法中:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { super(preferDirect); threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; //代碼省略
}

篇幅緣由, 這裏也省略了大段代碼, 你們能夠經過構造方法參數找到源碼中相對的位置進行閱讀

咱們關注這段代碼:

this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize;

在這裏將將參數的DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE的三個值保存到了成員變量tinyCacheSize, smallCacheSize, normalCacheSize

PooledByteBufAllocator中將這三個成員變量初始化以後, 在PoolThreadLocalCache的initialValue方法中就可使用這三個成員變量的值了

咱們再次跟到initialValue方法中:

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
} 

這裏就能夠在建立PoolThreadCache對象的的構造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個成員變量了

咱們再跟到PoolThreadCache的構造方法中:

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize);  normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { //代碼省略
 } //代碼省略
 ThreadDeathWatcher.watch(thread, freeTask); }

其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就表明了三種類型的緩存數組, 數組元素是MemoryRegionCache類型的對象, MemoryRegionCache就表明一個ByeBuf的緩存

以tinySubPageDirectCaches爲例, 咱們看到tiny類型的緩存是經過createSubPageCaches這個方法建立的

這裏傳入了三個參數tinyCacheSize咱們以前分析過是512, PoolArena.numTinySubpagePools這裏是32(這裏不一樣類型的緩存大小不同, small類型是4, normal類型是3) , SizeClass.Tiny表明其類型是tiny類型

咱們跟到createSubPageCaches這個方法中:

private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { //建立數組, 長度爲32
        @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //每個節點是ubPageMemoryRegionCache對象
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }

這裏首先建立了MemoryRegionCache, 長度是咱們剛纔分析過的32

而後經過for循環, 爲數組賦值, 賦值的對象是SubPageMemoryRegionCache類型的, SubPageMemoryRegionCache就是MemoryRegionCache類型的子類, 一樣也是一個緩存對象, 構造方法中, cacheSize, 就是其中緩存對象的數量, 若是是tiny類型就是512, sizeClass, 表明其類型, 好比tiny, small或者normal

再簡單跟到其構造方法:

SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); }

這裏調用了父類的構造方法, 咱們繼續跟進去:

MemoryRegionCache(int size, SizeClass sizeClass) { //size會進行規格化
     this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //隊列大小
     queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; }

首先會對其進行規格化, 其實就是查找大於等於當前size的2的冪次方的數, 這裏若是是512那麼規格化以後仍是512, 而後初始化一個隊列, 隊列大小就是傳入的大小, 若是是tiny, 這裏大小就是512

最後並保存其類型

這裏咱們不難看出, 其實每一個緩存的對象, 裏面是經過一個隊列保存的, 有關緩存隊列和ByteBuf之間的邏輯, 後面的小節會進行剖析

 

從上面剖析咱們不難看出, PoolThreadCache中維護了三種類型的緩存數組, 每一個緩存數組中的每一個值中, 又經過一個隊列進行對象的存儲

5-4-2

固然這裏只舉了Direct類型的對象關係, heap類型其實都是同樣的, 這裏再也不贅述

這一小節邏輯較爲複雜, 同窗們能夠本身在源碼中跟蹤一遍加深印象

 

上一節: 緩衝區分配器

下一節: directArena分配緩衝區概述

相關文章
相關標籤/搜索