DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。 隊列中的元素必須實現 Delayed 接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。java
DelayQueue 也是一種比較特殊的阻塞隊列,從類聲明也能夠看出,DelayQueue 中的全部元素必須實現 Delayed 接口。DelayQueue 隊列的元素必須實現 Delayed 接口。segmentfault
// 此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。 public interface Delayed extends Comparable<Delayed> { // 返回與此對象相關的剩餘有效時間,以給定的時間單位表示 long getDelay(TimeUnit unit); }
能夠看到,Delayed 接口除了自身的 getDelay 方法外,還實現了 Comparable 接口。getDelay 方法用於返回對象的剩餘有效時間,實現 Comparable 接口則是爲了可以比較兩個對象,以便排序。設計模式
也就是說,若是一個類實現了 Delayed 接口,當建立該類的對象並添加到 DelayQueue 中後,只有當該對象的 getDalay 方法返回的剩餘時間 ≤0 時纔會出隊。緩存
另外,因爲 DelayQueue 內部委託了 PriorityQueue 對象來實現全部方法,因此能以堆的結構維護元素順序,這樣剩餘時間最小的元素就在堆頂,每次出隊其實就是刪除剩餘時間 ≤0 的最小元素。多線程
DelayQueue 的特色簡要歸納以下:框架
DelayQueue 很是有用,能夠將 DelayQueue 運用在如下應用場景。ide
咱們能夠參考 ScheduledThreadPoolExecutor#ScheduledFutureTask 類的實現。源碼分析
// 模仿網吧上網場景 public class DelayQueueTest extends Thread { DelayQueue queue = new DelayQueue(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); public static void main(String[] args) { DelayQueueTest wangba = new DelayQueueTest(); wangba.start(); wangba.shangji("A", 5); wangba.shangji("B", 2); wangba.shangji("C", 4); } public void shangji(String name, int money) { WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l); queue.add(wm); System.out.println(name + "開始上網,時間:" + format.format(new Date()) + ",預計下機時間爲:" + format.format(new Date(wm.getEndTime()))); } public void xiaji(WangMing wm) { System.out.println(wm.getName() + "下機,時間:" + format.format(new Date(wm.getEndTime()))); } public void run() { while (true) { try { WangMing wm = (WangMing) queue.take(); xiaji(wm); } catch (InterruptedException e) { } } } } // 網民,必須實現 Delayed 接口 class WangMing implements Delayed { private String name; private long endTime; private TimeUnit timeUnit = TimeUnit.SECONDS; @Override public long getDelay(TimeUnit unit) { return endTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { WangMing wm = (WangMing) o; return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 : (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0); } }
程序執行結果:性能
A開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:57 B開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:54 C開始上網,時間:2017-12-07 09:37:52,預計下機時間爲:2017-12-07 09:37:56 B下機,時間:2017-12-07 09:37:54 C下機,時間:2017-12-07 09:37:56 A下機,時間:2017-12-07 09:37:57
介紹完了 DelayQueued 的基本使用,讀者應該對該阻塞隊列的功能有了基本瞭解,接下來咱們看下 Doug Lea 是如何實現 DelayQueued 的。this
private final transient ReentrantLock lock = new ReentrantLock(); private final Condition available = lock.newCondition(); // PriorityQueue 維護隊列 private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null;
上述比較特殊的是 leader 字段,咱們以前已經說過,DelayQueue 每次只會出隊一個過時的元素,若是隊首元素沒有過時,就會阻塞出隊線程,讓線程在 available 這個條件隊列上無限等待。
爲了提高性能,DelayQueue 並不會讓全部出隊線程都無限等待,而是用 leader 保存了第一個嘗試出隊的線程,該線程的等待時間是隊首元素的剩餘有效期。這樣,一旦 leader 線程被喚醒(此時隊首元素也失效了),就能夠出隊成功,而後喚醒一個其它在 available 條件隊列上等待的線程。以後,會重複上一步,新喚醒的線程可能取代成爲新的 leader 線程。這樣,就避免了無效的等待,提高了性能。這實際上是一種名爲 Leader-Follower pattern
的多線程設計模式。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 調用 PriorityQueue#offer 方法 if (q.peek() == e) { // 若是入隊元素在隊首, 則喚醒一個出隊線程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
須要注意的是當首次入隊元素時,須要喚醒一個出隊線程,由於此時可能已有出隊線程在空隊列上等待了,若是不喚醒,會致使出隊線程永遠沒法執行。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); // 1. 沒有元素或元素還在有效期內則直接返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; // 2. 元素已經失效直接取出來一個 else return q.poll(); } finally { lock.unlock(); } }
不阻塞直接 poll 時很簡單,再來看一下阻塞式獲取元素 take 方法。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); // 1. 集合爲空時全部的線程都處於無限等待的狀態。 // 只要有元素將其中一個線程轉爲 leader 狀態 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); // 2. 元素已通過期,直接取出返回 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // 3. 已經在其它線程設置爲 leader,無限期等着 if (leader != null) available.await(); // 4. 將 leader 設置爲當前線程,阻塞當前線程(限時等待剩餘有效時間) else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { // 4.1 嘗試獲取過時的元素,從新競爭 if (leader == thisThread) leader = null; } } } } } finally { // 5. 隊列中有元素則喚醒其它無限等待的線程 // leader 線程是限期等待,每次 leader 線程獲取元素出隊,若是隊列中有元素 // 就要喚醒一個無限等待的線程,將其設置爲限期等待,也就是總有一個等待線程是 leader 狀態 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
採用 take 阻塞式出隊時,這裏要思考下集合中元素時全部的等待線程永遠進行 wait 狀態不被喚醒,也就是說即便元素過時了也沒法正常出隊?
首先,在每次入隊 offer 時,若是是第一個元素就會調用 vailable.signal() 喚醒一個等待的線程。
其次,take 方法自旋結束後若是 leader == null && q.peek() != null,須要喚醒一個等待中的出隊線程。
leader == null && q.peek() != null 的含義就是——沒有 leader 線程但隊列中存在元素。咱們以前說了,leader 線程做用之一就是用來喚醒其它無限等待的線程,因此必需要有這個判斷。
固然,若是集合中沒有元素了,全部的等待線程都處理無限等待的狀態。
參考:
天天用心記錄一點點。內容也許不重要,但習慣很重要!