無界阻塞延遲隊列DelayQueue基本原理與使用

DelayQueue 類關係圖

  • 從類關係圖譜上看,本質上具備集合、隊列、阻塞阻塞隊列、延遲等特性

應用場景:

  • 延遲隊列(相似RocketMQ中提供的機制)
  • 定時任務(定時觸發某個任務)

核心原理:

  • 初始狀態

  1. 隊列中的元素按到期時間排好序;
  2. 假設存在3個消費者線程
  3. 線程1經過爭搶成爲了leader
  4. 線程1查看隊列頭部元素
  5. 發現須要2s後到期,則進入睡眠狀態2s後喚醒
  6. 此時線程二、3處於待命狀態,不會作任何事情
  7. 線程1喚醒後,拿到對象1後,向線程二、3發送signal
  8. 線程二、3收到信號後,爭搶leader
  • 進一步狀態

  1. 此處假設線程2搶到leader
  2. 線程2查看對象2狀態,休眠3s後喚醒
  3. 後續邏輯與線程1邏輯類同
  4. 線程2被喚醒後,線程3成爲leader進入等待狀態
  5. 此時,若線程1已處理完畢,則繼續處於待命狀態
  6. 若線程1未處理完畢,則繼續處理
  • 不良狀態

  1. 一種很差的狀況,3個線程因處理時間較長,目前都在處理中狀態;
  2. 此時對象4快要到期了,沒有消費者線程空下來消費
  3. 此時對象4的處理會延期
  4. 若是元素進入隊列很快、且元素間到期時間相對集中,而且元素處理時間較長時,可能形成隊列元素堆積狀況
  • 特殊狀態

  1. 還有一種特殊狀況,若目前處於左圖現狀
  2. 隊列中的頭元素忽然發生變化
  3. 由於leader是取頭元素的,此時的leader將沒有意義
  4. 則將把當前leader = null
  5. 此時可能喚醒線程二、3中的某一個成爲新的leader
  6. 新的leader將從新查看當前隊列中最新的頭元素
  7. 再後面的邏輯與上述一致;

核心方法offer()

核心方法take()

  • 重要方法解釋
offfer() ->插入元素到隊列中
peek() -> 窺視 查看
await() -> 待命
awaitNanos - > 等待
signal() -> 發出信號
poll() -> 從隊列中彈出頭部元素
lockInterruptibly() ->加了一把可中斷鎖

延遲隊列實現代碼

/**
 * @author qinchen
 * @date 2021/6/17 14:27
 * @description 延遲隊列數據對象
 */
public class Order implements Delayed {

    /**
     * 延遲時間
     */
    private Long delayTime;

    private String name;

    public Order(Long delayTime, String name) {
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {

        Order order = (Order) o;
        Long t = this.delayTime - order.delayTime;

        if( t > 0) {
            return 1;
        }

        if( t < 0) {
            return -1;
        }

        return 0;
    }

    public String getName() {
        return name;
    }
}
public class OrderConsumer implements Runnable{

    private DelayQueue<Order> queue;

    public OrderConsumer(DelayQueue<Order> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            try {
                Order take = queue.take();
                System.out.println("消費的訂單名稱:" + take.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
Order order1 = new Order(5000L, "Order1");
Order order2 = new Order(12000L, "Order2");
Order order3 = new Order(3000L, "Order3");

DelayQueue<Order> queue = new DelayQueue<>();

queue.offer(order1);
queue.offer(order2);
queue.offer(order3);

ExecutorService exec = new ThreadPoolExecutor(4, 8,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
exec.execute(new OrderConsumer(queue));
exec.shutdown();
相關文章
相關標籤/搜索