netty對象池使用與回收

1. Recycler對象池
  Recycler抽象類的實現很是簡單,只有三個方法:
  獲取對象:Recycler:get()
  回收對象:Recycler:recycle()
  建立對象:Recycler:newObject()
  newObject爲抽象方法,須要由實現類本身實現此方法來建立對象。
  Recycler對象池目的是儘量的重複利用同一線程建立的對象,同時爲了不佔用過多的內存,回收對象時採用必定的比例回收對象(默認1/7規則 註釋中有解釋),未回收的對象由jvm垃圾回收機制來處理。
  Recycler對象池的數據存儲結構以下:
  

  和建立stack相同的線程回收對象時存儲在elements數組(pushNow方法回收),若是不是同一個線程則放入WeakOrderQueue隊列中,等到須要get對象時且stack爲空,會將WeakOrderQueue集合的全部元素根據必定的規則轉移到stack的elements數組中。數組

2. 對象的獲取併發

  首先判斷池中是否存在對象,若是由則優先從本地線程stack中獲取Object,若是stack爲空時,再將其餘線程queue集合的全部對象根據必定的規則轉移到本地線程stack中(1/7規則),而後再從stack獲取Object並返回.
  若是池中不存在對象,建立對象並返回。 
  相關代碼以下:   
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();//首先判斷池中是否存在對象,若是由則優先從本地線程stack中獲取Object,若是stack爲空時,再將其餘線程queue集合的全部對象根據必定的規則轉移到本地線程stack中(1/7規則),而後再從stack獲取Object並返回.
        if (handle == null) {//若是池中不存在對象,建立對象並返回。
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        //        System.out.println(threadLocal.getClass() + "=" + stack + "=" + handle);
        return (T) handle.value;
    }

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {首先判斷池中是否存在對象//
            if (!scavenge()) {//若是stack爲空時,再將其餘線程queue集合的全部對象根據必定的規則轉移到本地線程stack中(1/7規則)
                return null;
            }
            size = this.size;
        }
        size--;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }
    
    boolean scavenge() {
        // continue an existing scavenge, if any
        if (scavengeSome()) {
            return true;
        }

        // reset our scavenge cursor
        prev = null;
        cursor = head;
        return false;
    }

    boolean scavengeSome() {
        WeakOrderQueue prev;
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            prev = null;
            cursor = head;
            if (cursor == null) {
                return false;
            }
        } else {
            prev = this.prev;
        }

        boolean success = false;
        do {
            if (cursor.transfer(this)) {//將其餘線程queue集合的全部對象根據必定的規則轉移到本地線程stack中(1/7規則)
                success = true;
                break;
            }
            WeakOrderQueue next = cursor.next;
            if (cursor.owner.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don't want to synchronize on updating the head.
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }

                if (prev != null) {
                    prev.setNext(next);
                }
            } else {
                prev = cursor;
            }

            cursor = next;

        } while (cursor != null && !success);//遍歷全部的WeakOrderQueue

        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

    boolean transfer(Stack<?> dst) {
        Link head = this.head.link;
        if (head == null) {
            return false;
        }

        if (head.readIndex == LINK_CAPACITY) {
            if (head.next == null) {
                return false;
            }
            this.head.link = head = head.next;
        }

        final int srcStart = head.readIndex;
        int srcEnd = head.get();
        final int srcSize = srcEnd - srcStart;
        if (srcSize == 0) {
            return false;
        }

        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;

        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
        }

        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            for (int i = srcStart; i < srcEnd; i++) {//將queue中全部元素按照(1/7規則)保存到stack中
                DefaultHandle element = srcElems[i];
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                srcElems[i] = null;

                if (dst.dropHandle(element)) {
                    // Drop the object.
                    continue;
                }
                element.stack = dst;
                dstElems[newDstSize++] = element;
            }

            if (srcEnd == LINK_CAPACITY && head.next != null) {
                // Add capacity back as the Link is GCed.
                this.head.reclaimSpace(LINK_CAPACITY);
                this.head.link = head.next;
            }

            head.readIndex = srcEnd;
            if (dst.size == newDstSize) {
                return false;
            }
            dst.size = newDstSize;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }
    
    
View Code

3.  回收對象app

  若是當前回收的線程是原始線程(建立對象的線程),那麼調用pushNow,若是超過Stack的容量(默認4*1024)或者1/7規則 就drop(由jvm回收釋放)
  若是當前回收的線程不是原始線程,那麼調用pushLater,若是當前線程是第一次回收原始線程的對象,須要由當前線程建立的WeakOrderQueue(原始線程的stack對象中能夠關聯其餘線程的WeakOrderQueue)。若是建立隊列成功,則進入該隊列;失敗(queue個數大於maxDelayedQueues)則放棄該對象(由jvm回收釋放)
註釋:1)由本地線程的WeakOrderQueue回收對象,這麼作的緣由時避免併發(races競爭),並且不是每一個對象都有對象回收池來回收,若是超過最大容量限制會放棄,並且本地線程stack採用1/8規則措施,並且將異地線程queue的對象轉移到本地線程stack時也採起1/8規則措施
   2)1/7規則是指每7個對象只留取1個回收,剩餘部分放棄,如1-7個回收1個,8-15回收2個......
回收對象的相關代碼以下:
     (1)DefaultHandle類
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }
    
        (2)Stack類
        void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        if (threadRef.get() == currentThread) {
            // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
            pushNow(item);
        } else {
            // The current Thread is not the one that belongs to the Stack
            // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
            // happens later.
            pushLater(item, currentThread);
        }
    
       (3)private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {//爲了不回收對象數量太多,佔用太多內存,採起了1/8規則措施,參數可調
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }
        
        (4)private void pushLater(DefaultHandle<?> item, Thread thread) {
            // we don't want to have a ref to the queue as the value in our weak map
            // so we null it out; to ensure there are no races with restoring it later
            // we impose a memory ordering here (no-op on x86)
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {回收池的異地隊列個數有限制,每一個隊列的容量也有限制
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

        (5)WeakOrderQueue類
        void add(DefaultHandle<?> handle) {
            handle.lastRecycledId = id;

            Link tail = this.tail;
            int writeIndex;
            if ((writeIndex = tail.get()) == LINK_CAPACITY) {
                if (!head.reserveSpace(LINK_CAPACITY)) {
                    // Drop it.
                    return;
                }
                // We allocate a Link so reserve the space
                this.tail = tail = tail.next = new Link();

                writeIndex = tail.get();
            }
            tail.elements[writeIndex] = handle;
            handle.stack = null;
            // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
            // this also means we guarantee visibility of an element in the queue if we see the index updated
            tail.lazySet(writeIndex + 1);
        }
 
View Code
相關文章
相關標籤/搜索