Netty源碼分析第五章: ByteBufhtml
第八節: subPage級別的內存分配數組
上一小節咱們剖析了page級別的內存分配邏輯, 這一小節帶你們剖析有關subPage級別的內存分配緩存
經過以前的學習咱們知道, 若是咱們分配一個緩衝區大小遠小於page, 則直接在一個page上進行分配則會形成內存浪費, 因此須要將page繼續進行切分紅多個子塊進行分配, 子塊分配的個數根據你要分配的緩衝區大小而定, 好比只須要分配1k的內存, 就會將一個page分紅8等分源碼分析
簡單起見, 咱們這裏僅僅以16字節爲例, 講解其分配邏輯學習
在分析其邏輯前, 首先看PoolArean的一個屬性:this
private final PoolSubpage<T>[] tinySubpagePools;
這個屬性是一個PoolSubpage的數組, 有點相似於一個subpage的緩存, 咱們建立一個subpage以後, 會將建立的subpage與該屬性其中每一個關聯, 下次在分配的時候能夠直接經過該屬性的元素去找關聯的subpagespa
咱們其中是在構造方法中初始化的, 看構造方法中其初始化代碼:code
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
這裏爲numTinySubpagePools爲32orm
跟到newSubpagePoolArray(numTinySubpagePools)方法裏:htm
private PoolSubpage<T>[] newSubpagePoolArray(int size) { return new PoolSubpage[size]; }
這裏直接建立了一個PoolSubpage數組, 長度爲32
在構造方法中建立完畢以後, 會經過循環爲其賦值:
for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); }
咱們跟到newSubpagePoolHead中:
private PoolSubpage<T> newSubpagePoolHead(int pageSize) { PoolSubpage<T> head = new PoolSubpage<T>(pageSize); head.prev = head; head.next = head; return head; }
這裏建立了一個PoolSubpage對象head
head.prev = head; head.next = head;
這種寫法咱們知道Subpage其實也是個雙向鏈表, 這裏的將head的上一個節點和下一個節點都設置爲自身, 有關PoolSubpage的關聯關係, 咱們稍後會看到
這樣經過循環建立PoolSubpage, 總共會建立出32個subpage, 其中每一個subpage實際表明一塊內存大小:
5-8-1
這裏就有點類以前小節的緩存數組tinySubPageDirectCaches的結構
瞭解了tinySubpagePools屬性, 咱們看PoolArean的allocate方法, 也就是緩衝區的入口方法:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //規格化
final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; //判斷是否是tinty
boolean tiny = isTiny(normCapacity); if (tiny) { // < 512 //緩存分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //經過tinyIdx拿到tableIdx
tableIdx = tinyIdx(normCapacity); //subpage的數組
table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到對應的節點
final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默認狀況下, head的next也是自身
if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在緩存上進行內存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回
return; } //分配不成功, 作實際的內存分配
allocateNormal(buf, reqCapacity, normCapacity); } else { //大於這個值, 就不在緩存上分配
allocateHuge(buf, reqCapacity); } }
以前咱們最這個方法剖析過在page級別相關內存分配邏輯, 這一小節看subpage級別分配的相關邏輯
假設咱們分配16字節的緩衝區, isTinyOrSmall(normCapacity)就會返回true, 進入if塊
一樣if (tiny)這裏會返回true, 繼續跟到if (tiny)中:
首先會在緩存中分配緩衝區, 若是分配不到, 就開闢一塊內存進行內存分配
首先看這一步:
tableIdx = tinyIdx(normCapacity);
這裏經過normCapacity拿到tableIdx, 咱們跟進去:
static int tinyIdx(int normCapacity) { return normCapacity >>> 4; }
這裏將normCapacity除以16, 其實也就是1
咱們回到PoolArena的allocate方法繼續看:
table = tinySubpagePools
這裏將tinySubpagePools賦值到局部變量table中, 繼續往下看
final PoolSubpage<T> head = table[tableIdx] 這步時經過下標拿到一個PoolSubpage, 由於咱們以16字節爲例, 因此咱們拿到下標爲1的PoolSubpage, 對應的內存大小也就是16B
再看 final PoolSubpage<T> s = head.next 這一步, 跟咱們剛纔瞭解的的tinySubpagePools屬性, 默認狀況下head.next也是自身, 因此if (s != head)會返回false, 咱們繼續往下看:
下面, 會走到allocateNormal(buf, reqCapacity, normCapacity)這個方法:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進行內存分配(1)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk進行內存分配(2)
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
這裏的邏輯咱們以前的小節已經剖析過, 首先在原來的chunk中分配, 若是分配不成功, 則會建立chunk進行分配
咱們看這一步 long handle = c.allocate(normCapacity)
跟到allocate(normCapacity)方法中:
long allocate(int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }
上一小節咱們分析page級別分配的時候, 剖析的是allocateRun(normCapacity)方法
由於這裏咱們是以16字節舉例, 因此此次咱們剖析allocateSubpage(normCapacity)方法, 也就是在subpage級別進行內存分配
private long allocateSubpage(int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; //表示在第11層分配節點
int id = allocateNode(d); if (id < 0) { return id; } //獲取初始化的subpage
final PoolSubpage<T>[] subpages = this.subpages; final int pageSize = this.pageSize; freeBytes -= pageSize; //表示第幾個subpageIdx
int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null) { //若是subpage爲空
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); //則將當前的下標賦值爲subpage
subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } //取出一個子page
return subpage.allocate(); } }
首先, 經過 PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity) 這種方式找到head節點, 實際上這裏head, 就是咱們剛纔分析的tinySubpagePools屬性的第一個節點, 也就是對應16B的那個節點
int d = maxOrder 是將11賦值給d, 也就是在內存樹的第11層取節點, 這部分上一小節剖析過了, 能夠回顧圖5-8-5部分
int id = allocateNode(d) 這裏獲取的是上一小節咱們分析過的, 字節數組memoryMap的下標, 這裏指向一個page, 若是第一次分配, 指向的是0-8k的那個page, 上一小節對此進行詳細的剖析這裏再也不贅述
final PoolSubpage<T>[] subpages = this.subpages 這一步, 是拿到PoolChunk中成員變量subpages的值, 也是個PoolSubpage的數組, 在PoolChunk進行初始化的時候, 也會初始化該數組, 長度爲2048
也就是說每一個chunk都維護着一個subpage的列表, 若是每個page級別的內存都須要被切分紅子page, 則會將這個這個page放入該列表中, 專門用於分配子page, 因此這個列表中的subpage, 其實就是一個用於切分的page
5-8-2
int subpageIdx = subpageIdx(id) 這一步是經過id拿到這個PoolSubpage數組的下標, 若是id對應的page是0-8k的節點, 這裏拿到的下標就是0
在 if (subpage == null) 中, 由於默認subpages只是建立一個數組, 並無往數組中賦值, 因此第一次走到這裏會返回true, 跟到if塊中:
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
這裏經過new PoolSubpage建立一個新的subpage以後, 經過 subpages[subpageIdx] = subpage 這種方式將新建立的subpage根據下標賦值到subpages中的元素中
在new PoolSubpage的構造方法中, 傳入head, 就是咱們剛纔提到過的tinySubpagePools屬性中的節點, 若是咱們分配的16字節的緩衝區, 則這裏對應的就是第一個節點
咱們跟到PoolSubpage的構造方法中:
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { this.chunk = chunk; this.memoryMapIdx = memoryMapIdx; this.runOffset = runOffset; this.pageSize = pageSize; bitmap = new long[pageSize >>> 10]; init(head, elemSize); }
這裏重點關注屬性bitmap, 這是一個long類型的數組, 初始大小爲8, 這裏只是初始化的大小, 真正的大小要根據將page切分多少塊而肯定
這裏將屬性進行了賦值, 咱們跟到init方法中:
void init(PoolSubpage<T> head, int elemSize) { doNotDestroy = true; this.elemSize = elemSize; if (elemSize != 0) { maxNumElems = numAvail = pageSize / elemSize; nextAvail = 0; bitmapLength = maxNumElems >>> 6; if ((maxNumElems & 63) != 0) { bitmapLength ++; } for (int i = 0; i < bitmapLength; i ++) { //bitmap標識哪一個子page被分配 //0標識未分配, 1表示已分配
bitmap [i] = 0; } } //加到arena裏面
addToPool(head); }
this.elemSize = elemSize 表示保存當前分配的緩衝區大小, 這裏咱們以16字節舉例, 因此這裏是16
maxNumElems = numAvail = pageSize / elemSize 這裏初始化了兩個屬性maxNumElems, numAvail, 值都爲pageSize / elemSize, 表示一個page大小除以分配的緩衝區大小, 也就是表示當前page被劃分了多少分
numAvail則表示剩餘可用的塊數, 因爲第一次分配都是可用的, 因此 numAvail=maxNumElems
bitmapLength表示bitmap的實際大小, 剛纔咱們分析過, bitmap初始化的大小爲8, 但實際上並不必定須要8個元素, 元素個數要根據page切分的子塊而定, 這裏的大小是所切分的子塊數除以64
再往下看, if ((maxNumElems & 63) != 0) 判斷maxNumElems也就是當前配置所切分的子塊是否是64的倍數, 若是不是, 則bitmapLength加1,
最後經過循環, 將其分配的大小中的元素賦值爲0
這裏詳細介紹一下有關bitmap, 這裏是個long類型的數組, long數組中的每個值, 也就是long類型的數字, 其中的每個比特位, 都標記着page中每個子塊的內存是否已分配, 若是比特位是1, 表示該子塊已分配, 若是比特位是0, 表示該子塊未分配, 標記順序是其二進制數從低位到高位進行排列
這裏, 咱們應該知道爲何bitmap大小要設置爲子塊數量除以, 64, 由於long類型的數字是64位, 每個元素能記錄64個子塊的數量, 這樣就能夠經過子page個數除以64的方式決定bitmap中元素的數量
若是子塊不能整除64, 則經過元素數量+1方式, 除以64以後剩餘的子塊經過long中比特位由低到高進行排列記錄
這裏的邏輯結構以下所示:
5-8-3
咱們跟到addToPool(head)中:
private void addToPool(PoolSubpage<T> head) { assert prev == null && next == null; prev = head; next = head.next; next.prev = this; head.next = this; }
這裏的head咱們剛纔講過, 是Arena中數組tinySubpagePools中的元素, 經過以上邏輯, 就會將新建立的Subpage經過雙向鏈表的方式關聯到tinySubpagePools中的元素, 咱們以16字節爲例, 關聯關係如圖所示:
5-8-4
這樣, 下次若是還須要分配16字節的內存, 就能夠經過tinySubpagePools找到其元素關聯的subpage進行分配了
咱們再回到PoolChunk的allocateSubpage方法中:
private long allocateSubpage(int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; //表示在第11層分配節點
int id = allocateNode(d); if (id < 0) { return id; } //獲取初始化的subpage
final PoolSubpage<T>[] subpages = this.subpages; final int pageSize = this.pageSize; freeBytes -= pageSize; //表示第幾個subpageIdx
int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null) { //若是subpage爲空
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); //則將當前的下標賦值爲subpage
subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } //取出一個子page
return subpage.allocate(); } }
建立完了一個subpage, 咱們就能夠經過subpage.allocate()方法進行內存分配了
咱們跟到allocate()方法中:
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; } //取一個bitmap中可用的id(絕對id)
final int bitmapIdx = getNextAvail(); //除以64(bitmap的相對下標)
int q = bitmapIdx >>> 6; //除以64取餘, 其實就是當前絕對id的偏移量
int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當前位標記爲1
bitmap[q] |= 1L << r; //若是可用的子page爲0 //可用的子page-1
if (-- numAvail == 0) { //則移除相關子page
removeFromPool(); } //bitmapIdx轉換成handler
return toHandle(bitmapIdx); }
這裏的邏輯看起來比較複雜, 這裏帶着你們一點點剖析:
首先看:
final int bitmapIdx = getNextAvail();
其中bitmapIdx表示從bitmap中找到一個可用的bit位的下標, 注意, 這裏是bit的下標, 並非數組的下標, 咱們以前分析過, 由於每一比特位表明一個子塊的內存分配狀況, 經過這個下標就能夠知道那個比特位是未分配狀態
咱們跟進這個方法:
private int getNextAvail() { //nextAvail=0
int nextAvail = this.nextAvail; if (nextAvail >= 0) { //一個子page被釋放以後, 會記錄當前子page的bitmapIdx的位置, 下次分配能夠直接經過bitmapIdx拿到一個子page
this.nextAvail = -1; return nextAvail; } return findNextAvail(); }
這裏nextAvail, 表示下一個可用的bitmapIdx, 在釋放的時候的會被標記, 標記被釋放的子塊對應bitmapIdx的下標, 若是<0則表明沒有被釋放的子塊, 則經過findNextAvail方法進行查找
咱們繼續跟進findNextAvail方法:
private int findNextAvail() { //當前long數組
final long[] bitmap = this.bitmap; //獲取其長度
final int bitmapLength = this.bitmapLength; for (int i = 0; i < bitmapLength; i ++) { //第i個
long bits = bitmap[i]; //!=-1 說明64位沒有所有佔滿
if (~bits != 0) { //找下一個節點
return findNextAvail0(i, bits); } } return -1; }
這裏會遍歷bitmap中的每個元素, 若是當前元素中全部的比特位並無所有標記被使用, 則經過findNextAvail0(i, bits)方法挨個日後找標記未使用的比特位
再繼續跟findNextAvail0:
private int findNextAvail0(int i, long bits) { //多少份
final int maxNumElems = this.maxNumElems; //乘以64, 表明當前long的第一個下標
final int baseVal = i << 6; //循環64次(指代當前的下標)
for (int j = 0; j < 64; j ++) { //第一位爲0(若是是2的倍數, 則第一位就是0)
if ((bits & 1) == 0) { //這裏至關於加, 將i*64以後加上j, 獲取絕對下標
int val = baseVal | j; //小於塊數(不能越界)
if (val < maxNumElems) { return val; } else { break; } } //當前下標不爲0 //右移一位
bits >>>= 1; } return -1; }
這裏從當前元素的第一個比特位開始找, 直到找到一個標記爲0的比特位, 並返回當前比特位的下標, 大概流程以下圖所示:
5-8-5
咱們回到allocate()方法中:
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; } //取一個bitmap中可用的id(絕對id)
final int bitmapIdx = getNextAvail(); //除以64(bitmap的相對下標)
int q = bitmapIdx >>> 6; //除以64取餘, 其實就是當前絕對id的偏移量
int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當前位標記爲1
bitmap[q] |= 1L << r; //若是可用的子page爲0 //可用的子page-1
if (-- numAvail == 0) { //則移除相關子page
removeFromPool(); } //bitmapIdx轉換成handler
return toHandle(bitmapIdx); }
找到可用的bitmapIdx以後, 經過 int q = bitmapIdx >>> 6 獲取bitmap中bitmapIdx所屬元素的數組下標
int r = bitmapIdx & 63 表示獲取bitmapIdx的位置是從當前元素最低位開始的第幾個比特位
bitmap[q] |= 1L << r 是將bitmap的位置設置爲不可用, 也就是比特位設置爲1, 表示已佔用
而後將可用子配置的數量numAvail減一
若是沒有可用子page的數量, 則會將PoolArena中的數組tinySubpagePools所關聯的subpage進行移除, 移除以後參考圖5-8-1
最後經過toHandle(bitmapIdx)獲取當前子塊的handle, 上一小節咱們知道handle指向的是當前chunk中的惟一的一塊內存, 咱們跟進toHandle(bitmapIdx)中:
private long toHandle(int bitmapIdx) { return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; }
(long) bitmapIdx << 32 是將bitmapIdx右移32位, 而32位正好是一個int的長度, 這樣, 經過 (long) bitmapIdx << 32 | memoryMapIdx 計算, 就能夠將memoryMapIdx, 也就是page所屬的下標的二進制數保存在 (long) bitmapIdx << 32 的低32位中
0x4000000000000000L是一個最高位是1而且全部低位都是0的二進制數, 這樣經過按位或的方式能夠將 (long) bitmapIdx << 32 | memoryMapIdx 計算出來的結果保存在0x4000000000000000L的全部低位中, 這樣, 返回對的數字就能夠指向chunk中惟一的一塊內存
咱們回到PoolArena的allocateNormal方法中:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk上進行內存分配(1)
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //建立chunk進行內存分配(2)
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3)
c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
咱們分析完了long handle = c.allocate(normCapacity)這步, 這裏返回的handle就指向chunk中的某個page中的某個子塊所對應的連續內存
最後, 經過iniBuf初始化以後, 將建立的chunk加到ChunkList裏面
咱們跟到initBuf方法中:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); //bitmapIdx是後面分配subpage時候使用到的
int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); //runOffset(memoryMapIdx):偏移量 //runLength(memoryMapIdx):當前節點的長度
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } }
這部分在以前的小節咱們剖析過, 相信你們不會陌生, 這裏有區別的是 if (bitmapIdx == 0) 的判斷, 這裏的bitmapIdx不會是0, 這樣, 就會走到initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity)方法中
跟到initBufWithSubpage方法:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { assert bitmapIdx != 0; int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache()); }
首先拿到memoryMapIdx, 這裏會將咱們以前計算handle傳入, 跟進去:
private static int memoryMapIdx(long handle) { return (int) handle; }
這裏將其強制轉化爲int類型, 也就是去掉高32位, 這樣就獲得memoryMapIdx
回到initBufWithSubpage方法中:
咱們注意在buf調用init方法中的一個參數: runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
這裏的偏移量就是, 原來page的偏移量+子塊的偏移量
bitmapIdx & 0x3FFFFFFF 表明當前分配的子page是屬於第幾個子page
(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize 表示在當前page的偏移量
這樣, 分配的ByteBuf在內存讀寫的時候, 就會根據偏移量進行讀寫
最後咱們跟到init方法中
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化
assert handle >= 0; assert chunk != null; //在哪一塊內存上進行分配的
this.chunk = chunk; //這一塊內存上的哪一塊連續內存
this.handle = handle; memory = chunk.memory; //偏移量
this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }
這裏又是咱們熟悉的邏輯, 初始化了屬性以後, 一個緩衝區分配完成
以上就是Subpage級別的緩衝區分配邏輯