DelayQueue 是一個支持延時獲取元素的阻塞隊列, 內部採用優先隊列 PriorityQueue 存儲元素,同時元素必須實現 Delayed 接口;在建立元素時能夠指定多久才能夠從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。後端
因延遲阻塞隊列的特性, 咱們通常將 DelayQueue 做用於如下場景 :緩存
下面咱們以緩存系統的應用,看下 DelayQueue 的使用,代碼以下:安全
public class DelayQueueDemo { static class Cache implements Runnable { private boolean stop = false; private Map<String, String> itemMap = new HashMap<>(); private DelayQueue<CacheItem> delayQueue = new DelayQueue<>(); public Cache () { // 開啓內部線程檢測是否過時 new Thread(this).start(); } /** * 添加緩存 * * @param key * @param value * @param exprieTime 過時時間,單位秒 */ public void put (String key, String value, long exprieTime) { CacheItem cacheItem = new CacheItem(key, exprieTime); // 此處忽略添加劇復 key 的處理 delayQueue.add(cacheItem); itemMap.put(key, value); } public String get (String key) { return itemMap.get(key); } public void shutdown () { stop = true; } @Override public void run() { while (!stop) { CacheItem cacheItem = delayQueue.poll(); if (cacheItem != null) { // 元素過時, 從緩存中移除 itemMap.remove(cacheItem.getKey()); System.out.println("key : " + cacheItem.getKey() + " 過時並移除"); } } System.out.println("cache stop"); } } static class CacheItem implements Delayed { private String key; /** * 過時時間(單位秒) */ private long exprieTime; private long currentTime; public CacheItem(String key, long exprieTime) { this.key = key; this.exprieTime = exprieTime; this.currentTime = System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { // 計算剩餘的過時時間 // 大於 0 說明未過時 return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime); } @Override public int compareTo(Delayed o) { // 過時時間長的放置在隊列尾部 if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) { return 1; } // 過時時間短的放置在隊列頭 if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { return -1; } return 0; } public String getKey() { return key; } } public static void main(String[] args) throws InterruptedException { Cache cache = new Cache(); // 添加緩存元素 cache.put("a", "1", 5); cache.put("b", "2", 4); cache.put("c", "3", 3); while (true) { String a = cache.get("a"); String b = cache.get("b"); String c = cache.get("c"); System.out.println("a : " + a + ", b : " + b + ", c : " + c); // 元素均過時後退出循環 if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) { break; } TimeUnit.MILLISECONDS.sleep(1000); } cache.shutdown(); } } 複製代碼
執行結果以下:架構
a : 1, b : 2, c : 3 a : 1, b : 2, c : 3 a : 1, b : 2, c : 3 key : c 過時並移除 a : 1, b : 2, c : null key : b 過時並移除 a : 1, b : null, c : null key : a 過時並移除 a : null, b : null, c : null cache stop 複製代碼
從執行結果能夠看出,因循環內部每次停頓 1 秒,當等待 3 秒後,元素 c 過時並從緩存中清除,等待 4 秒後,元素 b 過時並從緩存中清除,等待 5 秒後,元素 a 過時並從緩存中清除。給你們推薦一個Java後端架構羣:698581634 進羣免費領取架構資料。ide
重入鎖this
private final transient ReentrantLock lock = new ReentrantLock(); 複製代碼
用於保證隊列操做的線程安全性spa
優先隊列線程
private final PriorityQueue<E> q = new PriorityQueue<E>(); 複製代碼
存儲介質,用於保證延遲低的優先執行3d
leadercode
leader 指向的是第一個從隊列獲取元素阻塞等待的線程,其做用是減小其餘線程沒必要要的等待時間。(這個地方我一直沒搞明白 怎麼就減小其餘線程的等待時間了)
condition
private final Condition available = lock.newCondition(); 複製代碼
條件對象,當新元素到達,或新線程可能須要成爲leader時被通知
下面將主要對隊列的入隊,出隊動做進行分析 :
入隊 - offer
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入隊 q.offer(e); if (q.peek() == e) { // 若入隊的元素位於隊列頭部,說明當前元素延遲最小 // 將 leader 置空 leader = null; // 喚醒阻塞在等待隊列的線程 available.signal(); } return true; } finally { lock.unlock(); } } 複製代碼
出隊 - poll
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) // 等待 add 喚醒 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 已過時則直接返回隊列頭節點 return q.poll(); first = null; // don't retain ref while waiting if (leader != null) // 若 leader 不爲空 // 說明已經有其餘線程調用過 take 操做 // 當前調用線程 follower 掛起等待 available.await(); else { // 若 leader 爲空 // 將 leader 指向當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 當前調用線程在指定 delay 時間內掛起等待 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // leader 處理完以後,喚醒 follower available.signal(); lock.unlock(); } } 複製代碼
Leader-follower 模式