系列傳送門:java
DelayQueue是一個支持延時獲取元素的無界阻塞隊列,使用PriorityQueue來存儲元素。編程
隊中的元素必須實現Delayed
接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)
獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先隊列中添加元素時根據compareTo方法做爲排序規則。緩存
當從隊列獲取元素時,只有過時的元素纔會出隊列。併發
使用場景: 緩存系統設計、定時任務調度等。app
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { // 獨佔鎖實現同步 private final transient ReentrantLock lock = new ReentrantLock(); // 優先隊列存放數據 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * 基於Leader-Follower模式的變體,用於儘可能減小沒必要要的線程等待 */ private Thread leader = null; /** * 與lock對應的條件變量 */ private final Condition available = lock.newCondition(); }
隊中的元素必須實現Delayed
接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)
獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】。dom
每次向優先隊列中添加元素時根據compareTo方法做爲排序規則,固然咱們約定一下,默認q.peek()出來的就是最早過時的元素。ide
public interface Delayed extends Comparable<Delayed> { // 返回剩餘時間 long getDelay(TimeUnit unit); } public interface Comparable<T> { // 定義比較方法 public int compareTo(T o); }
學習了Delayed接口以後,咱們看一個實際的案例,加深印象,源於:《Java併發編程之美》。工具
static class DelayedElement implements Delayed { private final long delayTime; // 延遲時間 private final long expire; // 到期時間 private final String taskName; // 任務名稱 public DelayedElement (long delayTime, String taskName) { this.delayTime = delayTime; this.taskName = taskName; expire = now() + delayTime; } final long now () { return System.currentTimeMillis(); } // 剩餘時間 = 到期時間 - 當前時間 @Override public long getDelay (TimeUnit unit) { return unit.convert(expire - now(), TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString () { final StringBuilder res = new StringBuilder("DelayedElement [ "); res.append("delay = ").append(delayTime); res.append(", expire = ").append(expire); res.append(", taskName = '").append(taskName).append('\''); res.append(" ] "); return res.toString(); } } public static void main (String[] args) { // 建立delayQueue隊列 DelayQueue<DelayedElement> delayQueue = new DelayQueue<>(); // 建立延遲任務 Random random = new Random(); for (int i = 0; i < 10; i++) { DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i); delayQueue.offer(element); } // 依次取出任務並打印 DelayedElement ele = null; try { for (; ; ) { while ((ele = delayQueue.take()) != null) { System.out.println(ele); } } } catch (InterruptedException ex) { ex.printStackTrace(); } } // 打印結果 DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]
DelayQueue構造器相比於前幾個,就顯得很是easy了。學習
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
由於DelayQueue是無界隊列,不會由於邊界問題產生阻塞,所以put操做和offer操做是同樣的。ui
public void put(E e) { offer(e); } public boolean offer(E e) { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 加入優先隊列裏 q.offer(e); // 判斷堆頂元素是否是剛剛插入的元素 // 若是判斷爲true,說明當前這個元素是將最早過時 if (q.peek() == e) { // 重置leader線程爲null leader = null; // 激活available變量條件隊列中的一個線程 available.signal(); } return true; } finally { lock.unlock(); } }
take方法將會獲取並移除隊列裏面延遲時間過時的元素 ,若是隊列裏面沒有過時元素則陷入等待。
public E take() throws InterruptedException { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 瞅一瞅誰最快過時 E first = q.peek(); // 隊列爲空,則將當前線程置入available的條件隊列中,直到裏面有元素 if (first == null) available.await(); else { // 看下還有多久過時 long delay = first.getDelay(NANOSECONDS); // 哇,已通過期了,就移除它並返回 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // leader不爲null表示其餘線程也在執行take // 則將當前線程置入available的條件隊列中 if (leader != null) available.await(); else { // 若是leader爲null,則選擇當前線程做爲leader線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待delay時間,時間到以後,會出條件隊列,繼續競爭鎖 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
若是不設置first = null
,將會引發內存泄露。
- 線程A到達,隊首元素沒有到期,設置leader = 線程A,而且執行
available.awaitNanos(delay);
等待元素過時。- 這時線程B來了,由於leader != null,則會
available.await();
阻塞,線程C、D、E同理。- 線程A阻塞完畢了,再次循環,獲取列首元素成功,出列。
這個時候列首元素應該會被回收掉,可是問題是它還被線程B、線程C持有着,因此不會回收,若是線程增多,且隊首元素無限期的不能回收,就會形成內存泄漏。
DelayQueue是一個支持延時獲取元素的無界阻塞隊列,使用PriorityQueue來存儲元素。
隊中的元素必須實現Delayed
接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)
獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先隊列中添加元素時根據compareTo方法做爲排序規則。
基於Leader-Follower模式使用leader變量,減小沒必要要的線程等待。
DelayQueue是無界隊列,所以插入操做是非阻塞的。可是take操做從隊列獲取元素時,是阻塞的,阻塞規則爲:
available.await()
陷入阻塞。available.await()
陷入阻塞。available.awaitNanos(delay)
。available.signal()
喚醒一個follower線程,接着回到開始那步。《Java併發編程的藝術》
《Java併發編程之美》