我在看SOFAJRaft的源碼的時候看到了使用了對象池的技術,看了一下感受要吃透的話仍是要新開一篇文章來說,內容也比較充實,你們也能夠學到以後運用到實際的項目中去。java
這裏我使用RecyclableByteBufferList來做爲講解的例子:數組
RecyclableByteBufferList數據結構
public final class RecyclableByteBufferList extends ArrayList<ByteBuffer> implements Recyclable { private transient final Recyclers.Handle handle; private static final Recyclers<RecyclableByteBufferList> recyclers = new Recyclers<RecyclableByteBufferList>(512) { @Override protected RecyclableByteBufferList newObject(final Handle handle) { return new RecyclableByteBufferList( handle); } }; //獲取一個RecyclableByteBufferList實例 public static RecyclableByteBufferList newInstance(final int minCapacity) { final RecyclableByteBufferList ret = recyclers.get(); //容量不夠的話,進行擴容 ret.ensureCapacity(minCapacity); return ret; } //回收RecyclableByteBufferList對象 @Override public boolean recycle() { clear(); this.capacity = 0; return recyclers.recycle(this, handle); } }
我在上面將RecyclableByteBufferList獲取對象的方法和回收對象的方法給列舉出來了,獲取實例的時候會經過recyclers的get方法去獲取,回收對象的時候會去調用list的clear方法清空list裏面的內容以後再去調用recyclers的recycle方法進行回收。
若是recyclers裏面沒有對象能夠獲取,那麼會調用newObject方法建立一個對象,而後將handle對象傳入構造器中進行實例化。多線程
ThreadLocal<Stack<T>> threadLocal
實例;ThreadLocal<Map<Stack<?>, WeakOrderQueue>> delayedRecycled
實例;假設線程A建立的對象app
Recyclers靜態代碼塊ide
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default. private static final int DEFAULT_MAX_CAPACITY_PER_THREAD; private static final int INITIAL_CAPACITY; static { // 每一個線程的最大對象池容量 int maxCapacityPerThread = SystemPropertyUtil.getInt("jraft.recyclers.maxCapacityPerThread", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD); if (maxCapacityPerThread < 0) { maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD; } DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread; if (LOG.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { LOG.debug("-Djraft.recyclers.maxCapacityPerThread: disabled"); } else { LOG.debug("-Djraft.recyclers.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); } } // 設置初始化容量信息 INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256); } public static final Handle NOOP_HANDLE = new Handle() {};
Recyclers會在靜態代碼塊中作一些對象池容量初始化的工做,初始化了最大對象池容量和初始化容量信息。this
Recyclers#get線程
// 線程變量,保存每一個線程的對象池信息,經過 ThreadLocal 的使用,避免了不一樣線程之間的競爭狀況 private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<>(Recyclers.this, Thread.currentThread(), maxCapacityPerThread); } }; public final T get() { if (maxCapacityPerThread == 0) { return newObject(NOOP_HANDLE); } //從threadLocal中獲取一個棧對象 Stack<T> stack = threadLocal.get(); //拿出棧頂元素 DefaultHandle handle = stack.pop(); //若是棧裏面沒有元素,那麼就實例化一個 if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
Get方法會從threadLocal中去獲取數據,若是獲取不到,那麼會初始化一個Stack,並傳入當前Recyclers實例,當前線程,與最大容量。而後從stack中pop拿出棧頂元素,若是獲取的元素爲空,那麼直接調用newHandle新建一個DefaultHandle實例,並調用Recyclers實現類的newObject獲取實現類的實例。也就是說DefaultHandle是用來封裝真正的對象的實例。debug
從stack中申請一個對象指針
Stack(Recyclers<T> parent, Thread thread, int maxCapacity) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)]; } DefaultHandle pop() { int size = this.size; if (size == 0) { if (!scavenge()) { return null; } size = this.size; } //size表示整個stack中的大小 size--; //獲取最後一個元素 DefaultHandle ret = elements[size]; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } // 清空回收信息,以便判斷是否重複回收 ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; }
獲取對象的邏輯也比較簡單,當 Stack 中的 DefaultHandle[] 的 size 爲 0 時,須要從其餘線程的 WeakOrderQueue 中轉移數據到 Stack 中的 DefaultHandle[],即 scavenge方法,該方法下面再聊。當 Stack 中的 DefaultHandle[] 中最終有了數據時,直接獲取最後一個元素
咱們再來看看RecyclableByteBufferList是怎麼回收對象的。
RecyclableByteBufferList#recycle
public boolean recycle() { clear(); this.capacity = 0; return recyclers.recycle(this, handle); }
RecyclableByteBufferList回收對象的時候首先會調用clear方法清空屬性,而後調用recyclers的recycle方法進行對象回收。
Recyclers#recycle
public final boolean recycle(T o, Handle handle) { if (handle == NOOP_HANDLE) { return false; } DefaultHandle h = (DefaultHandle) handle; //stack在實例化的時候會在構造器中傳入一個Recyclers做爲parent //因此這裏是校驗一下,若是不是當前線程的, 直接不回收了 if (h.stack.parent != this) { return false; } if (o != h.value) { throw new IllegalArgumentException("o does not belong to handle"); } h.recycle(); return true; }
這裏會接着調用DefaultHandle的recycle方法進行回收
DefaultHandle
static final class DefaultHandle implements Handle { //在WeakOrderQueue的add方法中會設置成ID //在push方法中設置成爲OWN_THREAD_ID //在pop方法中設置爲0 private int lastRecycledId; //只有在push方法中才會設置OWN_THREAD_ID //在pop方法中設置爲0 private int recycleId; //當前的DefaultHandle對象所屬的Stack private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } public void recycle() { Thread thread = Thread.currentThread(); //若是當前線程正好等於stack所對應的線程,那麼直接push進去 if (thread == stack.thread) { stack.push(this); return; } // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) // 若是不是當前線程,則須要延遲迴收,獲取當前線程存儲的延遲迴收WeakHashMap Map<Stack<?>, WeakOrderQueue> delayedRecycled = Recyclers.delayedRecycled.get(); // 當前 handler 所在的 stack 是否已經在延遲迴收的任務隊列中 // 而且 WeakOrderQueue是一個多線程間能夠共享的Queue WeakOrderQueue queue = delayedRecycled.get(stack); if (queue == null) { delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread)); } queue.add(this); } }
DefaultHandle在實例化的時候會傳入一個stack實例,表明當前實例是屬於這個stack的。
因此在調用recycle方法的時候,會判斷一下,當前的線程是否是stack所屬的線程,若是是那麼直接push到stack裏面就行了,不是則調用延遲隊列delayedRecycled;
從delayedRecycled隊列中獲取Map<Stack<?>, WeakOrderQueue> delayedRecycled ,根據stack做爲key來獲取WeakOrderQueue,而後將當前的DefaultHandle實例放入到WeakOrderQueue中。
Stack#push
void push(DefaultHandle item) { // (item.recycleId | item.lastRecycleId) != 0 等價於 item.recycleId!=0 && item.lastRecycleId!=0 // 當item開始建立時item.recycleId==0 && item.lastRecycleId==0 // 當item被recycle時,item.recycleId==x,item.lastRecycleId==y 進行賦值 // 當item被pop以後, item.recycleId = item.lastRecycleId = 0 // 因此當item.recycleId 和 item.lastRecycleId 任何一個不爲0,則表示回收過 if ((item.recycleId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } // 設置對象的回收id爲線程id信息,標記本身的被回收的線程信息 item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; if (size >= maxCapacity) { // Hit the maximum capacity - drop the possibly youngest object. return; } // stack中的elements擴容兩倍,複製元素,將新數組賦值給stack.elements if (size == elements.length) { elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity)); } elements[size] = item; this.size = size + 1; }
同線程回收對象 DefaultHandle#recycle 步驟:
WeakOrderQueue
static final class Stack<T> { //使用volatile能夠當即讀取到該queue private volatile WeakOrderQueue head; } WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); //使用的是WeakReference ,做用是在poll的時候,若是owner不存在了 // 則須要將該線程所包含的WeakOrderQueue的元素釋放,而後從鏈表中刪除該Queue。 owner = new WeakReference<>(thread); //假設線程B和線程C同時回收線程A的對象時,有可能會同時建立一個WeakOrderQueue,就坑同時設置head,因此這裏須要加鎖 synchronized (stackLock(stack)) { next = stack.head; stack.head = this; } }
建立WeakOrderQueue對象的時候會初始化一個WeakReference的owner,做用是在poll的時候,若是owner不存在了, 則須要將該線程所包含的WeakOrderQueue的元素釋放,而後從鏈表中刪除該Queue。
而後給stack加鎖,假設線程B和線程C同時回收線程A的對象時,有可能會同時建立一個WeakOrderQueue,就坑同時設置head,因此這裏須要加鎖。
以head==null的時候爲例
加鎖:
線程B先執行,則head = 線程B的queue;以後線程C執行,此時將當前的head也就是線程B的queue做爲線程C的queue的next,組成鏈表,以後設置head爲線程C的queue
不加鎖:
線程B先執行 next = stack.head此時線程B的queue.next=null->線程C執行next = stack.head;線程C的queue.next=null-> 線程B執行stack.head = this;設置head爲線程B的queue -> 線程C執行stack.head = this;設置head爲線程C的queue,此時線程B和線程C的queue沒有連起來。
WeakOrderQueue#add
void add(DefaultHandle handle) { // 設置handler的最近一次回收的id信息,標記此時暫存的handler是被誰回收的 handle.lastRecycledId = id; Link tail = this.tail; int writeIndex; // 判斷一個Link對象是否已經滿了: // 若是沒滿,直接添加; // 若是已經滿了,建立一個新的Link對象,以後重組Link鏈表,而後添加元素的末尾的Link(除了這個Link,前邊的Link所有已經滿了) if ((writeIndex = tail.get()) == LINK_CAPACITY) { this.tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; // 若是使用者在將DefaultHandle對象壓入隊列後,將Stack設置爲null // 可是此處的DefaultHandle是持有stack的強引用的,則Stack對象沒法回收; //並且因爲此處DefaultHandle是持有stack的強引用,WeakHashMap中對應stack的WeakOrderQueue也沒法被回收掉了,致使內存泄漏 handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated // tail自己繼承於AtomicInteger,因此此處直接對tail進行+1操做 tail.lazySet(writeIndex + 1); }
Stack異線程push對象流程:
WeakOrderQueue 的建立流程:
WeakOrderQueue#add添加對象流程
我再把pop方法搬下來一次:
DefaultHandle pop() { int size = this.size; // size=0 則說明本線程的Stack沒有可用的對象,先從其它線程中獲取。 if (size == 0) { // 當 Stack<T> 此時的容量爲 0 時,去 WeakOrder 中轉移部分對象到 Stack 中 if (!scavenge()) { return null; } //因爲在transfer(Stack<?> dst)的過程當中,可能會將其餘線程的WeakOrderQueue中的DefaultHandle對象傳遞到當前的Stack, //因此size發生了變化,須要從新賦值 size = this.size; } //size表示整個stack中的大小 size--; //獲取最後一個元素 DefaultHandle ret = elements[size]; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } // 清空回收信息,以便判斷是否重複回收 ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; }
Stack#scavenge
boolean scavenge() { // continue an existing scavenge, if any // 掃描判斷是否存在可轉移的 Handler if (scavengeSome()) { return true; } // reset our scavenge cursor prev = null; cursor = head; return false; }
調用scavengeSome掃描判斷是否存在可轉移的 Handler,若是沒有,那麼就返回false,表示沒有可用對象
Stack#scavengeSome
boolean scavengeSome() { WeakOrderQueue cursor = this.cursor; if (cursor == null) { cursor = head; // 若是head==null,表示當前的Stack對象沒有WeakOrderQueue,直接返回 if (cursor == null) { return false; } } boolean success = false; WeakOrderQueue prev = this.prev; do { // 從當前的WeakOrderQueue節點進行 handler 的轉移 if (cursor.transfer(this)) { success = true; break; } // 遍歷下一個WeakOrderQueue WeakOrderQueue next = cursor.next; // 若是 WeakOrderQueue 的實際持有線程因GC回收了 if (cursor.owner.get() == null) { // If the thread associated with the queue is gone, unlink it, after // performing a volatile read to confirm there is no data left to collect. // We never unlink the first queue, as we don't want to synchronize on updating the head. // 若是當前的WeakOrderQueue的線程已經不可達了 //若是該WeakOrderQueue中有數據,則將其中的數據所有轉移到當前Stack中 if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } //將當前的WeakOrderQueue的前一個節點prev指向當前的WeakOrderQueue的下一個節點, // 即將當前的WeakOrderQueue從Queue鏈表中移除。方便後續GC if (prev != null) { prev.next = next; } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }
boolean transfer(Stack<?> dst) { //尋找第一個Link Link head = this.head; // head == null,沒有存儲數據的節點,直接返回 if (head == null) { return false; } // 讀指針的位置已經到達了每一個 Node 的存儲容量,若是還有下一個節點,進行節點轉移 if (head.readIndex == LINK_CAPACITY) { //判斷當前的Link節點的下一個節點是否爲null,若是爲null,說明已經達到了Link鏈表尾部,直接返回, if (head.next == null) { return false; } // 不然,將當前的Link節點的下一個Link節點賦值給head和this.head.link,進而對下一個Link節點進行操做 this.head = head = head.next; } // 獲取Link節點的readIndex,即當前的Link節點的第一個有效元素的位置 final int srcStart = head.readIndex; // 獲取Link節點的writeIndex,即當前的Link節點的最後一個有效元素的位置 int srcEnd = head.get(); // 本次可轉移的對象數量(寫指針減去讀指針) final int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } // 獲取轉移元素的目的地Stack中當前的元素個數 final int dstSize = dst.size; // 計算期盼的容量 final int expectedCapacity = dstSize + srcSize; // 指望的容量大小與實際 Stack 所能承載的容量大小進行比對,取最小值 if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { // 獲取Link節點的DefaultHandle[] final DefaultHandle[] srcElems = head.elements; // 獲取目的地Stack的DefaultHandle[] final DefaultHandle[] dstElems = dst.elements; // dst數組的大小,會隨着元素的遷入而增長,若是最後發現沒有增長,那麼表示沒有遷移成功任何一個元素 int newDstSize = dstSize; //// 進行對象轉移 for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; // 代表本身尚未被任何一個 Stack 所回收 if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; // 避免對象重複回收 } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } // 將可轉移成功的DefaultHandle元素的stack屬性設置爲目的地Stack element.stack = dst; // 將DefaultHandle元素轉移到目的地Stack的DefaultHandle[newDstSize ++]中 dstElems[newDstSize++] = element; // 設置爲null,清楚暫存的handler信息,同時幫助 GC srcElems[i] = null; } // 將新的newDstSize賦值給目的地Stack的size dst.size = newDstSize; if (srcEnd == LINK_CAPACITY && head.next != null) { // 將Head指向下一個Link,也就是將當前的Link給回收掉了 // 假設以前爲Head -> Link1 -> Link2,回收以後爲Head -> Link2 this.head = head.next; } // 設置讀指針位置 head.readIndex = srcEnd; return true; } else { // The destination stack is full already. return false; } } }