JUC源碼分析-集合篇(八)DelayQueue

JUC源碼分析-集合篇(八)DelayQueue

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。 隊列中的元素必須實現 Delayed 接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。java

1. DelayQueue 使用場景

1.1 DelayQueue 特色

DelayQueue 也是一種比較特殊的阻塞隊列,從類聲明也能夠看出,DelayQueue 中的全部元素必須實現 Delayed 接口。DelayQueue 隊列的元素必須實現 Delayed 接口。segmentfault

// 此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。
public interface Delayed extends Comparable<Delayed> {
    // 返回與此對象相關的剩餘有效時間,以給定的時間單位表示
    long getDelay(TimeUnit unit);
}

能夠看到,Delayed 接口除了自身的 getDelay 方法外,還實現了 Comparable 接口。getDelay 方法用於返回對象的剩餘有效時間,實現 Comparable 接口則是爲了可以比較兩個對象,以便排序。設計模式

也就是說,若是一個類實現了 Delayed 接口,當建立該類的對象並添加到 DelayQueue 中後,只有當該對象的 getDalay 方法返回的剩餘時間 ≤0 時纔會出隊。緩存

另外,因爲 DelayQueue 內部委託了 PriorityQueue 對象來實現全部方法,因此能以堆的結構維護元素順序,這樣剩餘時間最小的元素就在堆頂,每次出隊其實就是刪除剩餘時間 ≤0 的最小元素。多線程

DelayQueue 的特色簡要歸納以下:框架

  • DelayQueue 是無界阻塞隊列;
  • 隊列中的元素必須實現 Delayed 接口,元素過時後纔會從隊列中取走;

1.2 DelayQueue 使用場景

DelayQueue 很是有用,能夠將 DelayQueue 運用在如下應用場景。ide

  1. 緩存系統的設計:能夠用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
  2. 定時任務調度:使用 DelayQueue 保存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,好比 javax.swing.TimerQueue 就是使用 DelayQueue 實現的。ScheduledFutureTask

1.3 DelayQueue 示例

咱們能夠參考 ScheduledThreadPoolExecutor#ScheduledFutureTask 類的實現。源碼分析

// 模仿網吧上網場景
public class DelayQueueTest extends Thread {
    DelayQueue queue =  new DelayQueue();
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

    public static void main(String[] args) {
        DelayQueueTest wangba = new DelayQueueTest();
        wangba.start();

        wangba.shangji("A", 5);
        wangba.shangji("B", 2);
        wangba.shangji("C", 4);
    }

    public void shangji(String name, int money) {
        WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
        queue.add(wm);
        System.out.println(name + "開始上網,時間:" + format.format(new Date()) +
                ",預計下機時間爲:" + format.format(new Date(wm.getEndTime())));
    }

    public void xiaji(WangMing wm) {
        System.out.println(wm.getName() + "下機,時間:" + format.format(new Date(wm.getEndTime())));
    }

    public void run() {
        while (true) {
            try {
                WangMing wm = (WangMing) queue.take();
                xiaji(wm);
            } catch (InterruptedException e) {
            }
        }
    }
}

// 網民,必須實現 Delayed 接口
class WangMing implements Delayed {
    private String name;
    private long endTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    @Override
    public long getDelay(TimeUnit unit) {
        return endTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        WangMing wm = (WangMing) o;
        return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
                (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
    }
}

程序執行結果:性能

A開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:57
B開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:54
C開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:56
B下機,時間:2017-12-07 09:37:54
C下機,時間:2017-12-07 09:37:56
A下機,時間:2017-12-07 09:37:57

2. DelayQueue 源碼分析

介紹完了 DelayQueued 的基本使用,讀者應該對該阻塞隊列的功能有了基本瞭解,接下來咱們看下 Doug Lea 是如何實現 DelayQueued 的。this

2.1 DelayQueue 屬性

private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

// PriorityQueue 維護隊列
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;

上述比較特殊的是 leader 字段,咱們以前已經說過,DelayQueue 每次只會出隊一個過時的元素,若是隊首元素沒有過時,就會阻塞出隊線程,讓線程在 available 這個條件隊列上無限等待。

爲了提高性能,DelayQueue 並不會讓全部出隊線程都無限等待,而是用 leader 保存了第一個嘗試出隊的線程,該線程的等待時間是隊首元素的剩餘有效期。這樣,一旦 leader 線程被喚醒(此時隊首元素也失效了),就能夠出隊成功,而後喚醒一個其它在 available 條件隊列上等待的線程。以後,會重複上一步,新喚醒的線程可能取代成爲新的 leader 線程。這樣,就避免了無效的等待,提高了性能。這實際上是一種名爲 Leader-Follower pattern 的多線程設計模式。

2.2 入隊 offer

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 調用 PriorityQueue#offer 方法
        if (q.peek() == e) {    // 若是入隊元素在隊首, 則喚醒一個出隊線程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

須要注意的是當首次入隊元素時,須要喚醒一個出隊線程,由於此時可能已有出隊線程在空隊列上等待了,若是不喚醒,會致使出隊線程永遠沒法執行。

2.3 出隊 poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // 1. 沒有元素或元素還在有效期內則直接返回 null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 2. 元素已經失效直接取出來一個
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

不阻塞直接 poll 時很簡單,再來看一下阻塞式獲取元素 take 方法。

2.4 阻塞式出隊 take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 1. 集合爲空時全部的線程都處於無限等待的狀態。
            //    只要有元素將其中一個線程轉爲 leader 狀態
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 2. 元素已通過期,直接取出返回
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 3. 已經在其它線程設置爲 leader,無限期等着
                if (leader != null)
                    available.await();
                // 4. 將 leader 設置爲當前線程,阻塞當前線程(限時等待剩餘有效時間)
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 4.1 嘗試獲取過時的元素,從新競爭
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 5. 隊列中有元素則喚醒其它無限等待的線程
        //    leader 線程是限期等待,每次 leader 線程獲取元素出隊,若是隊列中有元素
        //    就要喚醒一個無限等待的線程,將其設置爲限期等待,也就是總有一個等待線程是 leader 狀態
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

採用 take 阻塞式出隊時,這裏要思考下集合中元素時全部的等待線程永遠進行 wait 狀態不被喚醒,也就是說即便元素過時了也沒法正常出隊?

首先,在每次入隊 offer 時,若是是第一個元素就會調用 vailable.signal() 喚醒一個等待的線程。
其次,take 方法自旋結束後若是 leader == null && q.peek() != null,須要喚醒一個等待中的出隊線程。
leader == null && q.peek() != null 的含義就是——沒有 leader 線程但隊列中存在元素。咱們以前說了,leader 線程做用之一就是用來喚醒其它無限等待的線程,因此必需要有這個判斷。
固然,若是集合中沒有元素了,全部的等待線程都處理無限等待的狀態。

參考:

  1. J.U.C之collections框架:DelayQueue

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索