Netty源碼分析第5章(ByteBuf)---->第7節: page級別的內存分配

 

Netty源碼分析第五章: ByteBufhtml

 

第六節: page級別的內存分配api

 

前面小節咱們剖析過命中緩存的內存分配邏輯, 前提是若是緩存中有數據, 那麼緩存中沒有數據, netty是如何開闢一塊內存進行內存分配的呢?這一小節帶你們進行剖析:數組

剖析以前首先簡單介紹netty內存分配的大概數據結構:緩存

以前咱們介紹過, netty內存分配的單位是chunk, 一個chunk的大小是16MB, 實際上每一個chunk, 都以雙向鏈表的形式保存在一個chunkList中, 而多個chunkList, 一樣也是雙向鏈表進行關聯的, 大概結構以下所示:數據結構

5-7-1函數

在chunkList中, 是根據chunk的內存使用率歸到一個chunkList中, 這樣, 在內存分配時, 會根據百分比找到相應的chunkList, 在chunkList中選擇一個chunk進行內存分配 源碼分析

咱們看PoolArena中有關chunkList的成員變量:this

private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100;

這裏總共定義了6個chunkList, 並在構造方法將其進行初始化spa

跟到其構造方法中:debug

protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { //代碼省略
    q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize); q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize); q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize); q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize); qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize); //用雙向鏈表的方式進行鏈接
 q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null); qInit.prevList(qInit); //代碼省略
}

首先經過new PoolChunkList()這種方式將每一個chunkList進行建立, 咱們以 q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize) 爲例進行簡單的介紹

q075表示當前q50的下一個節點是q075, 剛纔咱們講過ChunkList是經過雙向鏈表進行關聯的, 因此這裏不難理解

參數50和100表示當前chunkList中存儲的chunk的內存使用率都在50%到100%之間, 最後chunkSize爲其設置大小

建立完ChunkList以後, 再設置其上一個節點, q050.prevList(q025)爲例, 這裏表明當前chunkList的上一個節點是q025

以這種方式建立完成以後, chunkList的節點關係變成了以下圖所示:

5-7-2

netty中, chunk又包含了多個page, 每一個page的大小爲8k, 若是要分配16k的內存, 則在在chunk中找到連續的兩個page就能夠分配, 對應關係以下:

5-7-3

不少場景下, 爲緩衝區分配8k的內存也是一種浪費, 好比只須要分配2k的緩衝區, 若是使用8k會形成6k的浪費, 這種狀況, netty又會將page切分紅多個subpage, 每一個subpage大小要根據分配的緩衝區大小而指定, 好比要分配2k的內存, 就會將一個page切分紅4個subpage, 每一個subpage的大小爲2k, 如圖:

5-7-4

咱們看PoolSubpage的屬性:

final PoolChunk<T> chunk; private final int memoryMapIdx; private final int runOffset; private final int pageSize; private final long[] bitmap; PoolSubpage<T> prev; PoolSubpage<T> next; boolean doNotDestroy; int elemSize;

chunk表明其子頁屬於哪一個chunk

bitmap用於記錄子頁的內存分配狀況

prev和next, 表明子頁是按照雙向鏈表進行關聯的, 這裏分別指向上一個和下一個節點

elemSize屬性, 表明的就是這個子頁是按照多大內存進行劃分的, 若是按照1k劃分, 則能夠劃分出8個子頁

簡單介紹了內存分配的數據結構, 咱們開始剖析netty在page級別上分配內存的流程:

咱們回到PoolArena的allocate方法:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //規格化
    final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判斷是否是tinty
        boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //緩存分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //經過tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity); //subpage的數組
            table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到對應的節點
        final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默認狀況下, head的next也是自身
            if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在緩存上進行內存分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回
            return; } //分配不成功, 作實際的內存分配
 allocateNormal(buf, reqCapacity, normCapacity); } else { //大於這個值, 就不在緩存上分配
 allocateHuge(buf, reqCapacity); } }

咱們以前講過, 若是在緩存中分配不成功, 則會開闢一塊連續的內存進行緩衝區分配, 這裏咱們先跳過isTinyOrSmall(normCapacity)日後的代碼, 下一小節進行分析

首先 if (normCapacity <= chunkSize) 說明其小於16MB, 而後首先在緩存中分配, 由於最初緩存中沒有值, 因此會走到allocateNormal(buf, reqCapacity, normCapacity), 這裏實際上就是在page級別上進行分配, 分配一個或者多個page的空間

咱們跟進allocateNormal:

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進行內存分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk進行內存分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
 c.initBuf(buf, handle, reqCapacity); qInit.add(c); }

這裏主要拆解了以下步驟

1. 在原有的chunk中進行分配

2. 建立chunk進行分配

3. 初始化ByteBuf

首先咱們看第一步, 在原有的chunk中進行分配:

if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; }

咱們以前講過, chunkList是存儲不一樣內存使用量的chunk集合, 每一個chunkList經過雙向鏈表的形式進行關聯, 這裏的q050.allocate(buf, reqCapacity, normCapacity)就表明首先在q050這個chunkList上進行內存分配

咱們以q050爲例進行分析, 跟到q050.allocate(buf, reqCapacity, normCapacity)方法中:

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { if (head == null || normCapacity > maxCapacity) { return false; } //從head節點往下遍歷
    for (PoolChunk<T> cur = head;;) { long handle = cur.allocate(normCapacity); if (handle < 0) { cur = cur.next; if (cur == null) { return false; } } else { cur.initBuf(buf, handle, reqCapacity); if (cur.usage() >= maxUsage) { remove(cur); nextList.add(cur); } return true; } } }

首先會從head節點往下遍歷

 long handle = cur.allocate(normCapacity) 表示對於每一個chunk, 都嘗試去分配

 if (handle < 0) 說明沒有分配到, 則經過cur = cur.next找到下一個節點繼續進行分配, 咱們講過chunk也是經過雙向鏈表進行關聯的, 因此對這塊邏輯應該不會陌生

若是handle大於0說明已經分配到了內存, 則經過cur.initBuf(buf, handle, reqCapacity)對byteBuf進行初始化

 if (cur.usage() >= maxUsage) 表明當前chunk的內存使用率大於其最大使用率, 則經過remove(cur)從當前的chunkList中移除, 再經過nextList.add(cur)添加到下一個chunkList中

咱們再回到PoolArena的allocateNormal方法中:

咱們看第二步PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize)

這裏的參數pageSize是8192, 也就是8k

maxOrder爲11

pageShifts爲13,   2的13次方正好是8192, 也就是8k

chunkSize爲16777216, 也就是16MB

這裏的參數值能夠經過debug的方式跟蹤到

由於咱們的示例是堆外內存, newChunk(pageSize, maxOrder, pageShifts, chunkSize)因此會走到DirectArena的newChunk方法中:

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk<ByteBuffer>( this, allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); }

這裏直接經過構造函數建立了一個chunk

allocateDirect(chunkSize)這裏是經過jdk的api的申請了一塊直接內存, 咱們跟到PoolChunk的構造函數中:

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) { unpooled = false; this.arena = arena; //memeory爲一個ByteBuf
    this.memory = memory; //8k
    this.pageSize = pageSize; //13
    this.pageShifts = pageShifts; //11
    this.maxOrder = maxOrder; this.chunkSize = chunkSize; unusable = (byte) (maxOrder + 1); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1); freeBytes = chunkSize; assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder; maxSubpageAllocs = 1 << maxOrder; //節點數量爲4096
    memoryMap = new byte[maxSubpageAllocs << 1]; //也是4096個節點
    depthMap = new byte[memoryMap.length]; int memoryMapIndex = 1; //d至關於一個深度, 賦值的內容表明當前節點的深度
    for (int d = 0; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } } subpages = newSubpageArray(maxSubpageAllocs); }

首先將參數傳入的值進行賦值

 this.memory = memory 就是將參數中建立的堆外內存進行保存, 就是chunk所指向的那塊連續的內存, 在這個chunk中所分配的ByteBuf, 都會在這塊內存中進行讀寫

咱們重點關注 memoryMap = new byte[maxSubpageAllocs << 1] 和 depthMap = new byte[memoryMap.length] 這兩步

首先看 memoryMap = new byte[maxSubpageAllocs << 1] 

這裏初始化了一個字節數組memoryMap, 大小爲maxSubpageAllocs << 1, 也就是4096

 depthMap = new byte[memoryMap.length] 一樣也是初始化了一個字節數組, 大小爲memoryMap的大小, 也就是4096

繼續往下分析以前, 咱們看chunk的一個層級關係

5-7-5

這是一個二叉樹的結構, 左側的數字表明層級, 右側表明一塊連續的內存, 每一個父節點下又拆分紅多個子節點, 最頂層表示的內存範圍爲0-16MB, 其又下分爲兩層, 範圍爲0-8MB, 8-16MB, 以此類推, 最後到11層, 以8k的大小劃分, 也就是一個page的大小

若是咱們分配一個8mb的緩衝區, 則會將第二層的第一個節點, 也就是0-8這個連續的內存進行分配, 分配完成以後, 會將這個節點設置爲不可用, 具體邏輯後面會講解

結合上面的圖, 咱們再看構造方法中的for循環:

for (int d = 0; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } }

實際上這個for循環就是將上面的結構包裝成一個字節數組memoryMap, 外層循環用於控制層數, 內層循環用於控制裏面每層的節點, 這裏通過循環以後, memoryMap和depthMap內容爲如下表現形式:

[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4...........]

這裏注意一下, 由於程序中數組的下標是從1開始設置的, 因此第零個節點元素爲默認值0

這裏數字表明層級, 同時也表明了當前層級的節點, 相同的數字個數就是這一層級的節點數

其中0爲2個(由於這裏分配時下標是從1開始的, 因此第0個位置是默認值0, 實際上第零層元素只有一個, 就是頭結點), 1爲2個, 2爲4個, 3爲8個, 4爲16個, n爲2的n次方個, 直到11, 也就是11有2的11次方個

咱們再回到PoolArena的allocateNormal方法中:

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進行內存分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk進行內存分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
 c.initBuf(buf, handle, reqCapacity); qInit.add(c); }

咱們繼續剖析 long handle = c.allocate(normCapacity) 這步

跟到allocate(normCapacity)中:

long allocate(int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }

若是分配是以page爲單位, 則走到allocateRun(normCapacity)方法中, 跟進去:

private long allocateRun(int normCapacity) { int d = maxOrder - (log2(normCapacity) - pageShifts); int id = allocateNode(d); if (id < 0) { return id; } freeBytes -= runLength(id); return id; }

 int d = maxOrder - (log2(normCapacity) - pageShifts) 表示根據normCapacity計算出圖5-8-5中的第幾層

 int id = allocateNode(d) 表示根據層級關係, 去分配一個節點, 其中id表明memoryMap中的下標

咱們跟到allocateNode方法中:

private int allocateNode(int d) { //下標初始值爲1
    int id = 1; //表明當前層級第一個節點的初始下標
    int initial = - (1 << d); //獲取第一個節點的值
    byte val = value(id); //若是值大於層級, 說明chunk不可用
    if (val > d) { return -1; } //當前下標對應的節點值若是小於層級, 或者當前下標小於層級的初始下標
    while (val < d || (id & initial) == 0) { //當前下標乘以2, 表明下當前節點的子節點的起始位置
        id <<= 1; //得到id位置的值
        val = value(id); //若是當前節點值大於層數(節點不可用)
        if (val > d) { //id爲偶數則+1, id爲奇數則-1(拿的是其兄弟節點)
            id ^= 1; //獲取id的值
            val = value(id); } } byte value = value(id); assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", value, id & initial, d); //將找到的節點設置爲不可用
 setValue(id, unusable); //逐層往上標記被使用
 updateParentsAlloc(id); return id; }

這裏是其實是從第一個節點往下找, 找到層級爲d未被使用的節點, 咱們能夠經過註釋體會其邏輯

找到相關節點後經過setValue將當前節點設置爲不可用, 其中id是當前節點的下標, unusable表明一個不可用的值, 這裏是12, 由於咱們的層級只有12層, 因此設置爲12以後就至關於標記不可用

設置成不可用以後, 經過updateParentsAlloc(id)逐層設置爲被使用

咱們跟進updateParentsAlloc方法:

private void updateParentsAlloc(int id) { while (id > 1) { //取到當前節點的父節點的id
        int parentId = id >>> 1; //獲取當前節點的值
        byte val1 = value(id); //找到當前節點的兄弟節點
        byte val2 = value(id ^ 1); //若是當前節點值小於兄弟節點, 則保存當前節點值到val, 不然, 保存兄弟節點值到val //若是當前節點是不可用, 則當前節點值是12, 大於兄弟節點的值, 因此這裏將兄弟節點的值進行保存
        byte val = val1 < val2 ? val1 : val2; //將val的值設置爲父節點下標所對應的值
 setValue(parentId, val); //id設置爲父節點id, 繼續循環
        id = parentId; } }

這裏實際上是將循環將兄弟節點的值替換成父節點的值, 咱們能夠經過註釋仔細的進行邏輯分析

若是實在理解有困難, 我經過畫圖幫助你們理解:

簡單起見, 咱們這裏只設置三層:

5-7-6

這裏咱們模擬其分配場景, 假設只有三層, 其中index表明數組memoryMap的下標, value表明其值, memoryMap中的值就爲[0, 0, 1, 1, 2, 2, 2, 2]

咱們要分配一個4MB的byteBuf, 在咱們調用allocateNode(int d)中傳入的d是2, 也就是第二層

根據咱們上面分分析的邏輯這裏會找到第二層的第一個節點, 也就是0-4mb這個節點, 找到以後將其設置爲不可用, 這樣memoryMap中的值就爲[0, 0, 1, 1, 12, 2, 2, 2]

二叉樹的結構就會變爲:

5-7-7

注意標紅部分, 將index爲4的節點設置爲了避免可用

 

將這個節點設置爲不可用以後, 則會將進行向上設置不可用, 循環將兄弟節點數值較小的節點替換到父節點, 也就是將index爲2的節點的值替換成了index的爲5的節點的值, 這樣數組的值就會變爲[0, 1, 2, 1, 12, 2, 2, 2]

二叉樹的結構變爲:

5-7-8

注意, 這裏節點標紅僅僅表明節點變化, 並非當前節點爲不可用狀態, 真正不可用狀態的判斷依據是value值爲12

 

這樣, 若是再次分配一個4MB內存的ByteBuf, 根據其邏輯, 則會找到第二層的第二個節點, 也就是4-8MB

再根據咱們的邏輯, 經過向上設置不可用, index爲2就會設置成不可用狀態, 將value的值設置爲12, 數組數值變爲[0, 1, 12, 1, 12, 12, 2, 2]二叉樹以下圖所示:

5-7-9

這樣咱們看到, 經過分配兩個4mb的byteBuf以後, 當前節點和其父節點都會設置成不可用狀態, 當index=2的節點設置爲不可用以後, 將不會再找這個節點下的子節點

以此類推, 直到全部的內存分配完畢的時候, index爲1的節點, 也會變成不可用狀態, 這樣全部的page就分配完畢, chunk中再無可用節點

咱們再回到PoolArena的allocateNormal方法中:

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進行內存分配(1)
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk進行內存分配(2)
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
 c.initBuf(buf, handle, reqCapacity); qInit.add(c); }

經過以上邏輯咱們知道, long handle = c.allocate(normCapacity)這一步, 其實返回的就是memoryMap的一個下標, 經過這個下標, 咱們能惟一的定位一塊內存

繼續往下跟, 經過c.initBuf(buf, handle, reqCapacity)初始化ByteBuf以後, 經過qInit.add(c)將新建立的chunk添加到chunkList中

 

咱們跟到initBuf方法中去:

void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } }

這裏經過memoryMapIdx(handle)找到memoryMap的下標, 其實就是handle的值

bitmapIdx(handle)是有關subPage中使用到的邏輯, 若是是page級別的分配, 這裏只返回0, 因此進入到if塊中

if中首先斷言當前節點是否是不可用狀態, 而後經過init方法進行初始化

其中runOffset(memoryMapIdx)表示偏移量, 偏移量至關於分配給緩衝區的這塊內存相對於chunk中申請的內存的首地址偏移了多少

參數runLength(memoryMapIdx), 表示根據下標獲取可分配的最大長度

咱們跟到init中, 這裏會走到PooledByteBuf的init方法中:

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化
    assert handle >= 0; assert chunk != null; //在哪一塊內存上進行分配的
    this.chunk = chunk; //這一塊內存上的哪一塊連續內存
    this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }

這裏又是咱們熟悉的部分, 將屬性進行了初始化

以上就是完整的DirectUnsafePooledByteBuf在page級別的完整分配的流程, 邏輯也是很是的複雜, 想真正的掌握熟練, 也須要多下功夫進行調試和剖析

 

上一節: 命中緩存的分配

下一節: Subpage級別的內存分配

相關文章
相關標籤/搜索