【併發隊列】無界阻塞隊列DelayQueue 源碼解析

內部結構

  • 可重入鎖
  • 用於根據delay時間排序的優先級隊列
  • 用於優化阻塞通知的線程元素leader
  • 用於實現阻塞和通知的Condition對象

delayed和PriorityQueue

  • delayed是一個具備過時時間的元素
  • PriorityQueue是一個根據隊列裏元素某些屬性排列前後的順序隊列

delayQueue其實就是在每次往優先級隊列中添加元素,而後以元素的delay過時值做爲排序的因素,以此來達到先過時的元素會拍在隊首,每次從隊列裏取出來都是最早要過時的元素。less

所謂Delayed類型,由於須要比較,因此繼承了Comparable接口:優化

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

DelayQueue須要排序存儲Delayed類型的對象同時具有阻塞功能,可是阻塞的過程伴有延遲等待類型的阻塞,所以不能直接使用BlockingPriorityQueue來實現,而是用非阻塞的版本的PriorityQueue來實現排序存儲。ui

private final PriorityQueue<E> q = new PriorityQueue<E>();

所以DelayQueue須要本身實現阻塞的功能(須要一個Condition):this

private final Condition available = lock.newCondition();

offer方法

  1. 執行加鎖操做
  2. 元素添加到優先級隊列中
  3. 查看元素是否爲隊首
  4. 若是是隊首的話,設置leader爲空,喚醒全部等待的隊列
  5. 釋放鎖

代碼以下:spa

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

take方法

  1. 執行加鎖操做
  2. 取出優先級隊列元素q的隊首
  3. 若是元素q的隊首/隊列爲空,阻塞請求
  4. 若是元素q的隊首(first)不爲空,得到這個元素的delay時間值
  5. 若是first的延遲delay時間值小於等於0的話,說明該元素已經到了可使用的時間,調用poll方法彈出該元素,跳出方法
  6. 若是first的延遲delay時間值不爲0的話,釋放元素first的引用,避免內存泄露
  7. 判斷leader元素是否爲空,不爲空的話阻塞當前線程
  8. 若是leader元素爲空的話,把當前線程賦值給leader元素,而後阻塞delay的時間,即等待隊首到達能夠出隊的時間,在finally塊中釋放leader元素的引用
  9. 循環執行從1~8的步驟
  10. 若是leader爲空而且優先級隊列不爲空的狀況下(判斷還有沒有其餘後續節點),調用signal通知其餘的線程
  11. 執行解鎖操做
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

get點

整個代碼的過程當中並無使用上太難理解的地方,可是有幾個比較難以理解他爲何這麼作的地方線程

leader元素的使用

你們可能看到在咱們的DelayQueue中有一個Thread類型的元素leader,那麼他是作什麼的呢,有什麼用呢?code

讓咱們先看一下元素註解上的doc描述:對象

Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.排序

上面主要的意思就是說用leader來減小沒必要要的等待時間,那麼這裏咱們的DelayQueue是怎麼利用leader來作到這一點的呢:繼承

這裏咱們想象着咱們有個多個消費者線程用take方法去取,內部先加鎖,而後每一個線程都去peek第一個節點.
若是leader不爲空說明已經有線程在取了,設置當前線程等待

if (leader != null)
   available.await();

若是爲空說明沒有其餘線程去取這個節點,設置leader並等待delay延時到期,直到poll後結束循環

else {
         Thread thisThread = Thread.currentThread();
         leader = thisThread;
         try {
              available.awaitNanos(delay);
         } finally {
              if (leader == thisThread)
                  leader = null;
         }
     }

take方法中爲何釋放first元素

first = null; // don't retain ref while waiting

咱們能夠看到doug lea後面寫的註釋,那麼這段代碼有什麼用呢?

想一想假設如今延遲隊列裏面有三個對象。

  • 線程A進來獲取first,而後進入 else 的else ,設置了leader爲當前線程A
  • 線程B進來獲取first,進入else的阻塞操做,而後無限期等待
  • 這時在JDK 1.7下面他是持有first引用的
  • 若是線程A阻塞完畢,獲取對象成功,出隊,這個對象理應被GC回收,可是他還被線程B持有着,GC鏈可達,因此不能回收這個first.
  • 假設還有線程C 、D、E.. 持有對象1引用,那麼無限期的不能回收該對象1引用了,那麼就會形成內存泄露.
相關文章
相關標籤/搜索