[轉] Java 無界阻塞隊列 DelayQueue 入門實戰

原文出處:http://cmsblogs.com/chenssyjava

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。裏面的元素所有都是「可延期」的元素,列頭的元素是最早「到期」的元素,若是隊列裏面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時纔可以從隊列中取元素。緩存

DelayQueue主要用於兩個方面:併發

  • 緩存:清掉緩存中超時的緩存數據
  • 任務超時處理

DelayQueue

DelayQueue實現的關鍵主要有以下幾個:優化

  1. 可重入鎖ReentrantLock
  2. 用於阻塞和通知的Condition對象
  3. 根據Delay時間排序的優先級隊列:PriorityQueue
  4. 用於優化阻塞通知的線程元素leader

ReentrantLock、Condition這兩個對象就不須要闡述了,他是實現整個BlockingQueue的核心。PriorityQueue是一個支持優先級線程排序的隊列(參考【死磕Java併發】-----J.U.C之阻塞隊列:PriorityBlockingQueue),leader後面闡述。這裏咱們先來了解Delay,他是實現延時操做的關鍵。this

Delayed

Delayed接口是用來標記那些應該在給定延遲時間以後執行的對象,它定義了一個long getDelay(TimeUnit unit)方法,該方法返回與此對象相關的的剩餘時間。同時實現該接口的對象必須定義一個compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。線程

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

如何使用該接口呢?上面說的很是清楚了,實現該接口的getDelay()方法,同時定義compareTo()方法便可。code

內部結構

先看DelayQueue的定義:對象

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
            implements BlockingQueue<E> {
        /** 可重入鎖 */
        private final transient ReentrantLock lock = new ReentrantLock();
        /** 支持優先級的BlockingQueue */
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        /** 用於優化阻塞 */
        private Thread leader = null;
        /** Condition */
        private final Condition available = lock.newCondition();

        /**
         * 省略不少代碼
         */
    }

看了DelayQueue的內部結構就對上面幾個關鍵點一目瞭然了,可是這裏有一點須要注意,DelayQueue的元素都必須繼承Delayed接口。同時也能夠從這裏初步理清楚DelayQueue內部實現的機制了:以支持優先級無界隊列的PriorityQueue做爲一個容器,容器裏面的元素都應該實現Delayed接口,在每次往優先級隊列中添加元素時以元素的過時時間做爲排序條件,最早過時的元素放在優先級最高。blog

offer()

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向 PriorityQueue中插入元素
            q.offer(e);
            // 若是當前元素的對首元素(優先級最高),leader設置爲空,喚醒全部等待線程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            // 無界隊列,永遠返回true
            return true;
        } finally {
            lock.unlock();
        }
    }

offer(E e)就是往PriorityQueue中添加元素,具體能夠參考(【死磕Java併發】-----J.U.C之阻塞隊列:PriorityBlockingQueue)。整個過程仍是比較簡單,可是在判斷當前元素是否爲對首元素,若是是的話則設置leader=null,這是很是關鍵的一個步驟,後面闡述。排序

take()

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 對首元素
                E first = q.peek();
                // 對首爲空,阻塞,等待off()操做喚醒
                if (first == null)
                    available.await();
                else {
                    // 獲取對首元素的超時時間
                    long delay = first.getDelay(NANOSECONDS);
                    // <=0 表示已過時,出對,return
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader != null 證實有其餘線程在操做,阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 不然將leader 設置爲當前線程,獨佔
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 超時阻塞
                            available.awaitNanos(delay);
                        } finally {
                            // 釋放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 喚醒阻塞線程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

首先是獲取對首元素,若是對首元素的延時時間 delay <= 0 ,則能夠出對了,直接return便可。不然設置first = null,這裏設置爲null的主要目的是爲了不內存泄漏。若是 leader != null 則表示當前有線程佔用,則阻塞,不然設置leader爲當前線程,而後調用awaitNanos()方法超時等待。

first = null

這裏爲何若是不設置first = null,則會引發內存泄漏呢?線程A到達,列首元素沒有到期,設置leader = 線程A,這是線程B來了由於leader != null,則會阻塞,線程C同樣。假如線程阻塞完畢了,獲取列首元素成功,出列。這個時候列首元素應該會被回收掉,可是問題是它還被線程B、線程C持有着,因此不會回收,這裏只有兩個線程,若是有線程D、線程E...呢?這樣會無限期的不能回收,就會形成內存泄漏。

這個入隊、出對過程和其餘的阻塞隊列沒有很大區別,無非是在出對的時候增長了一個到期時間的判斷。同時經過leader來減小沒必要要阻塞。

本文由博客一文多發平臺 OpenWrite 發佈!

相關文章
相關標籤/搜索