做爲阻塞隊列的一員,DelayQueue(延遲隊列)因爲其特殊含義而使用在特定的場景之中,主要在於Delay這個詞上,那麼其內部是如何實現的呢?今天一塊兒經過DelayQueue的源碼來看一看其是如何完成Delay操做的java
JDK版本號:1.8.0_171
DelayQueue內部經過優先級隊列PriorityQueue來實現隊列元素的排序操做,以前已經介紹過PriorityBlockingQueue的源碼實現,二者比較相似,可自行回顧下,既然用到了優先級隊列,則須要保證其隊列元素的可比較性,以及延遲隊列的特性(可計算延遲時間,經過延遲時間進行比較排序),故這裏其中的隊列元素須要實現Delayed接口,DelayQueue主要就在於理解這兩部份內容數組
下面示例代碼部分已經顯示了DelayQueue的用法,從名字命名上也能理解出其含義,延遲隊列,主要在於延遲消費,如何實現呢?這裏就須要用到Delayed接口,後面會進行說明,在使用時須要實現Delayed接口和compareTo接口安全
本身能夠先試試運行結果,理解看看,能夠看下調用poll和take的結果。若是用過rocketmq,能夠類比其中的延遲消息隊列,等到規定的時間再進行消費,只不過mq中的實現要比這複雜多線程
public class TestDelayQueue { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayItem> delayQueue = new DelayQueue(); // 20s後 delayQueue.add(new DelayItem(20, "aaaaaa")); // 10秒後 delayQueue.add(new DelayItem(10, "bbbbbb")); // 30秒後 delayQueue.add(new DelayItem(30, "cccccc")); while (0 < delayQueue.size()) { Thread.sleep(1000); DelayItem d = delayQueue.poll(); // DelayItem d = delayQueue.take(); System.out.println(null != d ? d.getItem() : "null"); } } static class DelayItem implements Delayed { private long delayTime; private String item; public DelayItem(long delayTime, String item) { super(); // 當前時間 LocalDateTime localDateTime = LocalDateTime.now(); this.delayTime = localDateTime.getSecond() + delayTime; this.item = item; } @Override public long getDelay(TimeUnit unit) { LocalDateTime localDateTime = LocalDateTime.now(); return unit.convert(delayTime - localDateTime.getSecond(), TimeUnit.SECONDS); } @Override public int compareTo(Delayed o) { return this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS) < 0 ? -1 : 1; } public String getItem() { return item; } } }
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
首先要說明的是Delayed接口,類定義部分也已經明確指出其使用(E extends Delayed),咱們在操做時放入DelayQueue隊列元素必須實現這個接口,實現其中的getDelay方法和compareTo方法,在使用示例代碼部分我也說明了這兩個方法的做用ide
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
其中使用了PriorityQueue來完成有序出隊操做,與以前講解過的PriorityBlockingQueue相似,有些許不一樣,可自行參考源碼部分,也能夠去看我以前的一篇專門講解PriorityBlockingQueue源碼的文章,主要異同在於PriorityQueue是非線程安全的,而PriorityBlockingQueue是線程安全的,內部排序機制使用的都是堆排序this
若是你瞭解過PriorityQueue或PriorityBlockingQueue則在這裏使用這個類是很容易理解源碼實現人員的目的的,建議先去了解其實現,要不直接看這個源碼比較有難度spa
因爲須要實現延遲隊列,使用PriorityQueue根據時間排序(自行實現具體細節,例如上邊示例根據時間來排序),經過Delayed接口限制使用DelayQueue的場景線程
/** * 可重入鎖ReentrantLock */ private final transient ReentrantLock lock = new ReentrantLock(); /** * 內部使用PriorityQueue來完成DelayQueue的操做 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * leader線程 * 指定了用於等待隊列元素出隊的線程 * 若是非空,則這個線程能夠阻塞等待一段時間(時間經過計算得到),其餘線程則無限等待 * 避免其餘線程沒必要要的等待 * 這個線程等待一段時間而後出隊操做,其餘線程則無限等待, * 若是等待過程當中入隊了過時時間更短的元素(優先級隊列堆頂元素變化),則會重置leader爲null,並會喚醒等待的線程去爭搶leader來獲取執行出隊的權利 */ private Thread leader = null; /** * Condition對象完成線程等待和喚醒任務 */ private final Condition available = lock.newCondition();
構造方法比較簡單,無參構造沒有進行任何操做,有參構造方法直接傳入對應類型的集合,循環add放入隊列code
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
入隊操做,先得到lock,以後經過優先級隊列的offer方法完成入隊,同時判斷是否要重置leader對象
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 此節點爲當前優先級隊列堆頂節點,即0的索引位置 // 即這次添加的節點即爲下次要獲取的堆頂節點(出隊節點) // 若是非堆頂節點則表示堆頂節點未變化則不要重置leader if (q.peek() == e) { // leader線程置空,讓出隊線程爭搶leader優先執行權 leader = null; // 喚醒阻塞的線程 available.signal(); } return true; } finally { lock.unlock(); } }
出隊操做,先得到lock,以後經過優先級隊列的poll方法完成出隊,固然須要判斷堆頂元素是否已到期。等待超時方法較爲複雜,需耐心理解
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 堆頂元素 E first = q.peek(); // 堆爲空或者堆頂元素延遲時間還未到期則返回null,不然經過poll出隊 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 時間轉成納秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 堆頂元素 E first = q.peek(); // 空表示隊列爲空 if (first == null) { // 等待時間小於等於0則直接返回null,不然就阻塞等待nanos時間 if (nanos <= 0) return null; else // 若是中途被喚醒則更新nanos,剩餘等待時間 nanos = available.awaitNanos(nanos); } else { // 堆頂元素的延遲時間 long delay = first.getDelay(NANOSECONDS); // 延遲時間到期直接出隊操做 if (delay <= 0) return q.poll(); // 延遲時間未到期直接返回null if (nanos <= 0) return null; // 延遲時間未到期同時設置了超時時間進入下面進行處理 // 處於等待狀態不要引用first first = null; // don't retain ref while waiting // 超時時間小於延遲時間或者leader非空阻塞等待nanos // 超時時間小於延遲時間則當前線程最多等待nanos超時時間便可 // leader非空則代表其餘線程已經得到優先執行權,最多等待nanos超時時間便可 // 在等待中有可能被喚醒再此循環執行 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 超時時間大於延遲時間同時leader線程爲空進入下面處理 Thread thisThread = Thread.currentThread(); // 先設置leader線程獲取執行權 leader = thisThread; try { // 阻塞等待delay便可出隊操做 // 萬一等待過程當中被喚醒則經過剩餘等待時間循環判斷處理 // 有可能在等待中入隊了延遲時間更短的元素,此時需釋放leader從新爭搶優先執行權 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { // 釋放leader執行權,從新爭搶leader if (leader == thisThread) leader = null; } } } } } finally { // leader空且隊列非空則喚醒其餘阻塞的線程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
出隊操做,先得到lock,再經過判斷最終執行poll完成出隊操做,和poll的超時等待方法相似
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 隊列無數據則阻塞等待 available.await(); else { // 獲取其堆頂元素延遲時間 long delay = first.getDelay(NANOSECONDS); // 延遲時間已到期,能夠進行出隊操做了 if (delay <= 0) return q.poll(); // 延遲時間還未到期 // 置空first,等待時間內去掉引用 first = null; // don't retain ref while waiting // leader線程非空,表示其餘線程已經獲取優先執行權,阻塞等待 if (leader != null) available.await(); else { // leader爲空則指向當前線程,表示當前線程得到執行權 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞等待delay秒以後繼續 // 也有可能新入隊元素(堆頂元素變化時)被喚醒需從新獲取leader執行權 available.awaitNanos(delay); } finally { // leader置空,釋放優先執行權 if (leader == thisThread) leader = null; } } } } } finally { // leader空且隊列非空則喚醒其餘阻塞的線程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
轉移隊列操做,內部是先經過peek方法先獲取隊列堆頂元素,判斷其是否已到期,如到期則添加元素到新隊列中,同時對原隊列出隊操做,固然,只轉移已經到期的全部元素
/** * Returns first element only if it is expired. * Used only by drainTo. Call only when holding lock. * * 命名上徹底能瞭解其含義 * 獲取隊列中的堆頂元素,延遲時間還未到期則返回null * 被drainTo所使用,參照下面方法 * */ private E peekExpired() { // assert lock.isHeldByCurrentThread(); E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 轉移已過時的元素到新隊列中 for (E e; (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super E> c, int maxElements) { // 判空 if (c == null) throw new NullPointerException(); // 非本對象 if (c == this) throw new IllegalArgumentException(); // 長度判斷 if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 轉移已過時的元素到新隊列中,最多轉移maxElements個元素 for (E e; n < maxElements && (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } }
其餘方法如peek,size,clear,toArray,remove等都是經過優先級隊列PriorityQueue來實現的,只是每次操做時須要先得到可重入鎖保證線程安全
迭代器的實現不是很複雜,迭代器複製了隊列中的全部元素,須要注意的是,迭代器中的remove方法會經過removeEQ方法直接刪除原PriorityQueue隊列中的元素,不是刪除拷貝的數據元素
/** * * 本質上調用PriorityQueue.toArray * 將PriorityQueue的底層數組拷貝做爲迭代器的array * 故這裏保存了全部的元素,不只僅是已過時的元素 */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * Snapshot iterator that works off copy of underlying q array. */ private class Itr implements Iterator<E> { // 保存PriorityQueue的數組 final Object[] array; // Array of all elements // 下次next返回的元素索引 int cursor; // index of next element to return // 上次返回的return元素索引 int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } // 刪除元素,須要注意,會直接把原隊列中的元素刪除 public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } }
DelayQueue做爲一個特殊的阻塞隊列,主要在於Delay特性上,內部經過優先級阻塞隊列和Delayed接口實現延遲的操做,若是以前已經瞭解了優先級隊列,則很是容易理解其源碼實現邏輯,複雜點的部分也就在於在多線程環境下入隊一個新的更短的元素時內部作的處理,經過爭搶leader來肯定優先出隊的那個線程,作不一樣的處理,比較有意思,能夠參考文章多理解理解,不算過於複雜
以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝