DelayQueue系列(一):源碼分析

本文原發表於簡書DelayQueue之源碼分析安全

本文將會對DelayQueue作一個簡單的介紹,並提供部分源碼的分析。bash

DelayQueue的特性基本上由BlockingQueue、PriorityQueue和Delayed的特性來決定的。併發

簡而言之,DelayQueue是經過Delayed,使得不一樣元素之間能按照剩餘的延遲時間進行排序,而後經過PriorityQueue,使得超時的元素能最早被處理,而後利用BlockingQueue,將元素處理的操做阻塞住。源碼分析

基本定義以下:ui

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();
}
複製代碼

ReentrantLock lock = new ReentrantLock(); ReentrantLock是一個可重入的互斥鎖,將由最近成功得到鎖,而且尚未釋放該鎖的線程所擁有,當鎖被其餘線程得到時,調用lock的線程將沒法得到鎖。 在DelayQueue中,只有一個互斥鎖lock。this

PriorityQueue q = new PriorityQueue(); PriorityQueue是一個優先級隊列,每次從隊列中取出的是具備最高優先權的元素。 在DelayQueue中,由於E繼承於Delayed,因此q表示一個按照delayTime排序的優先級隊列,用於存放須要延遲執行的元素。spa

Thread leader = null; 這裏的leader設計出來是爲了minimize unnecessary timed waiting(減小沒必要要的等待時間),如何實現的方案會在詳細解讀中解釋。 在DelayQueue中leader表示一個等待從隊列中獲取消息的線程。線程

Condition available = lock.newCondition(); Condition是lock對象的條件變量,只能和鎖lock配合使用,用於控制併發程序訪問競爭資源的安全。 一個鎖lock能夠有多個條件變量condition,每一個條件上能夠有多個線程等待,經過調用await()方法,可讓線程在該條件下等待。當調用signalAll()方法,又能夠喚醒該條件下的等待的線程。 在DelayQueue中lock對象只有一個條件變量available。設計

如下是DelayQueue的主要方法:code

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
複製代碼

一、執行lock.lock(),獲取鎖。

二、把元素e添加到優先隊列q(下稱隊列q)中。

三、判斷隊列q的隊首元素是否爲e。

四、若是e是隊首元素的話,即元素e是最近可被執行的元素,意味着延遲隊列的執行順序將被變動。 執行leader = null,不然在執行take時,全部線程就會在if(leader!=null)的判斷下進入等待。 執行available.signal(),喚醒其餘等待中的線程,從新去循環執行take中的操做1-8。 若是不執行signal,那麼在take方法中,只有執行awaitNanos(delay)的線程在等待delay指定的時間後自動喚醒,其餘執行await的線程將一直被掛起。 若是沒有新的線程去執行take方法,那麼等待執行awaitNanos(delay)的線程自動喚醒時,此時等待時間將超過元素e的delayTime,這不符合預期。 即使有新的線程去執行take方法,那以前掛起的線程也將一直在等待,效率很低。

五、在finally塊中執行lock.unlock()。 須要注意的是,鎖必須在 finally 塊中釋放。不然,若是代碼拋出異常,那麼鎖就有可能永遠得不到釋放。若是沒有釋放鎖,那麼就會產生死鎖的問題。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } 複製代碼

一、執行lock.lockInterruptibly(),獲取鎖。 lockInterruptibly和lock的區別在於 lock 在鎖被其餘線程佔有,當前線程等待鎖期間(下稱等待鎖期間),只考慮獲取鎖。只有在獲取鎖成功後,纔會去響應中斷。 而lockInterruptibly 在等待鎖期間,會優先考慮響應中斷,而不是響應鎖的獲取。若是當前線程被打斷(interrupt)則該方法拋出InterruptedException。該方法提供了一種解除死鎖的途徑。

二、E first = q.peek(),獲取隊列q的隊首元素first(下稱first)。

三、若是first爲空,則執行avaliable.await()讓線程進入等待。實際上就是釋放鎖,而後掛起線程,等待被喚醒,此時其餘線程能夠得到鎖了。 await()和awaitNanos(nanosTimeout)區別在於 執行awaitNanos(nanosTimeout)的線程比執行await()的線程多一個喚醒條件,超過等待nanosTimeout指定的時間,線程將自動喚醒。線程喚醒時,保證該線程是持有鎖的。

四、若是first不爲空,則執行first.getDelay(NANOSECONDS)獲取first的剩餘延遲時間delayTime(下稱delayTime)

五、若是first的delayTime<=0,代表該元素已經達到以前設定的延遲時間了,則調用return q.poll(),將first從隊列q中的移除而且返回該元素first.

六、若是first的delayTime>0,則將first指向null,釋放first的引用,避免內存泄露.

七、若是線程leader(下稱leader)不爲空的話,則執行avaliable.await()讓線程進入等待。leader不爲空的話,代表已經有其餘線程在獲取優先隊列q的隊首元素了(下稱獲取隊首元素),此時只須要執行avaliable.await()讓當前線程進入等待便可。

八、若是leader爲空,則執行Thread thisThread = Thread.currentThread();leader = thisThread;將leader指向當前線程,而後執行available.awaitNanos(delay);讓線程最長等待delayTime的時間。最後在finally塊中,若是leader依然指向前文獲取的當前線程thisThread,那麼將leader指向null,釋放leader引用。 這裏leader爲空,代表還沒有有其餘線程在獲取隊首元素,此時設置leader對象,指向當前線程(下稱currentThread)。由於currentThread執行了available.awaitNanos(delay)釋放了鎖,因此其餘線程(下稱otherThread)在調用take方法時能獲取鎖,可是由於leader非空,因此otherThread都會進入7的那步,直接進入等待,而不須要像currentThread那樣執行8的一系列操做,達到設計leader線程的初衷。

九、循環執行以上1-8步,直到first非空且first的delayTime<=0,跳出循環。

十、跳出循環後,進入finally塊。

十一、若是leader爲空且隊列q的隊首元素非null(q隊列中移除了上文的first元素後還有其餘元素),此時執行available.signal(),調用signal喚醒其餘等待中的線程。

十二、執行lock.unlock(),執行解鎖操做。

ok,源碼分析就先講到這裏了,下一期我準備講一下如何將DelayQueue封裝成可用的組件,讓使用者調用起來更加方便。

相關文章
相關標籤/搜索