JDK容器學習之Queue:DelayQueue

延遲阻塞隊列 DelayQueue

阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞java

延遲阻塞隊列DelayQueue的底層是基於優先級隊列PriorityQueue來實現的,所以研究延遲阻塞隊列,更多的注意力應集中在如下兩點緩存

  • 阻塞是如何實現的
  • 應用場景是什麼

I. 阻塞隊列的實現邏輯

1. 限定

類的聲明以下,要求隊列中的元素必須繼承 Delayed安全

public class DelayQueue<E extends Delayed> 
    extends AbstractQueue<E>
    implements BlockingQueue<E>
    
    
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

這個限定,主要服務於優先級隊列的排序要求,根據延遲時間對元素隊列中的元素進行排序併發

2. 入隊

入隊的實現邏輯比較簡單,爲了保證併發安全,實現中實現加鎖機制異步

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

入隊的實際是交由優先級隊列進行實現,須要注意的是,入隊以後,額外的一個操做,若是入隊的元素剛好在隊列頭,執行兩個操做ide

  1. leader賦值爲空 (這個是幹嗎的,爲何這麼作?)
  2. available.signal() 喚醒被阻塞的線程(什麼線程被阻塞?)

3. 出隊

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

出隊的操做一樣加鎖,獲取隊列頭的元素,判斷延期時間是否結束,是才返回結果,不然返回nullpost

注意,這裏有兩個疑問性能

  1. 隊列中元素getDelay()方法返回值會變麼,由誰來改變呢?
  2. 上面的出隊沒有阻塞,直接返回了null

雖然上面的出隊和入隊的邏輯比較簡單,可是留下的疑問一點都很多,上面的四個問題應該如何解答?學習

繼續看源碼,發現還有一個出隊的方法, 傳入了兩個參數表示阻塞的超時時間(即超過這個時間沒有返回,則拋一箇中端異常)測試

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 時間轉換爲納秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 獲取隊列頭
            E first = q.peek();
            if (first == null) { // 隊列爲空
                if (nanos <= 0)
                    // 延時時間已過,直接返回null
                    return null;
                else
                    // 當前線程阻塞 nanos (ns),而後再次循環
                    nanos = available.awaitNanos(nanos);
            } else { // 隊列非空
                // 獲取隊列頭元素的延遲時間
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) // 延遲時間小於0,直接返回隊列頭
                    return q.poll();
                if (nanos <= 0) 
                // 阻塞時間已過,隊列頭的延遲時間還沒到,則返回null
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                // 沒法獲取當前的隊列頭
                //(由於隊列頭延遲時間大於阻塞時間,即隊列頭不生效)
                // 繼續阻塞,以指望此時可能新增一個到隊列頭
                    nanos = available.awaitNanos(nanos);
                else {
                // 能夠獲取當前隊列頭
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 阻塞到隊列頭生效
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

分析: 以當前隊列爲空做爲條件

上面代碼的流程以下:

  1. 阻塞當前方法
  2. 此時若新入隊一個元素,根據前面入隊方法,此時表示新入隊的就是在隊列頭,會發出一個喚醒的操做
  3. 此時阻塞的線程被喚醒,繼續循環,再次獲取隊列頭(此時非空了)
  4. 判斷隊列頭的元素延遲時間是否已過
  • 已過,則彈出隊列頭,並返回
  • 未過,繼續判斷阻塞時間是否小於0
    • 是則表示已通過了預期的阻塞時間,直接返回null
    • 若阻塞時間小於隊列頭的延遲時間(表示能夠當前的隊列頭,不是本方法預期的),則繼續阻塞當前線程,以指望此時有新入隊的元素可能被再次獲取
    • 不然表示當前線程獲能夠獲取如今的隊列頭,記錄下當前線程,並阻塞,等到隊列頭元素生效

繼續化重點

  • 添加元素到隊列頭會喚起出隊的阻塞線程
  • 被喚起以後,出隊線程會再次獲取隊列頭元素,判斷是否能夠返回(即getDelay返回小於0)

上面的方法由於加上了一個超時時間(即在指定的時間內依然沒法返回時,斷掉阻塞),分析起來可能不太順暢,再看源碼,還有一個take方法,邏輯與上面類似,只是砍掉了超時斷開阻塞的邏輯

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
            // 隊列頭爲空,則阻塞,直到新增一個入隊爲止
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                // 若隊列頭元素已生效,則直接返回
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                // leader 非空時,表示有其餘的一個線程在出隊阻塞中
                // 此時掛住當前線程,等待另外一個線程出隊完成
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                    // 等待隊列頭元素生效
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
        // 當前線程出隊完成,通知其餘出隊阻塞的線程繼續執行
            available.signal();
        lock.unlock();
    }
}

經過了以前的燒腦邏輯以後,再看這個就簡單不少了

1. 隊列爲空,則阻塞,直到有個線程完成入隊操做

2. 獲取隊列頭,若隊列頭已生效,則直接返回

3. 若隊列頭未生效

  1. 如有另外一個線程已經處於等待隊列頭生效的阻塞過程當中,則阻塞當前線程,直到另外一個線程完成出隊操做

  2. 若沒有其餘線程阻塞在出隊過程當中,即當前線程爲第一個獲取隊列頭的線程

    • 標識當前線程處於等待隊列頭生效的阻塞中(leader = thisThread
    • 阻塞當前線程,等待隊列頭生效
    • 隊列頭生效以後,清空標識(leader=null)
    • 再次進入循環,獲取隊列頭並返回
  3. 最後步驟1中被阻塞的線程


所以能夠愉快的解答上面的四個問題

添加一個元素到隊列頭

  1. leader賦值爲空 (這個是幹嗎的,爲何這麼作?)

    leader記錄了被阻塞在等待隊列頭生效的線程
    新增一個元素到隊列頭,表示等待原來隊列頭生效的阻塞的線程已經失去了阻塞的意義,此時須要獲取新的隊列頭進行返回了
  2. available.signal() 喚醒被阻塞的線程(什麼線程被阻塞?)

    獲取隊列頭的線程被喚起,主要有兩種場景:
    1. 以前隊列爲空,致使被阻塞的線程
    2. 以前隊列非空,可是隊列頭沒有生效致使被阻塞的線程

普通的出隊方法

  1. 隊列中元素getDelay()方法返回值會變麼,由誰來改變呢?

    必須得變,不然這個元素一直不生效,將直接致使線程一直阻塞
    由隊列中的元素實現類來保證,返回值是逐漸變小的
  2. 上面的出隊沒有阻塞,直接返回了null

    須要阻塞獲取隊列頭,用 `take`, `poll(long,TimeUnit)`來代替

II. 應用場景及使用case

上面分析的是阻塞隊列的實現原理,接下來舉一個實例來解析下這個延遲阻塞隊列的使用姿式,加深下理解

1. 一個實例場景

(簡化了在簡化以後的,與實際會有一些區別,請勿徹底認定合理)

好比和電商的詳情頁展現,爲了提升應用的性能,咱們將整個頁面進行了緩存,當詳情頁發生修改後,咱們會更新緩存的內容

所以爲了保證緩存的內容和實際的內容是一致的,咱們須要一個對帳的任務,當詳情頁修改後,而且更新緩存完成以後,咱們須要再次對比緩存和實際內容的一致性;

此時一個異步的任務能夠這麼設計:監聽詳情頁修改的事件,延遲一段時間,而後再對比緩存和實際內容的一致性 (這裏延遲一段時間主要是爲了保證緩存已經更新完成)

2. 實現

詳情信息 DetailInfo

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DetailInfo {

    private int itemId;

    private String title;

    private String desc;

    private int price;

}

延遲隊列的元素 UpdateTask

注意其中 getDelay() 的實現邏輯,根據當前時間與預訂的延遲生效時間進行比較

@Data
@AllArgsConstructor
public class UpdateTask implements Delayed {

    private int itemId;

    private long delayTime;


    @Override
    public long getDelay(TimeUnit unit) {
        return delayTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MICROSECONDS) - o.getDelay(TimeUnit.MICROSECONDS));
    }
}

主業務邏輯

更新事件的監聽訂閱使用了 Guava的EventBus來處理,若有疑問能夠搜索EventBus的使用姿式

public class DetailManager {
    // 模擬真實數據存儲空間
    private Map<Integer, DetailInfo> realMap = new ConcurrentHashMap<>();
    // 模擬緩存空間
    private Map<String, String> cache = new ConcurrentHashMap<>();

    private Gson gson = new Gson();

    private String getCacheKey(int itemId) {
        return "detailInfo_" + itemId;
    }

    // eventBus 用於發送更新事件;異步接受更新事件
    private AsyncEventBus eventBus;
    private void init() {
        DetailInfo detailInfo = new DetailInfo(1, "onw", "第一個測試", 100);
        DetailInfo detailInfo2 = new DetailInfo(2, "two", "第二個測試", 200);

        realMap.put(detailInfo.getItemId(), detailInfo);
        realMap.put(detailInfo2.getItemId(), detailInfo2);

        cache.put(getCacheKey(detailInfo.getItemId()), gson.toJson(detailInfo));
        cache.put(getCacheKey(detailInfo2.getItemId()), gson.toJson(detailInfo2));

        eventBus = new AsyncEventBus("Validate-Thread", 
            Executors.newFixedThreadPool(2));
        eventBus.register(this);
    }

    // 模擬更新商品
    public void updateDetail(int itemId) {
        DetailInfo detailInfo = realMap.get(itemId);
        long now = System.currentTimeMillis();
        detailInfo.setTitle("title_" + itemId + "_" + now);
        cache.put(getCacheKey(itemId), gson.toJson(detailInfo));

        // 發送一個修改的事件
        eventBus.post(new UpdateTask(itemId, now + 5000));
        System.out.println("[UpdateInfo]>>>ItemId: " + itemId + " updateTime: " + now + " validateTime: " + (now + 5000));
    }

    // 延遲隊列
    private DelayQueue<UpdateTask> delayQueue = new DelayQueue<>();
    /**
     * 監聽修改事件
     * @param updateTask
     */
    @Subscribe
    public void verify(UpdateTask updateTask) {
        long getTaskTime = System.currentTimeMillis();
        delayQueue.put(updateTask);

        try {
            UpdateTask task = delayQueue.take();
            long processTime = System.currentTimeMillis();


            DetailInfo real = realMap.get(task.getItemId());
            String cacheObj = cache.get(getCacheKey(task.getItemId()));
            boolean ans = gson.toJson(real).equals(cacheObj);
            System.out.println("validate itemId: " + updateTask.getItemId() +
                    " getEventTime: " + getTaskTime +
                    " processTime:" + processTime + " ans: " + ans);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DetailManager detailManager = new DetailManager();
        detailManager.init();

        // 開始修改
        detailManager.updateDetail(1);
        Thread.sleep(20);
        detailManager.updateDetail(2);

        Thread.sleep(35000);
    }
}

簡單說明主流程

  1. 首先初始化DetailManager
  2. 更新兩個商品,在更新的邏輯中實現如下步驟
  • 更新實際的商品內容
  • 更新緩存的內容
  • 發送一條商品更新的消息
  1. 異步監聽更新消息任務邏輯
  • 將消息塞入延遲隊列
  • 從延遲隊列中後去已經生效的消息,而後對帳

輸出結果以下

[UpdateInfo]>>>ItemId: 1 updateTime: 1508677959067 validateTime: 1508677964067
[UpdateInfo]>>>ItemId: 2 updateTime: 1508677959103 validateTime: 1508677964103
Thread[pool-1-thread-1,5,main]>>> validate itemId: 1 getEventTime: 1508677959078 processTime:1508677964067 ans: true
Thread[pool-1-thread-2,5,main]>>> validate itemId: 2 getEventTime: 1508677964067 processTime:1508677964103 ans: true

從上面的輸出能夠得知,實際驗證的時間戳和預期的時間錯是相同的

III. 小結

延遲阻塞隊列DelayQueue,學習下來以後感受很是有意思,首先是加深了使用姿式的瞭解,其次對其中的阻塞,喚醒機制有了必定了解,漲了鎖使用知識的見識(這裏面還有一個很是有意思的東西就是 ConditionReentrantLock的使用,後續線程安全篇的研究能夠以此做爲應用場景)

簡單小結上面的學習內容

  1. 隊列中更不能有null
  2. 底層使用的是優先級隊列 PriorityQueue
  3. 經過鎖來實現線程安全
  4. 須要使用阻塞的獲取元素時,請使用 take(), poll(long, TimeUnit)兩方法之一
  5. 要求延遲阻塞隊列的元素實現 Delayed接口,內部實現的getDelay方法,要求返回值愈來愈小(若是一直大於0,這個延遲任務就一直沒法執行了)

掃描關注,java分享

QrCodeGG

相關文章
相關標籤/搜索