Java多線程進階(三六)—— J.U.C之collections框架:DelayQueue

圖片描述

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、DelayQueue簡介

DelayQueue是JDK1.5時,隨着J.U.C包一塊兒引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於已有的PriorityBlockingQueue實現:segmentfault

clipboard.png

DelayQueue也是一種比較特殊的阻塞隊列,從類聲明也能夠看出,DelayQueue中的全部元素必須實現Delayed接口:設計模式

/**
 * 一種混合風格的接口,用來標記那些應該在給定延遲時間以後執行的對象。
 * <p>
 * 此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回與此對象相關的剩餘有效時間,以給定的時間單位表示.
     */
    long getDelay(TimeUnit unit);
}

能夠看到,Delayed接口除了自身的getDelay方法外,還實現了Comparable接口。getDelay方法用於返回對象的剩餘有效時間,實現Comparable接口則是爲了可以比較兩個對象,以便排序。緩存

也就是說,若是一個類實現了Delayed接口,當建立該類的對象並添加到DelayQueue中後,只有當該對象的getDalay方法返回的剩餘時間≤0時纔會出隊網絡

另外,因爲DelayQueue內部委託了PriorityBlockingQueue對象來實現全部方法,因此能以堆的結構維護元素順序,這樣剩餘時間最小的元素就在堆頂,每次出隊其實就是刪除剩餘時間≤0的最小元素多線程

DelayQueue的特色簡要歸納以下:框架

  1. DelayQueue是無界阻塞隊列;
  2. 隊列中的元素必須實現Delayed接口,元素過時後纔會從隊列中取走;

2、DelayQueue示例

爲了便於理解DelayQueue的功能,咱們先來看一個使用DelayQueue的示例。dom

隊列元素

第一節說了,隊列元素必須實現Delayed接口,咱們先來定義一個Data類,做爲隊列元素:異步

public class Data implements Delayed {
    private static final AtomicLong atomic = new AtomicLong(0);
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n");

    // 數據的失效時間點
    private final long time;

    // 序號
    private final long seqno;

    /**
     * @param deadline 數據失效時間點
     */
    public Data(long deadline) {
        this.time = deadline;
        this.seqno = atomic.getAndIncrement();
    }

    /**
     * 返回剩餘有效時間
     *
     * @param unit 時間單位
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * 比較兩個Delayed對象的大小, 比較順序以下:
     * 1. 若是是對象自己, 返回0;
     * 2. 比較失效時間點, 先失效的返回-1,後失效的返回1;
     * 3. 比較元素序號, 序號小的返回-1, 不然返回1.
     * 4. 非Data類型元素, 比較剩餘有效時間, 剩餘有效時間小的返回-1,大的返回1,相同返回0
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this)  // compare zero if same object
            return 0;

        if (other instanceof Data) {
            Data x = (Data) other;

            // 優先比較失效時間
            long diff = this.time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;

            else if (this.seqno < x.seqno)    // 剩餘時間相同則比較序號
                return -1;
            else
                return 1;
        }

        // 通常不會執行到此處,除非元素不是Data類型
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    @Override
    public String toString() {
        return "Data{" +
            "time=" + time +
            ", seqno=" + seqno +
            "}, isValid=" + isValid();
    }

    private boolean isValid() {
        return this.getDelay(TimeUnit.NANOSECONDS) > 0;
    }
}

關於隊列元素Data類,須要注意如下幾點:ide

  1. 每一個元素的time字段保存失效時間點)的納秒形式(構造時指定,好比當前時間+60s);
  2. seqno字段表示元素序號,每一個元素惟一,僅用於失效時間點一致的元素之間的比較。
  3. getDelay方法返回元素的剩餘有效時間,能夠根據入參的TimeUnit選擇時間的表示形式(秒、微妙、納秒等),通常選擇納秒以提升精度;
  4. compareTo方法用於比較兩個元素的大小,以便在隊列中排序。因爲DelayQueue基於優先級隊列實現,因此內部是「堆」的形式,咱們定義的規則是先失效的元素將先出隊,因此先失效元素應該在堆頂,即compareTo方法返回結果<0的元素優先出隊;

生產者-消費者

仍是以「生產者-消費者」模式來做爲DelayQueued的示例:性能

生產者

public class Producer implements Runnable {
    private final DelayQueue<Data> queue;

    public Producer(DelayQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {

            long currentTime = System.nanoTime();
            long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L);

            Data data = new Data(currentTime + validTime);
            queue.put(data);

            System.out.println(Thread.currentThread().getName() + ": put " + data);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消費者

public class Consumer implements Runnable {
    private final DelayQueue<Data> queue;

    public Consumer(DelayQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Data data = queue.take();
                System.out.println(Thread.currentThread().getName() + ": take " + data);

                Thread.yield();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

調用

public class Main {
    public static void main(String[] args) {
        DelayQueue<Data> queue = new DelayQueue<>();

        Thread c1 = new Thread(new Consumer(queue), "consumer-1");
        Thread p1 = new Thread(new Producer(queue), "producer-1");

        c1.start();
        p1.start();
    }
}

執行結果:

producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=true
consumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=true
consumer-1: take Data{time=73265083111776, seqno=5}, isValid=false

上面示例中,咱們建立了一個生產者,一個消費者,生產者不斷得入隊元素,每一個元素都會有個截止有效期;消費者不斷得從隊列者獲取元素。從輸出能夠看出,消費者每次獲取到的元素都是有效期最小的,且都是已經失效了的。(由於DelayQueue每次出隊只會刪除有效期最小且已通過期的元素)

3、DelayQueue原理

介紹完了DelayQueued的基本使用,讀者應該對該阻塞隊列的功能有了基本瞭解,接下來咱們看下Doug Lea是如何實現DelayQueued的。

構造

DelayQueued提供了兩種構造器,都很是簡單:

/**
 * 默認構造器.
 */
public DelayQueue() {
}
/**
 * 從已有集合構造隊列.
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

能夠看到,內部的PriorityQueue並不是在構造時建立,而是對象建立時生成:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * leader線程是首個嘗試出隊元素(隊列不爲空)但被阻塞的線程.
     * 該線程會限時等待(隊首元素的剩餘有效時間),用於喚醒其它等待線程
     */
    private Thread leader = null;

    /**
     * 出隊線程條件隊列, 當有多個線程, 會在此條件隊列上等待.
     */
    private final Condition available = lock.newCondition();

    //...

}

上述比較特殊的是leader字段,咱們以前已經說過,DelayQueue每次只會出隊一個過時的元素,若是隊首元素沒有過時,就會阻塞出隊線程,讓線程在available這個條件隊列上無限等待。

爲了提高性能,DelayQueue並不會讓全部出隊線程都無限等待,而是用leader保存了第一個嘗試出隊的線程,該線程的等待時間是隊首元素的剩餘有效期。這樣,一旦leader線程被喚醒(此時隊首元素也失效了),就能夠出隊成功,而後喚醒一個其它在available條件隊列上等待的線程。以後,會重複上一步,新喚醒的線程可能取代成爲新的leader線程。這樣,就避免了無效的等待,提高了性能。這實際上是一種名爲「Leader-Follower pattern」的多線程設計模式。


入隊——put

put方法沒有什麼特別,因爲是無界隊列,因此也不會阻塞線程。

/**
 * 入隊一個指定元素e.
 * 因爲是無界隊列, 因此該方法並不會阻塞線程.
 */
public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 調用PriorityQueue的offer方法
        if (q.peek() == e) {    // 若是入隊元素在隊首, 則喚醒一個出隊線程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
須要注意的是當首次入隊元素時,須要喚醒一個出隊線程,由於此時可能已有出隊線程在空隊列上等待了,若是不喚醒,會致使出隊線程永遠沒法執行。
if (q.peek() == e) {    // 若是入隊元素在隊首, 則喚醒一個出隊線程
    leader = null;
    available.signal();
}

出隊——take

整個take方法在一個自旋中完成,其實就分爲兩種狀況:

1.隊列爲空

這種狀況直接阻塞出隊線程。(在available條件隊列等待)

2.隊列非空

隊列非空時,還要看隊首元素的狀態(有效期),若是隊首元素過時了,那直接出隊就好了;若是隊首元素未過時,就要看當前線程是不是第一個到達的出隊線程(即判斷leader是否爲空),若是不是,就無限等待,若是是,則限時等待。

/**
 * 隊首出隊元素.
 * 若是隊首元素(堆頂)未到期或隊列爲空, 則阻塞線程.
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            E first = q.peek();     // 讀取隊首元素
            if (first == null)      // CASE1: 隊列爲空, 直接阻塞
                available.await();
            else {                  // CASE2: 隊列非空
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)                             // CASE2.0: 隊首元素已過時
                    return q.poll();

                // 執行到此處說明隊列非空, 且隊首元素未過時
                first = null;
                if (leader != null)                         // CASE2.1: 已存在leader線程
                    available.await();      // 無限期阻塞當前線程
                else {                                      // CASE2.2: 不存在leader線程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;    // 將當前線程置爲leader線程
                    try {
                        available.awaitNanos(delay);        // 阻塞當前線程(限時等待剩餘有效時間)
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)             // 不存在leader線程, 則喚醒一個其它出隊線程
            available.signal();
        lock.unlock();
    }
}
須要注意,自旋結束後若是 leader == null && q.peek() != null,須要喚醒一個等待中的出隊線程。
leader == null && q.peek() != null的含義就是——沒有 leader線程但隊列中存在元素。咱們以前說了,leader線程做用之一就是用來喚醒其它無限等待的線程,因此必需要有這個判斷。

4、總結

DelayQueue是阻塞隊列中很是有用的一種隊列,常常被用於緩存或定時任務等的設計。

考慮一種使用場景:

異步通知的重試,在不少系統中,當用戶完成服務調用後,系統有時須要將結果異步通知到用戶的某個URI。因爲網絡等緣由,不少時候會通知失敗,這個時候就須要一種重試機制。

這時能夠用DelayQueue保存通知失敗的請求,失效時間能夠根據已通知的次數來設定(好比:2s、5s、10s、20s),這樣每次從隊列中take獲取的就是剩餘時間最短的請求,若是已重複通知次數超過必定閾值,則能夠把消息拋棄。

後面,咱們在講J.U.C之executors框架的時候,還會再次看到DelayQueue的身影。JUC線程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue就是一種延時阻塞隊列。

相關文章
相關標籤/搜索