JDK源碼分析-DelayQueue

概述

DelayQueue 也是一種隊列,它內部的元素有「延遲」,也就是當從隊列中獲取元素時,若是它的延遲時間未到,則沒法取出。

DelayQueue 的類簽名和承結構以下:安全

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>    implements BlockingQueue<E> {}app


下面分析其代碼實現。

代碼分析

相關接口

DelayQueue 中的元素要實現 Delayed 接口,該接口定義以下:ide

public interface Delayed extends Comparable<Delayed> {    /**     * 以給定的時間單位,返回該對象的剩餘延遲     * 若爲零或者負數表示延時已通過去     */    long getDelay(TimeUnit unit);}源碼分析

Delayed 接口繼承自 Comparable 接口,而它自己只定義了一個 getDelay 方法,該方法的做用是獲取對象的剩餘延遲時間。

Comparable 接口也只有一個 compareTo 方法:flex

public interface Comparable<T> {    public int compareTo(T o);}ui

這裏再也不詳述。

構造器

DelayQueue 有兩個構造器,以下:this

// 無參構造器public DelayQueue() {}
// 指定集合的構造器public DelayQueue(Collection<? extends E> c) {    // 該方法最後是經過 add 方法實現的,後文進行分析    this.addAll(c);}
spa


成員變量


// 鎖,用於保證線程安全private final transient ReentrantLock lock = new ReentrantLock();
// 優先隊列,實際存儲元素的地方private final PriorityQueue<E> q = new PriorityQueue<E>();
// 線程等待的標識private Thread leader = null;
// 觸發條件,表示是否能夠從隊列中讀取元素private final Condition available = lock.newCondition();線程

關於優先隊列可參考前文「JDK源碼分析-PriorityQueue」的分析。

入隊方法

DelayQueue 也是一個隊列,它的入隊方法有:add(E), offer(E), put(E) 等,它們的定義以下:3d

public boolean add(E e) {    return offer(e);}
public void put(E e) {    offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}

這幾個方法都是經過 offer(E) 方法實現的,它的代碼以下:

public boolean offer(E e) {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 入隊        q.offer(e);        // 若該元素爲隊列頭部元素,喚醒等待的線程        // (表示能夠從隊列中讀取數據了)        if (q.peek() == e) {            leader = null;            available.signal();        }        return true;    } finally {        lock.unlock();    }}


出隊方法

有入隊天然也有出隊,主要方法有:poll(), take(), poll(timeout, unit), 以下:

public E poll() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 獲取隊列頭部元素        E first = q.peek();        // 頭部元素爲空,或者延時未到,則返回空        if (first == null || first.getDelay(NANOSECONDS) > 0)            return null;        // 不然返回頭部元素        else            return q.poll();    } finally {        lock.unlock();    }}

poll 方法是非阻塞的,即調用以後不管元素是否存在都會當即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException {    final ReentrantLock lock = this.lock;    // 以可中斷方式獲取鎖    lock.lockInterruptibly();    try {        // 無限循環        for (;;) {            // 獲取隊列頭部元素            E first = q.peek();            // 若爲空,則等待            if (first == null)                available.await();            // 若不爲空            else {                // 獲取延遲的納秒數,若小於等於零(即過時),則獲取並刪除頭部元素                long delay = first.getDelay(NANOSECONDS);                if (delay <= 0)                    return q.poll();                // 執行到這裏,表示 delay>0,也就是延時未過時                first = null; // don't retain ref while waiting                // leader 不爲空表示有其餘線程在讀取數據,當前線程等待                if (leader != null)                    available.await();                else {                    // 將當前線程設置爲 leader                    Thread thisThread = Thread.currentThread();                    leader = thisThread;                    try {                        // 等待延遲時間過時                        available.awaitNanos(delay);                    } finally {                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        // 喚醒該條件下的其餘線程        if (leader == null && q.peek() != null)            available.signal();        lock.unlock();    }}

該方法看起來稍複雜,主要邏輯以下:
1. 獲取隊列頭部元素;
    1.1 若該元素爲空(隊列爲空),則當前線程等待;
    1.2 若該元素不爲空,且已通過期,則取出該元素(並移除);
        1.2.1 若未過時,且有其餘線程在操做(leader 不爲空),當前線程等待;
        1.2.2 若未過時,且沒有其餘線程操做,則佔有「操做權」(將 leader 設置爲當前線程),並等待延遲過時。
以上操做循環執行。

take 方法是阻塞操做,當條件不知足時會一直等待。另外一個 poll(timeout, unit) 方法和它有些相似,只不過帶有延時,以下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;        // 以可中斷方式獲取鎖        lock.lockInterruptibly();        try {            // 無限循環            for (;;) {                // 獲取隊列的頭部元素                E first = q.peek();                // 若頭部元素爲空(即隊列爲空),當超時時間大於零則等待相應的時間;                //   不然(即超時時間小於等於零)返回空                if (first == null) {                    if (nanos <= 0)                        return null;                    else                        nanos = available.awaitNanos(nanos);                } else {                    // 執行到這裏表示隊列頭部元素不爲空                    // 獲取剩餘延時                    long delay = first.getDelay(NANOSECONDS);                    // 延時已過時,返回隊列頭部元素                    if (delay <= 0)                        return q.poll();                    // 延時未過時且等待超時,返回空                    if (nanos <= 0)                        return null;                    first = null; // don't retain ref while waiting                    // 延時未過時且等待未超時,且等待超時<延遲時間                    // 表示有其餘線程在取數據,則當前線程進入等待                    if (nanos < delay || leader != null)                        nanos = available.awaitNanos(nanos);                    else {                        // 沒有其餘線程等待,將當前線程設置爲 leader,相似於「獨佔」操做                        Thread thisThread = Thread.currentThread();                        leader = thisThread;                        try {                            long timeLeft = available.awaitNanos(delay);                            // 計算剩餘延遲時間                            nanos -= delay - timeLeft;                        } finally {                            // 該線程操做完畢,把 leader 置空                            if (leader == thisThread)                                leader = null;                        }                    }                }            }        } finally {            // 喚醒 available 條件下的一個其餘線程            if (leader == null && q.peek() != null)                available.signal();            lock.unlock();        }    }

take 和 poll 方法還有一個區別: 當延遲未過時時,take 方法會一直等待,而 poll 方法則會返回空。

此外還有一個 peek 方法,該方法雖然也能獲取隊列頭部的元素,但與以上出隊方法不一樣的是,peek 方法只是讀取隊列頭部元素,並不會將其刪除:

public E peek() {    final ReentrantLock lock = this.lock;    lock.lock();    try {        // 返回隊列的頭部元素(不刪除)        return q.peek();    } finally {        lock.unlock();    }}

以上就是 DelayQueue 的主要方法的代碼分析,爲便於理解,下面簡要舉例分析。

用法舉例

示例代碼:

自定義一個實現了 Delayed 接口的 Task 類,並將它的幾個對象添加到一個延遲隊列中,代碼以下:

public class TestDelayedQueue {    public static void main(String[] args) throws Exception {        BlockingQueue<Task> delayQueue = new DelayQueue<>();        long now = System.currentTimeMillis();        delayQueue.put(new Task("c", now + 6000));                delayQueue.put(new Task("d", now + 10000));        delayQueue.put(new Task("a", now + 3000));        delayQueue.put(new Task("b", now + 4000));                while (true) {            System.out.println(delayQueue.take());            TimeUnit.SECONDS.sleep(1);        }    }
   private static class Task implements Delayed {        private String taskName;        private long endTime;
       public Task(String taskName, long endTime) {            this.taskName = taskName;            this.endTime = endTime;        }
        @Override        public long getDelay(TimeUnit unit) {            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);        }
        @Override        public int compareTo(Delayed o) {            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));        }
        @Override        public String toString() {            return "taskName-->" + taskName;        }    }}

結果會以延遲時間的順序取出各個元素。

小結

1. DelayQueue 是一種隊列,同時實現了 BlockingQueue 接口;
2. 它內部的元素有延遲時間的概念,出隊時,若延時未到,則沒法讀取到隊列頭部的元素;
3. 它是線程安全的。

相關閱讀:
JDK源碼分析-PriorityQueue
JDK源碼分析-BlockingQueue


相關文章
相關標籤/搜索