DelayQueue 的類簽名和繼承結構以下:安全
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {}
app
DelayQueue 中的元素要實現 Delayed 接口,該接口定義以下:ide
public interface Delayed extends Comparable<Delayed> {
/**
* 以給定的時間單位,返回該對象的剩餘延遲
* 若爲零或者負數表示延時已通過去
*/
long getDelay(TimeUnit unit);
}
源碼分析
Comparable 接口也只有一個 compareTo 方法:flex
public interface Comparable<T> {
public int compareTo(T o);
}
ui
DelayQueue 有兩個構造器,以下:this
// 無參構造器
public DelayQueue() {}
// 指定集合的構造器
public DelayQueue(Collection<? extends E> c) {
// 該方法最後是經過 add 方法實現的,後文進行分析
this.addAll(c);
}
spa
// 鎖,用於保證線程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 優先隊列,實際存儲元素的地方
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 線程等待的標識
private Thread leader = null;
// 觸發條件,表示是否能夠從隊列中讀取元素
private final Condition available = lock.newCondition();
線程
DelayQueue 也是一個隊列,它的入隊方法有:add(E), offer(E), put(E) 等,它們的定義以下:3d
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
這幾個方法都是經過 offer(E) 方法實現的,它的代碼以下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入隊
q.offer(e);
// 若該元素爲隊列頭部元素,喚醒等待的線程
// (表示能夠從隊列中讀取數據了)
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
有入隊天然也有出隊,主要方法有:poll(), take(), poll(timeout, unit), 以下:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取隊列頭部元素
E first = q.peek();
// 頭部元素爲空,或者延時未到,則返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
// 不然返回頭部元素
else
return q.poll();
} finally {
lock.unlock();
}
}
poll 方法是非阻塞的,即調用以後不管元素是否存在都會當即返回。下面看下阻塞的 take 方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 以可中斷方式獲取鎖
lock.lockInterruptibly();
try {
// 無限循環
for (;;) {
// 獲取隊列頭部元素
E first = q.peek();
// 若爲空,則等待
if (first == null)
available.await();
// 若不爲空
else {
// 獲取延遲的納秒數,若小於等於零(即過時),則獲取並刪除頭部元素
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
// 執行到這裏,表示 delay>0,也就是延時未過時
first = null; // don't retain ref while waiting
// leader 不爲空表示有其餘線程在讀取數據,當前線程等待
if (leader != null)
available.await();
else {
// 將當前線程設置爲 leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待延遲時間過時
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 喚醒該條件下的其餘線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
take 方法是阻塞操做,當條件不知足時會一直等待。另外一個 poll(timeout, unit) 方法和它有些相似,只不過帶有延時,以下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 以可中斷方式獲取鎖
lock.lockInterruptibly();
try {
// 無限循環
for (;;) {
// 獲取隊列的頭部元素
E first = q.peek();
// 若頭部元素爲空(即隊列爲空),當超時時間大於零則等待相應的時間;
// 不然(即超時時間小於等於零)返回空
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
// 執行到這裏表示隊列頭部元素不爲空
// 獲取剩餘延時
long delay = first.getDelay(NANOSECONDS);
// 延時已過時,返回隊列頭部元素
if (delay <= 0)
return q.poll();
// 延時未過時且等待超時,返回空
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 延時未過時且等待未超時,且等待超時<延遲時間
// 表示有其餘線程在取數據,則當前線程進入等待
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
// 沒有其餘線程等待,將當前線程設置爲 leader,相似於「獨佔」操做
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// 計算剩餘延遲時間
nanos -= delay - timeLeft;
} finally {
// 該線程操做完畢,把 leader 置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 喚醒 available 條件下的一個其餘線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
此外還有一個 peek 方法,該方法雖然也能獲取隊列頭部的元素,但與以上出隊方法不一樣的是,peek 方法只是讀取隊列頭部元素,並不會將其刪除:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回隊列的頭部元素(不刪除)
return q.peek();
} finally {
lock.unlock();
}
}
自定義一個實現了 Delayed 接口的 Task 類,並將它的幾個對象添加到一個延遲隊列中,代碼以下:
public class TestDelayedQueue {
public static void main(String[] args) throws Exception {
BlockingQueue<Task> delayQueue = new DelayQueue<>();
long now = System.currentTimeMillis();
delayQueue.put(new Task("c", now + 6000));
delayQueue.put(new Task("d", now + 10000));
delayQueue.put(new Task("a", now + 3000));
delayQueue.put(new Task("b", now + 4000));
while (true) {
System.out.println(delayQueue.take());
TimeUnit.SECONDS.sleep(1);
}
}
private static class Task implements Delayed {
private String taskName;
private long endTime;
public Task(String taskName, long endTime) {
this.taskName = taskName;
this.endTime = endTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "taskName-->" + taskName;
}
}
}