DelayQueue 是一個支持延時獲取元素的阻塞隊列, 內部採用優先隊列 PriorityQueue 存儲元素,同時元素必須實現 Delayed 接口;在建立元素時能夠指定多久才能夠從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。java
因延遲阻塞隊列的特性, 咱們通常將 DelayQueue 做用於如下場景 :緩存
下面咱們以緩存系統的應用,看下 DelayQueue 的使用,代碼以下:安全
public class DelayQueueDemo {
static class Cache implements Runnable {
private boolean stop = false;
private Map<String, String> itemMap = new HashMap<>();
private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();
public Cache () {
// 開啓內部線程檢測是否過時
new Thread(this).start();
}
/** * 添加緩存 * * @param key * @param value * @param exprieTime 過時時間,單位秒 */
public void put (String key, String value, long exprieTime) {
CacheItem cacheItem = new CacheItem(key, exprieTime);
// 此處忽略添加劇復 key 的處理
delayQueue.add(cacheItem);
itemMap.put(key, value);
}
public String get (String key) {
return itemMap.get(key);
}
public void shutdown () {
stop = true;
}
@Override
public void run() {
while (!stop) {
CacheItem cacheItem = delayQueue.poll();
if (cacheItem != null) {
// 元素過時, 從緩存中移除
itemMap.remove(cacheItem.getKey());
System.out.println("key : " + cacheItem.getKey() + " 過時並移除");
}
}
System.out.println("cache stop");
}
}
static class CacheItem implements Delayed {
private String key;
/** * 過時時間(單位秒) */
private long exprieTime;
private long currentTime;
public CacheItem(String key, long exprieTime) {
this.key = key;
this.exprieTime = exprieTime;
this.currentTime = System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
// 計算剩餘的過時時間
// 大於 0 說明未過時
return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime);
}
@Override
public int compareTo(Delayed o) {
// 過時時間長的放置在隊列尾部
if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) {
return 1;
}
// 過時時間短的放置在隊列頭
if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
return -1;
}
return 0;
}
public String getKey() {
return key;
}
}
public static void main(String[] args) throws InterruptedException {
Cache cache = new Cache();
// 添加緩存元素
cache.put("a", "1", 5);
cache.put("b", "2", 4);
cache.put("c", "3", 3);
while (true) {
String a = cache.get("a");
String b = cache.get("b");
String c = cache.get("c");
System.out.println("a : " + a + ", b : " + b + ", c : " + c);
// 元素均過時後退出循環
if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) {
break;
}
TimeUnit.MILLISECONDS.sleep(1000);
}
cache.shutdown();
}
}
複製代碼
執行結果以下:網絡
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
key : c 過時並移除
a : 1, b : 2, c : null
key : b 過時並移除
a : 1, b : null, c : null
key : a 過時並移除
a : null, b : null, c : null
cache stop
複製代碼
從執行結果能夠看出,因循環內部每次停頓 1 秒,當等待 3 秒後,元素 c 過時並從緩存中清除,等待 4 秒後,元素 b 過時並從緩存中清除,等待 5 秒後,元素 a 過時並從緩存中清除。多線程
private final transient ReentrantLock lock = new ReentrantLock();
複製代碼
用於保證隊列操做的線程安全性ide
private final PriorityQueue<E> q = new PriorityQueue<E>();
複製代碼
存儲介質,用於保證延遲低的優先執行this
leader 指向的是第一個從隊列獲取元素阻塞等待的線程,其做用是減小其餘線程沒必要要的等待時間。(這個地方我一直沒搞明白 怎麼就減小其餘線程的等待時間了)spa
private final Condition available = lock.newCondition();
複製代碼
條件對象,當新元素到達,或新線程可能須要成爲leader時被通知.net
下面將主要對隊列的入隊,出隊動做進行分析 :線程
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入隊
q.offer(e);
if (q.peek() == e) {
// 若入隊的元素位於隊列頭部,說明當前元素延遲最小
// 將 leader 置空
leader = null;
// 喚醒阻塞在等待隊列的線程
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
複製代碼
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
// 等待 add 喚醒
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 已過時則直接返回隊列頭節點
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
// 若 leader 不爲空
// 說明已經有其餘線程調用過 take 操做
// 當前調用線程 follower 掛起等待
available.await();
else {
// 若 leader 爲空
// 將 leader 指向當前線程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當前調用線程在指定 delay 時間內掛起等待
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// leader 處理完以後,喚醒 follower
available.signal();
lock.unlock();
}
}
複製代碼
該圖引用自 CSDN 《Leader/Follower多線程網絡模型介紹》
看了 DelayQueue 的實現 咱們大概也明白 PriorityQueue 採用小頂堆的緣由了。