Netty源碼分析第8章(高性能工具類FastThreadLocal和Recycler)---->第7節: 獲取異線程釋放的對象

 

Netty源碼分析第八章: 高性能工具類FastThreadLocal和Recyclerhtml

 

第七節: 獲取異線程釋放的對象數組

 

上一小節分析了異線程回收對象, 原理是經過與stack關聯的WeakOrderQueue進行回收性能優化

若是對象通過異線程回收以後, 當前線程須要取出對象進行二次利用, 若是當前stack中爲空, 則會經過當前stack關聯的WeakOrderQueue進行取出, 這也是這一小寫要分析的, 獲取異線程釋放的對象數據結構

在介紹以前咱們首先看Stack類中的兩個屬性:工具

private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head;

這裏都是指向WeakOrderQueue的指針, 其中head咱們上一小節分析過, 指向最近建立的和stack關聯WeakOrderQueue, 也就是頭結點源碼分析

cursor表明的是尋找的當前WeakOrderQueue, pre則是cursor上一個節點, 如圖所示:性能

8-7-1優化

咱們從獲取對象的入口方法, handle的get開始分析:this

public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }

這塊邏輯咱們並不陌上, stack對象經過pop彈出一個handlespa

咱們跟到pop方法中:

DefaultHandle<T> pop() { int size = this.size; if (size == 0) { if (!scavenge()) { return null; } size = this.size; } size --; DefaultHandle ret = elements[size]; elements[size] = null; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; }

這裏咱們重點關注, 若是size爲空, 也就是當前tack爲空的狀況下, 會走到scavenge方法, 這個方法, 就是從WeakOrderQueue獲取對象的方法

跟進scavenge方法:

boolean scavenge() { if (scavengeSome()) { return true; } prev = null; cursor = head; return false; }

scavengeSome方法表示已經回收到了對象, 則直接返回, 若是沒有回收到對象, 則將prev和cursor兩個指針進行重置

繼續跟到scavengeSome方法中:

boolean scavengeSome() { WeakOrderQueue cursor = this.cursor; if (cursor == null) { cursor = head; if (cursor == null) { return false; } } boolean success = false; WeakOrderQueue prev = this.prev; do { if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.next; if (cursor.owner.get() == null) { if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { prev.next = next; } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }

首先拿到cursor指針, cursor指針表明要回收的WeakOrderQueue

若是cursor爲空, 則讓其指向頭節點, 若是頭節點也空, 說明當前stack沒有與其關聯的WeakOrderQueue, 則返回false

經過一個布爾值success標記回收狀態

而後拿到pre指針, 也就是cursor的上一個節點, 以後進入一個do-while循環

do-while循環的終止條件是, 若是沒有遍歷到最後一個節點而且回收的狀態爲false, 這裏咱們能夠分析到再循環體裏, 是無論遍歷與stack關聯的WeakOrderQueue, 直到彈出對象爲止

跟到do-while循環中:

首先cursor指針會調用transfer方法, 該方法表示從當前指針指向的WeakOrderQueue中將元素放入到當前stack中, 若是取出成功則將success設置爲true並跳出循環, transfer咱們稍後分析, 咱們繼續往下看

若是沒有得到元素, 則會經過next屬性拿到下一個WeakOrderQueue, 而後會進入一個判斷 if (cursor.owner.get() == null) 

owner屬性咱們上一小節提到過, 就是與當前WeakOrderQueue關聯的一個線程, get方法就是得到關聯的線程對象, 若是這個對象爲null說明該線程不存在, 則進入if塊, 也就是一些清理的工做

if塊中又進入一個判斷 if (cursor.hasFinalData()) , 這裏表示當前的WeakOrderQueue中是否還有數據, 若是有數據則經過for循環將數據經過transfer方法傳輸到當前stack中, 傳輸成功的, 將success標記爲true

transfer方法是將WeakOrderQueue中一個link中的handle往stack進行傳輸, 有關link的相關內容, 咱們上一小節也進行過度析

因此這裏經過for循環將每一個link的中的數據傳輸到stack中

繼續往下看, 若是pre節點不爲空, 則經過 prev.next = next 將cursor節點進行釋放, 也就是pre的下一個節點指向cursor的下一個節點

繼續往下看else塊中的 prev = cursor 

這裏表示若是當前線程還在, 則將prev賦值爲cursor, 表明prev後移一個節點

最後經過cursor = next將cursor後移一位, 而後再繼續進行循環

循環結束以後, 將stack的prev和cursor屬性進行保存

 

咱們跟到transfer方法中, 分析如何將WeakOrderQueue中的handle傳輸到stack中:

boolean transfer(Stack<?> dst) { Link head = this.head; if (head == null) { return false; } if (head.readIndex == LINK_CAPACITY) { if (head.next == null) { return false; } this.head = head = head.next; } final int srcStart = head.readIndex; int srcEnd = head.get(); final int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } final int dstSize = dst.size; final int expectedCapacity = dstSize + srcSize; if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } srcElems[i] = null; if (dst.dropHandle(element)) { continue; } element.stack = dst; dstElems[newDstSize ++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null) { reclaimSpace(LINK_CAPACITY); this.head = head.next; } head.readIndex = srcEnd; if (dst.size == newDstSize) { return false; } dst.size = newDstSize; return true; } else { return false; } }

剖析以前這裏咱們回顧WeakOrderQueue的數據結構, 如圖所示:

8-7-2

咱們上一小節分析過, WeakOrderQueue是由多個link組成, 每一個link經過鏈表的方式進行關聯, 其中head屬性指向第一個link, tail屬性指向最後一個link

在每一個link中有多個handle

在link中維護了一個讀指針readIndex, 標記着讀取link中handle的位置

 

咱們繼續分析transfer方法:

首先獲取頭結點, 並判斷頭結點是否爲空, 若是頭結點爲空, 說明當前WeakOrderQueue並無link, 返回false

 if (head.readIndex == LINK_CAPACITY) 這裏判斷讀指針是否爲16, 由於link中元素最大數量就是16, 若是讀指針爲16, 說明當前link中的數據都被取走了

接着判斷 head.next == null , 表示是否還有下一個link, 若是沒有下一個link, 則說明當前WeakOrderQueue沒有元素了, 則返回false

若是當前head的next節點不爲null, 則將當前head節點指向下一個節點, 將原來的head節點進行釋放, 移動關係如圖所示:

8-7-3

繼續往下看, 拿到head節點的讀指針和head中元素的數量, 接着計算能夠傳輸元素的大小, 若是大小爲0, 則返回false

8-7-4

接着, 拿到當前stack的大小, 當前stack大小加上能夠傳輸的大小表示stack中所須要的容量

 if (expectedCapacity > dst.elements.length) 表示若是須要的容量大於當前stack中所維護的數組的大小, 則將stack中維護的數組進行擴容, 進入if塊中

擴容以後會返回actualCapacity, 表示擴容以後的大小

再看 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd) 這步

srcEnd表示能夠從Link中取的最後一個元素的下標

 srcStart + actualCapacity - dstSize 這裏咱們進行一個拆分, actualCapacity - dstSize表示擴容後大大小-原stack的大小, 也就是最多能往stack中傳輸多少元素

讀指針+能夠往stack傳輸的數量, 能夠表示往stack中傳輸的最後一個下標, 這裏的下標和srcEnd中取一個較小的值, 也就是既不能超過stack的容量, 也不能形成當前link中下標越界

 

繼續往下看

 int newDstSize = dstSize 表示初始化stack的下標, 表示stack中從這個下標開始添加數據

而後判斷 srcStart != srcEnd , 表示能不能同link中獲取內容, 若是不能, 則返回false, 若是能夠, 則進入if塊中

接着拿到當前link的數組elements和stack中的數組elements

而後經過for循環, 經過數組下標的方式不斷的將當前link中的數據放入到stack中

for循環中首先拿到link的第i個元素

接着咱們咱們關注一個細節:

if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); }

這裏 element.recycleId == 0 表示對象沒有被回收過, 若是沒有被回收過, 則賦值爲lastRecycledId, 咱們前面分析過lastRecycledId是WeakOrderQueue中的惟一下標, 經過賦值標記element被回收過

而後繼續判斷 element.recycleId != element.lastRecycledId , 這表示該對象被回收過, 可是回收的recycleId卻不是最後一次回收lastRecycledId, 這是一種異常狀況, 表示一個對象在不一樣的地方被回收過兩次, 這種狀況則拋出異常

接着將link的第i個元素設置爲null

繼續往下看:

if (dst.dropHandle(element)) { continue; }

這裏表示控制回收站回收的頻率, 以前的小節咱們分析過, 這裏再也不贅述

 element.stack = dst 表示將handle的stack屬性設置到當前stack

 dstElems[newDstSize ++] = element 這裏經過數組的下標的方式將link中的handle賦值到stack的數組中

 

繼續往下看:

if (srcEnd == LINK_CAPACITY && head.next != null) { reclaimSpace(LINK_CAPACITY); this.head = head.next; }

這裏的if表循環結束後, 若是link中的數據已經回收完畢, 而且還有下一個節點則會進到reclaimSpace方法

咱們跟到reclaimSpace方法:

private void reclaimSpace(int space) { assert space >= 0; availableSharedCapacity.addAndGet(space); }

這裏將availableSharedCapacity加上16, 表示WeakOrderQueue還能夠繼續插入link

繼續看transfer方法:

 this.head = head.next 表示將head節點後移一個元素

 head.readIndex = srcEnd 表示將讀指針指向srcEnd, 下一次讀取能夠從srcEnd開始

 if (dst.size == newDstSize) 表示沒有向stack傳輸任何對象, 則返回false

不然就經過 dst.size = newDstSize 更新stack的大小爲newDstSize, 並返回true

以上就是從link中往stack中傳輸數據的過程

 

第八章總結

        這一章主要講解了兩個性能優化工具了FastThreadLocal和Recycler

        FastThreadLocal和jdk的ThreadLocal功能相似, 只是性能更快, 經過FastTreadLocalThread中的threadLocalMap對象, 經過數組下標的方式進行保存和獲取對象

        Recycler是一個輕量級的對象回收站, 用於對象重用, 避免了對象的頻繁建立和減輕gc的壓力

        Recycler同線程回收對象是經過一個線程共享的stack實現的, 將對象包裝成handle並存入stack中

        Reclyer異線程回收對象是將handle存入一個與stack關聯的WeakOrderQueue中, 同一個stack中關聯的不一樣WeakOrderQueue由不一樣的線程建立

        從Recycler獲取對象時stack中有值, 則能夠直接從stack中獲取

        若是stack中沒有值則經過stack關聯的WeakOrderQueue中獲取

 

上一節: 異線程回收對象

相關文章
相關標籤/搜索