本文首發於一世流雲專欄: https://segmentfault.com/blog...
DelayQueue
是JDK1.5時,隨着J.U.C包一塊兒引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於已有的PriorityBlockingQueue實現:segmentfault
DelayQueue也是一種比較特殊的阻塞隊列,從類聲明也能夠看出,DelayQueue中的全部元素必須實現Delayed
接口:設計模式
/** * 一種混合風格的接口,用來標記那些應該在給定延遲時間以後執行的對象。 * <p> * 此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable<Delayed> { /** * 返回與此對象相關的剩餘有效時間,以給定的時間單位表示. */ long getDelay(TimeUnit unit); }
能夠看到,Delayed接口除了自身的getDelay
方法外,還實現了Comparable接口。getDelay方法用於返回對象的剩餘有效時間,實現Comparable接口則是爲了可以比較兩個對象,以便排序。緩存
也就是說,若是一個類實現了Delayed接口,當建立該類的對象並添加到DelayQueue中後,只有當該對象的getDalay方法返回的剩餘時間≤0時纔會出隊。網絡
另外,因爲DelayQueue內部委託了PriorityBlockingQueue對象來實現全部方法,因此能以堆的結構維護元素順序,這樣剩餘時間最小的元素就在堆頂,每次出隊其實就是刪除剩餘時間≤0的最小元素。多線程
DelayQueue的特色簡要歸納以下:框架
爲了便於理解DelayQueue的功能,咱們先來看一個使用DelayQueue的示例。dom
第一節說了,隊列元素必須實現Delayed接口,咱們先來定義一個Data類,做爲隊列元素:異步
public class Data implements Delayed { private static final AtomicLong atomic = new AtomicLong(0); private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n"); // 數據的失效時間點 private final long time; // 序號 private final long seqno; /** * @param deadline 數據失效時間點 */ public Data(long deadline) { this.time = deadline; this.seqno = atomic.getAndIncrement(); } /** * 返回剩餘有效時間 * * @param unit 時間單位 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } /** * 比較兩個Delayed對象的大小, 比較順序以下: * 1. 若是是對象自己, 返回0; * 2. 比較失效時間點, 先失效的返回-1,後失效的返回1; * 3. 比較元素序號, 序號小的返回-1, 不然返回1. * 4. 非Data類型元素, 比較剩餘有效時間, 剩餘有效時間小的返回-1,大的返回1,相同返回0 */ @Override public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof Data) { Data x = (Data) other; // 優先比較失效時間 long diff = this.time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (this.seqno < x.seqno) // 剩餘時間相同則比較序號 return -1; else return 1; } // 通常不會執行到此處,除非元素不是Data類型 long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } @Override public String toString() { return "Data{" + "time=" + time + ", seqno=" + seqno + "}, isValid=" + isValid(); } private boolean isValid() { return this.getDelay(TimeUnit.NANOSECONDS) > 0; } }
關於隊列元素Data類,須要注意如下幾點:ide
time
字段保存失效時間點)的納秒形式(構造時指定,好比當前時間+60s);seqno
字段表示元素序號,每一個元素惟一,僅用於失效時間點一致的元素之間的比較。getDelay
方法返回元素的剩餘有效時間,能夠根據入參的TimeUnit選擇時間的表示形式(秒、微妙、納秒等),通常選擇納秒以提升精度;compareTo
方法用於比較兩個元素的大小,以便在隊列中排序。因爲DelayQueue基於優先級隊列實現,因此內部是「堆」的形式,咱們定義的規則是先失效的元素將先出隊,因此先失效元素應該在堆頂,即compareTo方法返回結果<0的元素優先出隊;仍是以「生產者-消費者」模式來做爲DelayQueued的示例:性能
生產者
public class Producer implements Runnable { private final DelayQueue<Data> queue; public Producer(DelayQueue<Data> queue) { this.queue = queue; } @Override public void run() { while (true) { long currentTime = System.nanoTime(); long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L); Data data = new Data(currentTime + validTime); queue.put(data); System.out.println(Thread.currentThread().getName() + ": put " + data); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者
public class Consumer implements Runnable { private final DelayQueue<Data> queue; public Consumer(DelayQueue<Data> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Data data = queue.take(); System.out.println(Thread.currentThread().getName() + ": take " + data); Thread.yield(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
調用
public class Main { public static void main(String[] args) { DelayQueue<Data> queue = new DelayQueue<>(); Thread c1 = new Thread(new Consumer(queue), "consumer-1"); Thread p1 = new Thread(new Producer(queue), "producer-1"); c1.start(); p1.start(); } }
執行結果:
producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=trueconsumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=trueconsumer-1: take Data{time=73265083111776, seqno=5}, isValid=false
上面示例中,咱們建立了一個生產者,一個消費者,生產者不斷得入隊元素,每一個元素都會有個截止有效期;消費者不斷得從隊列者獲取元素。從輸出能夠看出,消費者每次獲取到的元素都是有效期最小的,且都是已經失效了的。(由於DelayQueue每次出隊只會刪除有效期最小且已通過期的元素)
介紹完了DelayQueued的基本使用,讀者應該對該阻塞隊列的功能有了基本瞭解,接下來咱們看下Doug Lea是如何實現DelayQueued的。
DelayQueued提供了兩種構造器,都很是簡單:
/** * 默認構造器. */ public DelayQueue() { }
/** * 從已有集合構造隊列. */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
能夠看到,內部的PriorityQueue並不是在構造時建立,而是對象建立時生成:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * leader線程是首個嘗試出隊元素(隊列不爲空)但被阻塞的線程. * 該線程會限時等待(隊首元素的剩餘有效時間),用於喚醒其它等待線程 */ private Thread leader = null; /** * 出隊線程條件隊列, 當有多個線程, 會在此條件隊列上等待. */ private final Condition available = lock.newCondition(); //... }
上述比較特殊的是leader
字段,咱們以前已經說過,DelayQueue每次只會出隊一個過時的元素,若是隊首元素沒有過時,就會阻塞出隊線程,讓線程在available
這個條件隊列上無限等待。
爲了提高性能,DelayQueue並不會讓全部出隊線程都無限等待,而是用leader
保存了第一個嘗試出隊的線程,該線程的等待時間是隊首元素的剩餘有效期。這樣,一旦leader線程被喚醒(此時隊首元素也失效了),就能夠出隊成功,而後喚醒一個其它在available
條件隊列上等待的線程。以後,會重複上一步,新喚醒的線程可能取代成爲新的leader線程。這樣,就避免了無效的等待,提高了性能。這實際上是一種名爲「Leader-Follower pattern」的多線程設計模式。
put方法沒有什麼特別,因爲是無界隊列,因此也不會阻塞線程。
/** * 入隊一個指定元素e. * 因爲是無界隊列, 因此該方法並不會阻塞線程. */ public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 調用PriorityQueue的offer方法 if (q.peek() == e) { // 若是入隊元素在隊首, 則喚醒一個出隊線程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
須要注意的是當首次入隊元素時,須要喚醒一個出隊線程,由於此時可能已有出隊線程在空隊列上等待了,若是不喚醒,會致使出隊線程永遠沒法執行。
if (q.peek() == e) { // 若是入隊元素在隊首, 則喚醒一個出隊線程 leader = null; available.signal(); }
整個take方法在一個自旋中完成,其實就分爲兩種狀況:
1.隊列爲空
這種狀況直接阻塞出隊線程。(在available條件隊列等待)
2.隊列非空
隊列非空時,還要看隊首元素的狀態(有效期),若是隊首元素過時了,那直接出隊就好了;若是隊首元素未過時,就要看當前線程是不是第一個到達的出隊線程(即判斷leader
是否爲空),若是不是,就無限等待,若是是,則限時等待。
/** * 隊首出隊元素. * 若是隊首元素(堆頂)未到期或隊列爲空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); // 讀取隊首元素 if (first == null) // CASE1: 隊列爲空, 直接阻塞 available.await(); else { // CASE2: 隊列非空 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // CASE2.0: 隊首元素已過時 return q.poll(); // 執行到此處說明隊列非空, 且隊首元素未過時 first = null; if (leader != null) // CASE2.1: 已存在leader線程 available.await(); // 無限期阻塞當前線程 else { // CASE2.2: 不存在leader線程 Thread thisThread = Thread.currentThread(); leader = thisThread; // 將當前線程置爲leader線程 try { available.awaitNanos(delay); // 阻塞當前線程(限時等待剩餘有效時間) } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 不存在leader線程, 則喚醒一個其它出隊線程 available.signal(); lock.unlock(); } }
須要注意,自旋結束後若是leader == null && q.peek() != null
,須要喚醒一個等待中的出隊線程。
leader == null && q.peek() != null
的含義就是——沒有leader
線程但隊列中存在元素。咱們以前說了,leader線程做用之一就是用來喚醒其它無限等待的線程,因此必需要有這個判斷。
DelayQueue是阻塞隊列中很是有用的一種隊列,常常被用於緩存或定時任務等的設計。
考慮一種使用場景:
異步通知的重試,在不少系統中,當用戶完成服務調用後,系統有時須要將結果異步通知到用戶的某個URI。因爲網絡等緣由,不少時候會通知失敗,這個時候就須要一種重試機制。
這時能夠用DelayQueue保存通知失敗的請求,失效時間能夠根據已通知的次數來設定(好比:2s、5s、10s、20s),這樣每次從隊列中take獲取的就是剩餘時間最短的請求,若是已重複通知次數超過必定閾值,則能夠把消息拋棄。
後面,咱們在講J.U.C之executors框架的時候,還會再次看到DelayQueue的身影。JUC線程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue
就是一種延時阻塞隊列。