Netty源碼解析 -- 對象池Recycler實現原理

因爲在Java中建立一個實例的消耗不小,不少框架爲了提升性能都使用對象池,Netty也不例外。
本文主要分析Netty對象池Recycler的實現原理。數組

源碼分析基於Netty 4.1.52緩存

緩存對象管理

Recycler的內部類Stack負責管理緩存對象。
Stack關鍵字段安全

// Stack所屬主線程,注意這裏使用了WeakReference
WeakReference<Thread> threadRef;    
// 主線程回收的對象
DefaultHandle<?>[] elements;
// elements最大長度
int maxCapacity;
// elements索引
int size;
// 非主線程回收的對象
volatile WeakOrderQueue head;

Recycler將一個Stack劃分給某個主線程,主線程直接從Stack#elements中存取對象,而非主線程回收對象則存入WeakOrderQueue中。
threadRef字段使用了WeakReference,當主線程消亡後,該字段指向對象就能夠被垃圾回收。微信

DefaultHandle,對象的包裝類,在Recycler中緩存的對象都會包裝成DefaultHandle類。框架

head指向的WeakOrderQueue,用於存放其餘線程的對象源碼分析

WeakOrderQueue主要屬性性能

// Head#link指向Link鏈表首對象
Head head;  
// 指向Link鏈表尾對象
Link tail;
// 指向WeakOrderQueue鏈表下一對象
WeakOrderQueue next;
// 所屬線程
WeakReference<Thread> owner;

Link中也有一個DefaultHandle<?>[] elements字段,負責存儲數據。
注意,Link繼承了AtomicInteger,AtomicInteger的值存儲elements的最新索引。優化

WeakOrderQueue也是屬於某個線程,而且WeakOrderQueue繼承了WeakReference<Thread>,當所屬線程消亡時,對應WeakOrderQueue也能夠被垃圾回收。
注意:每一個WeakOrderQueue都只屬於一個Stack,而且只屬於一個非主線程。

thread2要存放對象到Stack1中,只能存放在WeakOrderQueue1
thread1要存放對象到Stack2中,只能存放在WeakOrderQueue3this

回收對象

DefaultHandle#recycle -> Stack#push線程

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    if (threadRef.get() == currentThread) {
        // #1
        pushNow(item);
    } else {
        // #2
        pushLater(item, currentThread);
    }
}

#1 當前線程是主線程,直接將對象加入到Stack#elements中。
#2 當前線程非主線程,須要將對象放到對應的WeakOrderQueue中

private void pushLater(DefaultHandle<?> item, Thread thread) {
    ...
    // #1
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    if (queue == null) {
        // #2
        if (delayedRecycled.size() >= maxDelayedQueues) {
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // #3
        if ((queue = newWeakOrderQueue(thread)) == null) {
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // #4
        return;
    }
    // #5
    queue.add(item);
}

#1 DELAYED_RECYCLED是一個FastThreadLocal,能夠理解爲Netty中的ThreadLocal優化類。它爲每一個線程維護了一個Map,存儲每一個Stack和對應WeakOrderQueue。
全部這裏獲取的delayedRecycled變量是僅用於當前線程的。
而delayedRecycled.get獲取的WeakOrderQueue,是以Thread + Stack做爲維度區分的,只能是一個線程操做。
#2 當前WeakOrderQueue數量超出限制,添加WeakOrderQueue.DUMMY做爲標記
#3 構造一個WeakOrderQueue,加入到Stack#head指向的WeakOrderQueue鏈表中,並放入DELAYED_RECYCLED。這時是須要一下同步操做的。
#4 遇到WeakOrderQueue.DUMMY標記對象,直接拋棄對象
#5 將緩存對象添加到WeakOrderQueue中。

WeakOrderQueue#add

void add(DefaultHandle<?> handle) {
    handle.lastRecycledId = id;

    // #1
    if (handleRecycleCount < interval) {
        handleRecycleCount++;
        return;
    }
    handleRecycleCount = 0;

    
    Link tail = this.tail;
    int writeIndex;
    // #2
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        Link link = head.newLink();
        if (link == null) {
            return;
        }
        this.tail = tail = tail.next = link;
        writeIndex = tail.get();
    }
    // #3
    tail.elements[writeIndex] = handle;
    handle.stack = null;
    // #4
    tail.lazySet(writeIndex + 1);
}

#1 控制回收頻率,避免WeakOrderQueue增加過快。
每8個對象都會拋棄7個,回收一個
#2 當前Link#elements已所有使用,建立一個新的Link
#3 存入緩存對象
#4 延遲設置Link#elements的最新索引(Link繼承了AtomicInteger),這樣在該stack主線程經過該索引獲取elements緩存對象時,保證elements中元素已經可見。

獲取對象

Recycler#threadLocal中存放了每一個線程對應的Stack。
Recycler#get中首先獲取屬於當前線程的Stack,再從該Stack中獲取對象,也就是,每一個線程只能從本身的Stack中獲取對象。
Recycler#get -> Stack#pop

DefaultHandle<T> pop() {
    int size = this.size;
    if (size == 0) {
        // #1
        if (!scavenge()) {
            return null;
        }
        size = this.size;
        if (size <= 0) {
            return null;
        }
    }
    // #2
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    this.size = size;

    ...
    return ret;
}

#1 elements沒有可用對象時,將WeakOrderQueue中的對象遷移到elements
#2 從elements中取出一個緩存對象

scavenge -> scavengeSome -> WeakOrderQueue#transfer

boolean transfer(Stack<?> dst) {
    Link head = this.head.link;
    if (head == null) {
        return false;
    }
    // #1
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        head = head.next;
        this.head.relink(head);
    }
    // #2
    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }
    // #3
    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        // #4
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle<?> element = srcElems[i];
            ...
            srcElems[i] = null;
            // #5
            if (dst.dropHandle(element)) {
                continue;
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
        }
        // #6
        if (srcEnd == LINK_CAPACITY && head.next != null) {
            this.head.relink(head.next);
        }

        head.readIndex = srcEnd;
        // #7
        if (dst.size == newDstSize) {
            return false;
        }
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}

就是把WeakOrderQueue中的對象遷移到Stack中。
#1 head.readIndex 標誌如今已遷移對象下標
head.readIndex == LINK_CAPACITY,表示當前Link已所有移動,查找下一個Link
#2 計算待遷移對象數量
注意,Link繼承了AtomicInteger
#3 計算Stack#elements數組長度,不夠則擴容
#4 遍歷待遷移的對象
#5 控制回收頻率
#6 當前Link對象已所有移動,修改WeakOrderQueue#head的link屬性,指向下一Link,這樣前面的Link就能夠被垃圾回收了。
#7 dst.size == newDstSize 表示並無對象移動,返回false
不然更新dst.size

其實對象池的實現難點在於線程安全。
Recycler中將主線程和非主線程回收對象劃分到不一樣的存儲空間中(stack#elements和WeakOrderQueue.Link#elements),而且對於WeakOrderQueue.Link#elements,存取操做劃分到兩端進行(非主線程從尾端存入,主線程從首部開始讀取),
從而減小同步操做,並保證線程安全。

另外,Netty還提供了更高級別的對象池類ObjectPool,使用方法能夠參考PooledDirectByteBuf#RECYCLER,這裏再也不贅述。

若是您以爲本文不錯,歡迎關注個人微信公衆號,系列文章持續更新中。您的關注是我堅持的動力!

相關文章
相關標籤/搜索