public class TestDelayQueue { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayItem> delayQueue = new DelayQueue(); // 20s後 delayQueue.add(new DelayItem(20, "aaaaaa")); // 10秒後 delayQueue.add(new DelayItem(10, "bbbbbb")); // 30秒後 delayQueue.add(new DelayItem(30, "cccccc")); while (0 < delayQueue.size()) { Thread.sleep(1000); DelayItem d = delayQueue.poll(); // DelayItem d = delayQueue.take(); System.out.println(null != d ? d.getItem() : "null"); } } static class DelayItem implements Delayed { private long delayTime; private String item; public DelayItem(long delayTime, String item) { super(); // 當前時間 LocalDateTime localDateTime = LocalDateTime.now(); this.delayTime = localDateTime.getSecond() + delayTime; this.item = item; } @Override public long getDelay(TimeUnit unit) { LocalDateTime localDateTime = LocalDateTime.now(); return unit.convert(delayTime - localDateTime.getSecond(), TimeUnit.SECONDS); } @Override public int compareTo(Delayed o) { return this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS) < 0 ? -1 : 1; } public String getItem() { return item; } } }
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
首先要說明的是Delayed接口,類定義部分也已經明確指出其使用(E extends Delayed),咱們在操做時放入DelayQueue隊列元素必須實現這個接口,實現其中的getDelay方法和compareTo方法,在使用示例代碼部分我也說明了這兩個方法的做用ide
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
/** * 可重入鎖ReentrantLock */ private final transient ReentrantLock lock = new ReentrantLock(); /** * 內部使用PriorityQueue來完成DelayQueue的操做 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * leader線程 * 指定了用於等待隊列元素出隊的線程 * 若是非空,則這個線程能夠阻塞等待一段時間(時間經過計算得到),其餘線程則無限等待 * 避免其餘線程沒必要要的等待 * 這個線程等待一段時間而後出隊操做,其餘線程則無限等待, * 若是等待過程當中入隊了過時時間更短的元素(優先級隊列堆頂元素變化),則會重置leader爲null,並會喚醒等待的線程去爭搶leader來獲取執行出隊的權利 */ private Thread leader = null; /** * Condition對象完成線程等待和喚醒任務 */ private final Condition available = lock.newCondition();
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 此節點爲當前優先級隊列堆頂節點,即0的索引位置 // 即這次添加的節點即爲下次要獲取的堆頂節點(出隊節點) // 若是非堆頂節點則表示堆頂節點未變化則不要重置leader if (q.peek() == e) { // leader線程置空,讓出隊線程爭搶leader優先執行權 leader = null; // 喚醒阻塞的線程 available.signal(); } return true; } finally { lock.unlock(); } }
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 堆頂元素 E first = q.peek(); // 堆爲空或者堆頂元素延遲時間還未到期則返回null,不然經過poll出隊 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 時間轉成納秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 堆頂元素 E first = q.peek(); // 空表示隊列爲空 if (first == null) { // 等待時間小於等於0則直接返回null,不然就阻塞等待nanos時間 if (nanos <= 0) return null; else // 若是中途被喚醒則更新nanos,剩餘等待時間 nanos = available.awaitNanos(nanos); } else { // 堆頂元素的延遲時間 long delay = first.getDelay(NANOSECONDS); // 延遲時間到期直接出隊操做 if (delay <= 0) return q.poll(); // 延遲時間未到期直接返回null if (nanos <= 0) return null; // 延遲時間未到期同時設置了超時時間進入下面進行處理 // 處於等待狀態不要引用first first = null; // don't retain ref while waiting // 超時時間小於延遲時間或者leader非空阻塞等待nanos // 超時時間小於延遲時間則當前線程最多等待nanos超時時間便可 // leader非空則代表其餘線程已經得到優先執行權,最多等待nanos超時時間便可 // 在等待中有可能被喚醒再此循環執行 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 超時時間大於延遲時間同時leader線程爲空進入下面處理 Thread thisThread = Thread.currentThread(); // 先設置leader線程獲取執行權 leader = thisThread; try { // 阻塞等待delay便可出隊操做 // 萬一等待過程當中被喚醒則經過剩餘等待時間循環判斷處理 // 有可能在等待中入隊了延遲時間更短的元素,此時需釋放leader從新爭搶優先執行權 long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { // 釋放leader執行權,從新爭搶leader if (leader == thisThread) leader = null; } } } } } finally { // leader空且隊列非空則喚醒其餘阻塞的線程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 隊列無數據則阻塞等待 available.await(); else { // 獲取其堆頂元素延遲時間 long delay = first.getDelay(NANOSECONDS); // 延遲時間已到期,能夠進行出隊操做了 if (delay <= 0) return q.poll(); // 延遲時間還未到期 // 置空first,等待時間內去掉引用 first = null; // don't retain ref while waiting // leader線程非空,表示其餘線程已經獲取優先執行權,阻塞等待 if (leader != null) available.await(); else { // leader爲空則指向當前線程,表示當前線程得到執行權 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 阻塞等待delay秒以後繼續 // 也有可能新入隊元素(堆頂元素變化時)被喚醒需從新獲取leader執行權 available.awaitNanos(delay); } finally { // leader置空,釋放優先執行權 if (leader == thisThread) leader = null; } } } } } finally { // leader空且隊列非空則喚醒其餘阻塞的線程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
/** * Returns first element only if it is expired. * Used only by drainTo. Call only when holding lock. * * 命名上徹底能瞭解其含義 * 獲取隊列中的堆頂元素,延遲時間還未到期則返回null * 被drainTo所使用,參照下面方法 * */ private E peekExpired() { // assert lock.isHeldByCurrentThread(); E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 轉移已過時的元素到新隊列中 for (E e; (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super E> c, int maxElements) { // 判空 if (c == null) throw new NullPointerException(); // 非本對象 if (c == this) throw new IllegalArgumentException(); // 長度判斷 if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // 轉移已過時的元素到新隊列中,最多轉移maxElements個元素 for (E e; n < maxElements && (e = peekExpired()) != null;) { c.add(e); // In this order, in case add() throws. q.poll(); ++n; } return n; } finally { lock.unlock(); } }
/** * * 本質上調用PriorityQueue.toArray * 將PriorityQueue的底層數組拷貝做爲迭代器的array * 故這裏保存了全部的元素,不只僅是已過時的元素 */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * Snapshot iterator that works off copy of underlying q array. */ private class Itr implements Iterator<E> { // 保存PriorityQueue的數組 final Object[] array; // Array of all elements // 下次next返回的元素索引 int cursor; // index of next element to return // 上次返回的return元素索引 int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } // 刪除元素,須要注意,會直接把原隊列中的元素刪除 public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } }