DelayQueue是java併發包下的延時阻塞隊列,經常使用於實現定時任務。java
從繼承體系能夠看到,DelayQueue實現了BlockingQueue,因此它是一個阻塞隊列。數組
另外,DelayQueue還組合了一個叫作Delayed的接口,DelayQueue中存儲的全部元素必須實現Delayed接口。安全
那麼,Delayed是什麼呢?併發
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
Delayed是一個繼承自Comparable的接口,而且定義了一個getDelay()方法,用於表示還有多少時間到期,到期了應返回小於等於0的數值。ide
// 用於控制併發的鎖 private final transient ReentrantLock lock = new ReentrantLock(); // 優先級隊列 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 用於標記當前是否有線程在排隊(僅用於取元素時) private Thread leader = null; // 條件,用於表示如今是否有可取的元素 private final Condition available = lock.newCondition();
從屬性咱們能夠知道,延時隊列主要使用優先級隊列來實現,並輔以重入鎖和條件來控制併發安全。源碼分析
由於優先級隊列是無界的,因此這裏只須要一個條件就能夠了。this
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
構造方法比較簡單,一個默認構造方法,一個初始化添加集合c中全部元素的構造方法。spa
由於DelayQueue是阻塞隊列,且優先級隊列是無界的,因此入隊不會阻塞不會超時,所以它的四個入隊方法是同樣的。線程
public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入隊方法比較簡單:3d
由於DelayQueue是阻塞隊列,因此它的出隊有四個不一樣的方法,有拋出異常的,有阻塞的,有不阻塞的,有超時的。
咱們這裏主要分析兩個,poll()和take()方法。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
poll()方法比較簡單:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 堆頂元素 E first = q.peek(); // 若是堆頂元素爲空,說明隊列中尚未元素,直接阻塞等待 if (first == null) available.await(); else { // 堆頂元素的到期時間 long delay = first.getDelay(NANOSECONDS); // 若是小於0說明已到期,直接調用poll()方法彈出堆頂元素 if (delay <= 0) return q.poll(); // 若是delay大於0 ,則下面要阻塞了 // 將first置爲空方便gc,由於有可能其它元素彈出了這個元素 // 這裏還持有着引用不會被清理 first = null; // don't retain ref while waiting // 若是前面有其它線程在等待,直接進入等待 if (leader != null) available.await(); else { // 若是leader爲null,把當前線程賦值給它 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待delay時間後自動醒過來 // 醒過來後把leader置空並從新進入循環判斷堆頂元素是否到期 // 這裏即便醒過來後也不必定能獲取到元素 // 由於有可能其它線程先一步獲取了鎖並彈出了堆頂元素 // 條件鎖的喚醒分紅兩步,先從Condition的隊列裏出隊 // 再入隊到AQS的隊列中,當其它線程調用LockSupport.unpark(t)的時候纔會真正喚醒 // 關於AQS咱們後面會講的^^ available.awaitNanos(delay); } finally { // 若是leader仍是當前線程就把它置爲空,讓其它線程有機會獲取元素 if (leader == thisThread) leader = null; } } } } } finally { // 成功出隊後,若是leader爲空且堆頂還有元素,就喚醒下一個等待的線程 if (leader == null && q.peek() != null) // signal()只是把等待的線程放到AQS的隊列裏面,並非真正的喚醒 available.signal(); // 解鎖,這纔是真正的喚醒 lock.unlock(); } }
take()方法稍微要複雜一些:
請看下面的案例, 越早到期的元素越先出隊。
public class DelayQueueTest { public static void main(String[] args) { DelayQueue<Message> queue = new DelayQueue<>(); long now = System.currentTimeMillis(); // 啓動一個線程從隊列中取元素 new Thread(()->{ while (true) { try { // 將依次打印1000,2000,5000,7000,8000 System.out.println(queue.take().deadline - now); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 添加5個元素到隊列中 queue.add(new Message(now + 5000)); queue.add(new Message(now + 8000)); queue.add(new Message(now + 2000)); queue.add(new Message(now + 1000)); queue.add(new Message(now + 7000)); } } class Message implements Delayed { long deadline; public Message(long deadline) { this.deadline = deadline; } @Override public long getDelay(TimeUnit unit) { return deadline - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return String.valueOf(deadline); } }
ScheduledThreadPoolExecutor中使用的是它本身定義的內部類DelayedWorkQueue,其實裏面的實現邏輯基本都是同樣的,只不過DelayedWorkQueue裏面沒有使用如今的PriorityQueue,而是使用數組又實現了一遍優先級隊列,本質上沒有什麼區別。