Java同步數據結構之DelayQueue/DelayedWorkQueue

前言

前面介紹了優先級隊列PriorityBlockingQueue,順帶也說了一下PriorityQueue,二者的實現方式是如出一轍的,都是採用基於數組的平衡二叉堆實現,不論入隊的順序怎麼樣,take、poll出隊的節點都是按優先級排序的。可是PriorityBlockingQueue/PriorityQueue隊列中的全部元素並非在入隊以後就已經所有按優先級排好序了,而是隻保證head節點即隊列的首個元素是當前最小或者說最高優先級的,其它節點的順序並不保證是按優先級排序的,PriorityBlockingQueue/PriorityQueue隊列只會在經過take、poll取走head以後纔會再次決出新的最小或者說最高優先級的節點做爲新的head,其它節點的順序依然不保證。因此經過peek拿到的head節點就是當前隊列中最高優先級的節點。java

明白了優先級隊列的原理要理解DelayQueue就很是簡單,由於DelayQueue就是基於PriorityQueue實現的,DelayQueue隊列實際上就是將隊列元素保存到內部的一個PriorityQueue實例中的(因此也不支持插入null值),DelayQueue只專一於實現隊列元素的延時出隊。算法

延遲隊列DelayQueue是一個無界阻塞隊列,它的隊列元素只能在該元素的延遲已經結束或者說過時才能被出隊。它怎麼判斷一個元素的延遲是否結束呢,原來DelayQueue隊列元素必須是實現了Delayed接口的實例,該接口有一個getDelay方法須要實現,延遲隊列就是經過實時的調用元素的該方法來判斷當前元素是否延遲已經結束。數組

既然DelayQueue是基於優先級隊列來實現的,那確定元素也要實現Comparable接口,沒錯由於Delayed接口繼承了Comparable接口,因此實現Delayed的隊列元素也必需要實現Comparable的compareTo方法。延遲隊列就是以時間做爲比較基準的優先級隊列,這個時間即延遲時間,這個時間大都在構造元素的時候就已經設置好,隨着程序的運行時間的推移,隊列元素的延遲時間逐步到期,DelayQueue就可以基於延遲時間運用優先級隊列並配合getDelay方法達到延遲隊列中的元素在延遲結束時精準出隊。併發

Delayed接口

1 public interface Delayed extends Comparable<Delayed> { 2 
3      //以指定的時間單位unit返回此對象的剩餘延遲 4     // 返回值 小於等於0 表示延遲已經結束
5      long getDelay(TimeUnit unit); 6 }

 放入DelayQueue隊列的元素必須實現Delayed接口,getDelay方法用於查看當前對象的延遲剩餘時間,返回值是以參數指定的unit爲單位的數字(unit能夠是秒,分鐘等),返回值小於等於0就表示該元素延遲結束。注意:該接口的實現類必需要實現compareTo方法,且compareTo方法提供與getDelay方法一致的排序,也就是說compareTo要基於getDelay方法的返回值來實現比較。app

DelayQueue

現來看看它的成員變量:less

 1 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
 2     implements BlockingQueue<E> {  3 
 4     //非公平鎖
 5     private final transient ReentrantLock lock = new ReentrantLock();  6     private final PriorityQueue<E> q = new PriorityQueue<E>(); //優先級隊列
 7 
 8 /* Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves  9  * to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely. 10  * The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. 11  * Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, 12  * and some waiting thread, but not necessarily the current leader, is signalled. So waiting threads must be prepared to acquire and lose leadership while waiting. 13 
14  * 指定用於等待隊列頭部的元素的線程。這種Leader-Follower模式的變體(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)能夠減小沒必要要的定時等待。 15  * 當一個線程成爲leader時,它只等待下一個延遲過時,而其餘線程則無限期地等待。leader線程必須在從take()或poll(…)返回以前向其餘線程發出信號,除非其餘線程在此期間成爲leader。 16  * 每當隊列的頭部被具備更早過時時間的元素替換時,leader字段就會經過重置爲null而無效,而且會通知等待的線程(不必定是當前的leader)的信號。 17  * 所以,等待線程必須準備好在等待時得到和失去領導權。 18  */
19 private Thread leader = null; 20 
21 /* Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader. 22  * 當隊列頭部的新元素可用或新線程可能須要成爲leader時發出的條件。 23  */
24 private final Condition available = lock.newCondition();
View Code

  DelayQueue的成員很簡單,一個非公平鎖以及Condition、一個優先級隊列,以及一個leader線程。這個leader線程很關鍵,它上面的Java Doc描述很長,其實就已經把DelayQueue的內部實現說的很清楚了。簡單來講,DelayQueue使用了所謂的Leader-Follower模式的變體來減小消費線程沒必要要的定時等待,因爲優先級隊列中的head就是整個隊列中最早要延遲結束的元素,其它元素的延遲結束時間都比它長,因此就讓獲取head元素的線程只等待它延時剩餘的時間,而讓其它消費線程無限期等待,當獲取head元素的線程結束等待取走head以後再喚醒其它消費線程,那些消費線程又會由於拿到新的head而產生一個新的leader,這就是Leader-Follower模式,只等待指定時間的獲取head元素的線程就叫leader,而其它全部調用take的消費線程都是Follower。固然若是在leader等待的過程當中有延遲時間更短的元素入隊,根據優先級隊列的平衡二叉堆實現,它必然會被排到原來的head以前,成爲新的head,這時候原來的leader線程可能就會失去leader的領導權,誰被非公平鎖喚醒拿到新的head誰就是新的leader。ide

DelayQueue只有兩個構造方法,一個無參,一個用指定的集合元素初始化延遲隊列,源碼很簡單就不貼了。直接看DelayQueue的入隊邏輯。ui

入隊offer

DelayQueue的入隊方法都是調用的offer實現,直接看offer就能夠了:this

 1 public boolean offer(E e) {  2     final ReentrantLock lock = this.lock;  3  lock.lock();  4     try {  5         q.offer(e); //直接PriorityQueue入隊
 6         if (q.peek() == e) { //若是當前是第一個入隊的元素或者之前的head元素被當前元素替代了
 7             leader = null; //剝奪leader的領導權,因爲是非公平鎖,喚醒的不必定是leader線程,因此須要重置爲null
 8             available.signal();//喚醒leader,更換head
 9  } 10         return true; 11     } finally { 12  lock.unlock(); 13  } 14 }

 入隊操做很簡單,直接調用的優先級隊列的offer入隊,若是是①第一個入隊的元素或者是②當前時刻當前元素比原來的head具備更短的剩餘延遲時間,那麼是①的話須要喚醒由於隊列空而阻塞的消費線程,是②的話須要剝奪當前leader的領導權,並隨機喚醒(非公平鎖)一個消費線程(若是有的話)成爲新的leader。這裏有一個問題,怎麼就能判斷當前入隊的元素比head具備更短的延遲時間呢,由於head的延遲時間已通過去了一部分,其實這就須要在實現元素的compareTo方法時要根據getDelay方法返回的實時延遲剩餘時間來做比較,這樣優先級隊列在經過siftUp冒泡肯定新入隊元素的位置的時候就能精確的把握實時的延遲剩餘時間從而找到本身正確的位置。spa

出隊take

DelayQueue的精髓就在出隊方法的實現了,由於要保證只讓延遲結束的元素才能出隊:

 1 /**
 2  * Retrieves and removes the head of this queue, waiting if necessary  3  * until an element with an expired delay is available on this queue.  4  * 檢索並刪除此隊列的頭部,若是須要,將一直等待,直到該隊列上具備延遲結束的元素爲止。  5  * @return the head of this queue  6  * @throws InterruptedException {@inheritDoc}  7  */
 8 public E take() throws InterruptedException {  9     final ReentrantLock lock = this.lock; 10  lock.lockInterruptibly(); 11     try { 12         for (;;) { //注意是循環
13             E first = q.peek(); //獲取但不移除head
14             if (first == null) //隊列爲空,等待生產者offer完喚醒
15  available.await(); 16             else { 17                 long delay = first.getDelay(NANOSECONDS); 18                 if (delay <= 0) //head元素已通過期
19                     return q.poll(); 20                 first = null; // don't retain ref while waiting 等待時不要持有引用
21                 if (leader != null) //已經有leader在等待了,無限期等待leader醒來通知
22  available.await(); 23                 else { 24                     // 當前線程成爲leader
25                     Thread thisThread = Thread.currentThread(); 26                     leader = thisThread; 27                     try { 28                         available.awaitNanos(delay); //等待heade元素剩餘的延遲時間結束
29                     } finally { 30                         if (leader == thisThread) 31                             leader = null;    //交出leader權限
32  } 33  } 34  } 35  } 36     } finally { 37         if (leader == null && q.peek() != null) //隊列不爲空,而且沒有leader
38             available.signal(); //喚醒其它可能等待的消費線程
39  lock.unlock(); 40  } 41 }

出隊的邏輯就是Leader-Follower模式的實現,Leader只等待head剩餘的延遲時間(28行),而Follower無限期等待(22行)Leader成功取走head以後來喚醒(finally的部分),若是因爲offer入隊更短剩餘延遲時間的元素致使leader失去領導權,非公平鎖喚醒的將多是無限期等待的Follower,也多是原來的Leader,因爲offer重置了leader爲null,因此被喚醒的線程可以當即拿走head返回(若是head已經延遲結束)或者從新成爲leader(若是head尚未延遲結束)。

DelayQueue的其他方法例如當即返回的poll和指定超時時間的poll方法邏輯簡單或者與take的實現原理一致就不做分析了,有個比較特殊的方法就是drainTo,它只會轉移當前全部延遲已經結束的元素。peek方法返回的head不判斷是否延遲結束只表示當前剩餘延遲時間最少的元素,size方法返回隊列中全部元素的個數包括延遲沒有結束的。

DelayQueue的迭代器與PriorityBlockingQueue的迭代器實現如出一轍,都是經過toArray將原隊列數組元素拷貝到新的數組進行遍歷,不會與原隊列同步更新,可是remove能夠刪除原隊列中的指定元素,並且迭代時不區分元素是否延遲結束。

DelayQueue的可拆分迭代器繼承自Collection接口的默認迭代器實現IteratorSpliterator,和ArrayBlockingQueue同樣,IteratorSpliterator的特性就是頂層迭代器實際上調用的是隊列自己的Iterator迭代器實現,拆分後的迭代器按是數組下標方式的迭代,拆分按一半的原則進行,所以DelayQueue的可拆分迭代器也是脫離隊列源數據的,不會隨着隊列變化同步更新。更多關於IteratorSpliterator的其它特性請到ArrayBlockingQueue章節中查看。

應用實例

 1 package com.Queue;  2 
 3 import java.util.concurrent.DelayQueue;  4 import java.util.concurrent.Delayed;  5 import java.util.concurrent.TimeUnit;  6 
 7 public class DelayQueueTest {  8 
 9 
10     public static void main(String[] args) throws Exception { 11         DelayQueue<DelayTask> dq = new DelayQueue(); 12 
13         //入隊四個元素,注意它們的延遲時間單位不同。
14         dq.offer(new DelayTask(5, TimeUnit.SECONDS)); 15         dq.offer(new DelayTask(2, TimeUnit.MINUTES)); 16         dq.offer(new DelayTask(700, TimeUnit.MILLISECONDS)); 17         dq.offer(new DelayTask(1000, TimeUnit.NANOSECONDS)); 18 
19         while(dq.size() > 0){ 20  System.out.println(dq.take()); 21  } 22 
23         /*
24  打印順序: 25  DelayTask{delay=1000, unit=NANOSECONDS} 26  DelayTask{delay=700000000, unit=MILLISECONDS} 27  DelayTask{delay=5000000000, unit=SECONDS} 28  DelayTask{delay=120000000000, unit=MINUTES} 29         */
30  } 31 } 32 
33 class DelayTask implements Delayed { 34 
35     private long delay; //延遲多少納秒開始執行
36     private TimeUnit unit; 37 
38     public DelayTask(long delay, TimeUnit unit){ 39         this.unit = unit; 40         this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);//統一轉換成納秒計數
41  } 42 
43  @Override 44     public long getDelay(TimeUnit unit) {//延遲剩餘時間,單位unit指定
45         return unit.convert(delay - System.currentTimeMillis(), TimeUnit.NANOSECONDS); 46  } 47 
48  @Override 49     public int compareTo(Delayed o) {//基於getDelay實時延遲剩餘時間進行比較
50         if(this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS)) //都換算成納秒計算
51             return -1; 52         else if(this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS)) //都換算成納秒計算
53             return 1; 54         else
55             return 0; 56  } 57 
58  @Override 59     public String toString() { 60         return "DelayTask{" +
61                 "delay=" + delay +
62                 ", unit=" + unit +
63                 '}'; 64  } 65 }
View Code

 假設有四個延遲任務,分別須要延遲不一樣的時間開始執行,上例中最終打印出的結果是按延遲剩餘時間從小到大排列的。

DelayedWorkQueue

順便把ScheduledThreadPoolExecutor的內部類DelayedWorkQueue也說一下把,它也是一種無界延遲阻塞隊列,它主要用於線程池定時或週期任務的使用,關於線程池和定時任務在後面的章節介紹,本文僅限於分析DelayedWorkQueue。從DelayQueue的特性很容易想到它適合定時任務的實現,因此Java併發包中調度定時任務的線程池隊列是基於這種實現的,它就是DelayedWorkQueue,爲何不直接使用DelayQueue而要從新實現一個DelayedWorkQueue呢,多是了方便在實現過程當中加入一些擴展。放入該延遲隊列中的元素是特殊的,例如DelayedWorkQueue中放的元素是線程運行時代碼RunnableScheduledFuture。

 1 //A ScheduledFuture that is Runnable. Successful execution of the run method causes completion of the Future and allows access to its results.  2 //run方法的成功執行將致使Future的完成並容許訪問其結果。
 3 public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {  4 
 5     //若是此任務是週期性的,則返回true。按期任務能夠根據某些計劃從新運行。非週期任務只能運行一次。
 6     boolean isPeriodic();  7 }  8 
 9 //A delayed result-bearing action that can be cancelled. 一種能夠取消的延遲產生結果的動做。 10 //Usually a scheduled future is the result of scheduling a task with a ScheduledExecutorService. 11 //一般,ScheduledFuture是ScheduledExecutorService調度任務的結果。
12 public interface ScheduledFuture<V> extends Delayed, Future<V> { 13         //繼承了Delayed接口
14 }
View Code

RunnableScheduledFuture接口繼承了ScheduledFuture接口,ScheduledFuture接口繼承了Delayed接口。

DelayedWorkQueue的實現沒有像DelayQueue那樣直接藉助優先級隊列來實現,而是重寫了相關的邏輯可是實現的算法仍是基於數組的平衡二叉堆實現,而且糅合了DelayQueue中實現延遲時間結束元素才能出隊的Leader-Follower模式。能夠說,DelayedWorkQueue = 優先級隊列實現 + 延遲隊列實現。理解DelayedWorkQueue以前請先理解PriorityBlockingQueue和DelayQueue,而後理解DelayedWorkQueue不費吹灰之力。

 1 /**
 2  * Specialized delay queue. To mesh with TPE declarations, this  3  * class must be declared as a BlockingQueue<Runnable> even though  4  * it can only hold RunnableScheduledFutures.  5      */
 6     static class DelayedWorkQueue extends AbstractQueue<Runnable>
 7         implements BlockingQueue<Runnable> {  8 
 9         /*
10  * A DelayedWorkQueue is based on a heap-based data structure 11  * like those in DelayQueue and PriorityQueue, except that 12  * every ScheduledFutureTask also records its index into the 13  * heap array. This eliminates the need to find a task upon 14  * cancellation, greatly speeding up removal (down from O(n) 15  * to O(log n)), and reducing garbage retention that would 16  * otherwise occur by waiting for the element to rise to top 17  * before clearing. But because the queue may also hold 18  * RunnableScheduledFutures that are not ScheduledFutureTasks, 19  * we are not guaranteed to have such indices available, in 20  * which case we fall back to linear search. (We expect that 21  * most tasks will not be decorated, and that the faster cases 22  * will be much more common.) 23  * 24  * All heap operations must record index changes -- mainly 25  * within siftUp and siftDown. Upon removal, a task's 26  * heapIndex is set to -1. Note that ScheduledFutureTasks can 27  * appear at most once in the queue (this need not be true for 28  * other kinds of tasks or work queues), so are uniquely 29  * identified by heapIndex. 30          */
31 
32         private static final int INITIAL_CAPACITY = 16; 33         private RunnableScheduledFuture<?>[] queue =
34             new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 35         private final ReentrantLock lock = new ReentrantLock(); 36         private int size = 0; 37 
38         /**
39  * Thread designated to wait for the task at the head of the 40  * queue. This variant of the Leader-Follower pattern 41  * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 42  * minimize unnecessary timed waiting. When a thread becomes 43  * the leader, it waits only for the next delay to elapse, but 44  * other threads await indefinitely. The leader thread must 45  * signal some other thread before returning from take() or 46  * poll(...), unless some other thread becomes leader in the 47  * interim. Whenever the head of the queue is replaced with a 48  * task with an earlier expiration time, the leader field is 49  * invalidated by being reset to null, and some waiting 50  * thread, but not necessarily the current leader, is 51  * signalled. So waiting threads must be prepared to acquire 52  * and lose leadership while waiting. 53          */
54         private Thread leader = null; 55 
56         /**
57  * Condition signalled when a newer task becomes available at the 58  * head of the queue or a new thread may need to become leader. 59          */
60         private final Condition available = lock.newCondition();
View Code

看到沒有,相似PriorityBlockingQueue的初始化容量爲16的類型爲RunnableScheduledFuture的數組queue,和DelayQueue同樣的非公平鎖ReentrantLock和Condition,以及特有的leader線程。不一樣的是DelayedWorkQueue的數組元素是繼承了Delayed接口的RunnableScheduledFuture接口的實現類, 

入隊offer

 1 public boolean offer(Runnable x) {  2     if (x == null)  3         throw new NullPointerException(); //不容許插入null值
 4     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;  5     final ReentrantLock lock = this.lock;  6  lock.lock();  7     try {  8         int i = size;  9         if (i >= queue.length) 10             grow();  //相似優先級隊列PriorityBlockingQueue的擴容
11         size = i + 1; 12         if (i == 0) { //隊列爲空直接放在第一個位置
13             queue[0] = e; 14             setIndex(e, 0);//這是線程池定時任務特有的邏輯
15         } else { 16             siftUp(i, e); //相似優先級隊列PriorityBlockingQueue的冒泡方式插入元素
17  } 18         if (queue[0] == e) { //相似延遲隊列DelayQueue的消費線程喚醒與leader剝奪
19             leader = null; 20  available.signal(); 21  } 22     } finally { 23  lock.unlock(); 24  } 25     return true; 26 }
View Code

入隊的邏輯綜合了PriorityBlockingQueue的平衡二叉堆冒泡插入以及DelayQueue的消費線程喚醒與leader領導權剝奪。只有setIndex方法是特有的,該方法記錄了元素在數組中的索引下標(在其餘出隊方法中,會將出隊的元素的索引下標置爲-1,表示已經不在隊列中了),爲了方便實現快速查找。它的擴容方法grow比優先級隊列的實現簡單粗暴多了,在持有鎖的狀況下每次擴容50%。siftUp與PriorityBlockingQueue的siftUpXXX方法如出一轍,也只是多了一個特有的setIndex方法的調用。

出隊take

 1 public RunnableScheduledFuture<?> take() throws InterruptedException {  2     final ReentrantLock lock = this.lock;  3  lock.lockInterruptibly();  4     try {  5         for (;;) {  6             RunnableScheduledFuture<?> first = queue[0];//獲取不移除head
 7             if (first == null) //同DelayQueue同樣,隊列爲空,等待生產者offer完喚醒
 8  available.await();  9             else { 10                 long delay = first.getDelay(NANOSECONDS); 11                 if (delay <= 0)  //head元素已通過期
12                     return finishPoll(first); 13                 first = null; // don't retain ref while waiting
14                 if (leader != null) //已經有leader在等待了,無限期等待leader醒來通知
15  available.await(); 16                 else { // 當前線程成爲leader
17                     Thread thisThread = Thread.currentThread(); 18                     leader = thisThread; 19                     try { 20                         available.awaitNanos(delay); //等待heade元素剩餘的延遲時間結束
21                     } finally { 22                         if (leader == thisThread) 23                             leader = null;  //交出leader權限
24  } 25  } 26  } 27  } 28     } finally { 29         if (leader == null && queue[0] != null) //隊列不爲空,而且沒有leader
30             available.signal(); //喚醒其它可能等待的消費線程
31  lock.unlock(); 32  } 33 } 34 
35 //相似優先級隊列PriorityBlockingQueue的出隊邏輯
36 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 37     int s = --size; 38     RunnableScheduledFuture<?> x = queue[s]; 39     queue[s] = null; 40     if (s != 0) 41         siftDown(0, x); //相似PriorityBlockingQueue的拿最後一個元素從head向降低級來肯定位置
42     setIndex(f, -1); 43     return f; 44 }
View Code

出隊的邏輯同樣綜合了PriorityBlockingQueue的平衡二叉堆向降低級以及DelayQueue的Leader-Follower線程等待喚醒模式,就不細說了。只有在finishPoll方法中,會將已經出隊的RunnableScheduledFuture元素的索引下標經過setIndex設置成-1.

其它種種方法:remove、toArray、size、contains、put、drainTo(只轉移延遲結束的)、peek、add、poll、clear都和DelayQueue的實現大同小異。

DelayedWorkQueue的迭代器也是同DelayQueue同樣,迭代的是脫離源隊列的拷貝數組,可是能夠經過remove刪除源隊列中的對象。並且迭代器不區分元素是否延遲結束。

總結

DelayQueue延遲隊列只容許放入實現了Delayed接口的實例,它是優先級隊列針對計時的一種運用,因此它是基於優先級隊列 + Leader-Follower的線程等待模式,只容許取出延遲結束的隊列元素。獲取head的線程(每每是第一個消費線程)因爲head是整個隊列中最早延遲結束的元素,因此線程只等待特定的剩餘的延遲時間它便是leader,而其餘後來的消費線程則無限期等待即follower,直到leader取走head時隨機喚醒一個follower使其拿到新的head變成新的leader或者新入隊了一個剩餘延遲時間更短的元素致使leader失去領導權也會隨機喚醒一個線程成爲新的leader,如此往復等待喚醒。

 

至於DelayedWorkQueue也是一種設計爲定時任務的延遲隊列,它的實現和DelayQueue同樣,不過是將優先級隊列和DelayQueue的實現過程遷移到自己方法體中,從而能夠在該過程中靈活的加入定時任務特有的方法調用。

相關文章
相關標籤/搜索