Java併發包源碼學習系列:阻塞隊列實現之DelayQueue源碼解析

系列傳送門:java

DelayQueue概述

DelayQueue是一個支持延時獲取元素的無界阻塞隊列,使用PriorityQueue來存儲元素。編程

隊中的元素必須實現Delayed接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先隊列中添加元素時根據compareTo方法做爲排序規則。緩存

當從隊列獲取元素時,只有過時的元素纔會出隊列。併發

使用場景: 緩存系統設計、定時任務調度等。app

類圖及重要字段

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 獨佔鎖實現同步
    private final transient ReentrantLock lock = new ReentrantLock();
    // 優先隊列存放數據
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * 基於Leader-Follower模式的變體,用於儘可能減小沒必要要的線程等待
     */
    private Thread leader = null;

    /**
     * 與lock對應的條件變量
     */
    private final Condition available = lock.newCondition();    
}
  1. 使用ReentrantLock獨佔鎖實現線程同步,使用Condition實現等待通知機制。
  2. 基於Leader-Follower模式的變體,減小沒必要要的線程等待。
  3. 內部使用PriorityQueue優先級隊列存儲元素,且隊列中元素必須實現Delayed接口。

Delayed接口

隊中的元素必須實現Delayed接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】。dom

每次向優先隊列中添加元素時根據compareTo方法做爲排序規則,固然咱們約定一下,默認q.peek()出來的就是最早過時的元素。ide

public interface Delayed extends Comparable<Delayed> {
    // 返回剩餘時間
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
	// 定義比較方法
    public int compareTo(T o);
}

Delayed元素案例

學習了Delayed接口以後,咱們看一個實際的案例,加深印象,源於:《Java併發編程之美》。工具

static class DelayedElement implements Delayed {

        private final long delayTime; // 延遲時間
        private final long expire; // 到期時間
        private final String taskName; // 任務名稱

        public DelayedElement (long delayTime, String taskName) {
            this.delayTime = delayTime;
            this.taskName = taskName;
            expire = now() + delayTime;
        }

        final long now () {
            return System.currentTimeMillis();
        }

        // 剩餘時間 = 到期時間 - 當前時間
        @Override
        public long getDelay (TimeUnit unit) {
            return unit.convert(expire - now(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo (Delayed o) {
            return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }

        @Override
        public String toString () {
            final StringBuilder res = new StringBuilder("DelayedElement [ ");
            res.append("delay = ").append(delayTime);
            res.append(", expire = ").append(expire);
            res.append(", taskName = '").append(taskName).append('\'');
            res.append(" ] ");
            return res.toString();
        }
    }


    public static void main (String[] args) {
        // 建立delayQueue隊列
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();

        // 建立延遲任務
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i);
            delayQueue.offer(element);
        }

        // 依次取出任務並打印
        DelayedElement ele = null;
        try {
            for (; ; ) {
                while ((ele = delayQueue.take()) != null) {
                    System.out.println(ele);
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
// 打印結果
DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] 
DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] 
DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] 
DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] 
DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] 
DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] 
DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] 
DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] 
DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] 
DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]
  • 實現了compareTo方法,定義比較規則爲越早過時的排在隊頭。
  • 實現了getDelay方法,計算公式爲:剩餘時間 = 到期時間 - 當前時間。

構造器

DelayQueue構造器相比於前幾個,就顯得很是easy了。學習

public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

void put(E e)

由於DelayQueue是無界隊列,不會由於邊界問題產生阻塞,所以put操做和offer操做是同樣的。ui

public void put(E e) {
        offer(e);
    }

    public boolean offer(E e) {
        // 獲取獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 加入優先隊列裏
            q.offer(e);
            // 判斷堆頂元素是否是剛剛插入的元素
            // 若是判斷爲true,說明當前這個元素是將最早過時
            if (q.peek() == e) {
                // 重置leader線程爲null
                leader = null; 
                // 激活available變量條件隊列中的一個線程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

E take()

take方法將會獲取並移除隊列裏面延遲時間過時的元素 ,若是隊列裏面沒有過時元素則陷入等待。

public E take() throws InterruptedException {
        // 獲取獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 瞅一瞅誰最快過時
                E first = q.peek();
                // 隊列爲空,則將當前線程置入available的條件隊列中,直到裏面有元素
                if (first == null)
                    available.await();
                else {
                    // 看下還有多久過時
                    long delay = first.getDelay(NANOSECONDS);
                    // 哇,已通過期了,就移除它並返回
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // leader不爲null表示其餘線程也在執行take
                    // 則將當前線程置入available的條件隊列中
                    if (leader != null)
                        available.await();
                    else {
                        // 若是leader爲null,則選擇當前線程做爲leader線程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待delay時間,時間到以後,會出條件隊列,繼續競爭鎖
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

first = null 有什麼用

若是不設置first = null,將會引發內存泄露。

  • 線程A到達,隊首元素沒有到期,設置leader = 線程A,而且執行available.awaitNanos(delay);等待元素過時。
  • 這時線程B來了,由於leader != null,則會available.await();阻塞,線程C、D、E同理。
  • 線程A阻塞完畢了,再次循環,獲取列首元素成功,出列。

這個時候列首元素應該會被回收掉,可是問題是它還被線程B、線程C持有着,因此不會回收,若是線程增多,且隊首元素無限期的不能回收,就會形成內存泄漏。

總結

DelayQueue是一個支持延時獲取元素無界阻塞隊列,使用PriorityQueue來存儲元素。

隊中的元素必須實現Delayed接口【Delay接口又繼承了Comparable,須要實現compareTo方法】,每一個元素都須要指明過時時間,經過getDelay(unit)獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先隊列中添加元素時根據compareTo方法做爲排序規則。

基於Leader-Follower模式使用leader變量,減小沒必要要的線程等待。

DelayQueue是無界隊列,所以插入操做是非阻塞的。可是take操做從隊列獲取元素時,是阻塞的,阻塞規則爲:

  • 當一個線程調用隊列的take方法,若是隊列爲空,則將會調用 available.await()陷入阻塞。
  • 若是隊列不爲空,則查看隊列的隊首元素是否過時,根據getDelay的返回值是否小於0判斷,若是過時則返回該元素。
  • 若是隊首元素未過時,則判斷當前線程是否爲leader線程,若是不是,代表有其餘線程在執行take操做,就調用available.await()陷入阻塞。
  • 若是沒有其餘線程在執行take,就將當前線程設置爲leader,並等待隊首元素過時,available.awaitNanos(delay)
  • leader線程退出take以後,將會調用available.signal()喚醒一個follower線程,接着回到開始那步。

參考閱讀

相關文章
相關標籤/搜索