DelayQueue 類關係圖
- 從類關係圖譜上看,本質上具備集合、隊列、阻塞阻塞隊列、延遲等特性
應用場景:
- 延遲隊列(相似RocketMQ中提供的機制)
- 定時任務(定時觸發某個任務)
核心原理:
- 隊列中的元素按到期時間排好序;
- 假設存在3個消費者線程
- 線程1經過爭搶成爲了leader
- 線程1查看隊列頭部元素
- 發現須要2s後到期,則進入睡眠狀態2s後喚醒
- 此時線程二、3處於待命狀態,不會作任何事情
- 線程1喚醒後,拿到對象1後,向線程二、3發送signal
- 線程二、3收到信號後,爭搶leader
- 此處假設線程2搶到leader
- 線程2查看對象2狀態,休眠3s後喚醒
- 後續邏輯與線程1邏輯類同
- 線程2被喚醒後,線程3成爲leader進入等待狀態
- 此時,若線程1已處理完畢,則繼續處於待命狀態
- 若線程1未處理完畢,則繼續處理
- 一種很差的狀況,3個線程因處理時間較長,目前都在處理中狀態;
- 此時對象4快要到期了,沒有消費者線程空下來消費
- 此時對象4的處理會延期
- 若是元素進入隊列很快、且元素間到期時間相對集中,而且元素處理時間較長時,可能形成隊列元素堆積狀況
- 還有一種特殊狀況,若目前處於左圖現狀
- 隊列中的頭元素忽然發生變化
- 由於leader是取頭元素的,此時的leader將沒有意義
- 則將把當前leader = null
- 此時可能喚醒線程二、3中的某一個成爲新的leader
- 新的leader將從新查看當前隊列中最新的頭元素
- 再後面的邏輯與上述一致;
核心方法offer()
核心方法take()
offfer() ->插入元素到隊列中
peek() -> 窺視 查看
await() -> 待命
awaitNanos - > 等待
signal() -> 發出信號
poll() -> 從隊列中彈出頭部元素
lockInterruptibly() ->加了一把可中斷鎖
延遲隊列實現代碼
/**
* @author qinchen
* @date 2021/6/17 14:27
* @description 延遲隊列數據對象
*/
public class Order implements Delayed {
/**
* 延遲時間
*/
private Long delayTime;
private String name;
public Order(Long delayTime, String name) {
this.delayTime = System.currentTimeMillis() + delayTime;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
Long t = this.delayTime - order.delayTime;
if( t > 0) {
return 1;
}
if( t < 0) {
return -1;
}
return 0;
}
public String getName() {
return name;
}
}
public class OrderConsumer implements Runnable{
private DelayQueue<Order> queue;
public OrderConsumer(DelayQueue<Order> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Order take = queue.take();
System.out.println("消費的訂單名稱:" + take.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Order order1 = new Order(5000L, "Order1");
Order order2 = new Order(12000L, "Order2");
Order order3 = new Order(3000L, "Order3");
DelayQueue<Order> queue = new DelayQueue<>();
queue.offer(order1);
queue.offer(order2);
queue.offer(order3);
ExecutorService exec = new ThreadPoolExecutor(4, 8,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
exec.execute(new OrderConsumer(queue));
exec.shutdown();