延時隊列 DelayQueue
搞定單機版的超時訂單
那麼何時須要用延時隊列呢?常見的延時任務場景 舉栗子:java
🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱git
DelayQueue
,萬祖歸宗,萬法同源,學會了最基礎的Queue
,就不愁其餘的了延時隊列,首先,它是一種隊列,隊列意味着內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另外一端取出。github
其次,延時隊列,最重要的特性就體如今它的延時屬性上,跟普通的隊列不同的是,普通隊列中的元素老是等着但願被早點取出處理,而延時隊列中的元素則是但願被在指定時間獲得取出和處理,因此延時隊列中的元素是都是帶時間屬性的,一般來講是須要被處理的消息或者任務。數據庫
一言以蔽之曰 : 延時隊列就是用來存放須要在指定時間被處理的元素的隊列。編程
1) DelayQueue 是誰,上族譜 數組
DelayQueue
這一代已經第五代傳人了,
要知道 DelayQueue
自幼生在八戒家,長大就往外面拉,熊熊烈火它不怕,水是水來渣是渣。緩存
不過它真的是文韜武略,有一把ReentrantLock
就是它的九齒釘耙,抗的死死の捍衛着本身的PriorityQueue
.安全
有典故曰:服務器
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 用於控制併發的 可重入 全局 鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 根據Delay時間排序的 無界的 優先級隊列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用於優化阻塞通知的線程元素leader,標記當前是否有線程在排隊(僅用於取元素時)
private Thread leader = null;
// 條件,用於阻塞和通知的Condition對象,表示如今是否有可取的元素
private final Condition available = lock.newCondition();
/** * 省洛方法代碼..... 大家懂個人省洛嗎? */
複製代碼
鎖,隊列,狀態(條件)
Condition
進行條件的判斷-->進行線程之間的通訊和喚起PriorityQueue
做爲一個容器,容器裏面的元素都應該實現Delayed
接口,在每次往優先級隊列中添加元素時以元素的過時時間做爲排序條件,最早過時的元素放在優先級最高。DelayQueue
是一個沒有大小限制的隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。2) 優先級隊列 PriorityQueue多線程
由於咱們的DelayQueue
裏面維護了一個優先級的隊列PriorityQueue
簡單的看下:
//默認容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//存儲元素的地方 數組
transient Object[] queue; // non-private to simplify nested class access
//元素個數
private int size = 0;
//比較器
private final Comparator<? super E> comparator;
複製代碼
3) DelayQueue的方法簡介
leader
置爲空,並喚醒等待在條件available
上的線程;public boolean add(E e) { return offer(e);}
public void put(E e) { offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e);}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //加鎖 由於優先隊列線程不安全
try {
q.offer(e); //判斷優先級 進行入隊
if (q.peek() == e) { //-----[1]
//leader記錄了被阻塞在等待隊列頭生效的線程 新增一個元素到隊列頭,
//表示等待原來隊列頭生效的阻塞的線程已經失去了阻塞的意義
//,此時須要獲取新的隊列頭進行返回了
leader = null;
//獲取隊列頭的線程被喚起,主要有兩種場景:
//1. 以前隊列爲空,致使被阻塞的線程
//2. 以前隊列非空,可是隊列頭沒有生效(到期)致使被阻塞的線程
available.signal();
}
return true; //由於是無界隊列 因此添加元素確定成功 直到OOM
} finally {
lock.unlock(); //釋放鎖
}
}
複製代碼
offer()
方法,首先獲取獨佔鎖,而後添加元素到優先級隊列,因爲q是優先級隊列,因此添加元素後,peek並不必定是當前添加的元素,若是[1]爲true,說明當前元素e的優先級最小也就即將過時的,這時候激活avaliable變量條件隊列裏面的線程,通知他們隊列裏面有元素了。
請看我詳細的註釋,毫不是走馬觀花
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //獲取鎖
lock.lockInterruptibly(); //可中斷鎖 併發類裏面凡是調用了await的方法獲取鎖時候都是使用的lockInterruptibly方法而不是Lock.
//也是一種fail-fast思想吧,await()方法會在中斷標誌設置後拋出InterruptedException異常後退出 不至於死死的等待
try {
for (;;) {//會寫死循環的都是高手
E first = q.peek();//get隊頭元素
if (first == null)
// 隊列頭爲空,則阻塞,直到新增一個入隊爲止(1)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);//獲取剩餘時間
if (delay <= 0)
// 若隊列頭元素已生效,則直接返回(2)
return q.poll();
first = null; // don't retain ref while waiting 等待的時候不能引用,表示釋放當前引用的(3)
if (leader != null)
// leader 非空時,表示有其餘的一個線程在出隊阻塞中 (4.1)
// 此時掛住當前線程,等待另外一個線程出隊完成
available.await();
else {
//標識當前線程處於等待隊列頭生效的阻塞中 (4.2.1)
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待隊列頭元素生效(4.2.2)
available.awaitNanos(delay);
} finally {
//最終釋放當前的線程 設置leader爲null (4.2.3)
if (leader == thisThread)
leader = null;
}
}
}
} //(5)
} finally {
if (leader == null && q.peek() != null)
// 當前線程出隊完成,通知其餘出隊阻塞的線程繼續執行(6)
available.signal();
lock.unlock();//解鎖結束
}
}
複製代碼
那麼,下面的結論肉眼可見:
leader = thisThread
)leader=null
)4) Leader/Follower模式
5)Delayed
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
複製代碼
據情報顯示:Delayed
是一個繼承自Delayed
的接口,而且定義了一個Delayed
方法,用於表示還有多少時間到期,到期了應返回小於等於0的數值。
很簡答就是定義了一個,一個哈,一個表延遲的接口,就是個規範接口,目的就是騙咱們去實現它的方法.哼~
說了那麼多廢話,讓我想起了那句名言:一切沒有代碼實操的講解都是耍流氓
至今深深的烙在我心中,因此我必定要實戰給大家看,顯得我不是流氓...
實戰以 訂單下單後三十分鐘內未支付則自動取消 爲業務場景
該場景的代碼邏輯分析以下:
那麼咱們寫代碼必定要通用,先來寫個通用的Delayed
通用...嗯! 泛型的
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/** * @author LiJing * @ClassName: ItemDelayed * @Description: 數據延遲實現實例 用以包裝具體的實例轉型 * @date 2019/9/16 15:53 */
@Setter
@Getter
public class ItemDelayed<T> implements Delayed {
/**默認延遲30分鐘*/
private final static long DELAY = 30 * 60 * 1000L;
/**數據id*/
private Long dataId;
/**開始時間*/
private long startTime;
/**到期時間*/
private long expire;
/**建立時間*/
private Date now;
/**泛型data*/
private T data;
public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + (secondsDelay * 1000);
this.now = new Date();
}
public ItemDelayed(Long dataId, long startTime) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + DELAY;
this.now = new Date();
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
複製代碼
public interface DelayOrder<T> {
/** * 添加延遲對象到延時隊列 * * @param itemDelayed 延遲對象 * @return boolean */
boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);
/** * 根據對象添加到指定延時隊列 * * @param data 數據對象 * @return boolean */
boolean addToDelayQueue(T data);
/** * 移除指定的延遲對象從延時隊列中 * * @param data */
void removeToOrderDelayQueue(T data);
}
複製代碼
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {
@Autowired
private OrderService orderService;
@Autowired
private ExecutorService delayOrderExecutor;
private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();
/** * 初始化時加載數據庫中需處理超時的訂單 * 系統啓動:掃描數據庫中未支付(要在更新時:加上已支付就不用更新了),未過時的的訂單 */
@PostConstruct
public void init() {
log.info("系統啓動:掃描數據庫中未支付,未過時的的訂單");
List<Order> orderList = orderService.selectFutureOverTimeOrder();
for (Order order : orderList) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
this.addToOrderDelayQueue(orderDelayed);
}
log.info("系統啓動:掃描數據庫中未支付的訂單,總共掃描了" + orderList.size() + "個訂單,推入檢查隊列,準備到期檢查...");
/*啓動一個線程,去取延遲訂單*/
delayOrderExecutor.execute(() -> {
log.info("啓動處理的訂單線程:" + Thread.currentThread().getName());
ItemDelayed<Order> orderDelayed;
while (true) {
try {
orderDelayed = DELAY_QUEUE.take();
//處理超時訂單
orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
} catch (Exception e) {
log.error("執行自營超時訂單的_延遲隊列_異常:" + e);
}
}
});
}
/** * 加入延遲消息隊列 **/
@Override
public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
return DELAY_QUEUE.add(orderDelayed);
}
/** * 加入延遲消息隊列 **/
@Override
public boolean addToDelayQueue(Order order) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
return DELAY_QUEUE.add(orderDelayed);
}
/** * 從延遲隊列中移除 主動取消就主動從隊列中取出 **/
@Override
public void removeToOrderDelayQueue(Order order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayed<Order> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
DELAY_QUEUE.remove(queue);
}
}
}
}
複製代碼
解釋一番上面的寫的東東
delayOrderExecutor
是注入的一個專門處理出隊的一個線程@PostConstruct
是啥呢,是在容器啓動後只進行一次初始化動做的一個註解,至關實用- 啓動後呢,咱們去數據庫掃描一遍,防止有漏網之魚,由於單機版嗎,隊列的數據是在內存中的,重啓後確定原先的數據會丟失,因此爲保證服務質量,咱們可能會錄音.....因此爲保證重啓後數據的恢復,咱們須要從新掃描數據庫把未支付的數據從新裝載到內存的隊列中
- 接下來就是用這個線程去一直不停的訪問隊列的
take()
方法,當隊列無數據就一直阻塞,或者數據沒到期繼續阻塞着,直到到期出隊,而後獲取訂單的信息,去處理訂單的更新操做
id
到延時實例中,這樣縮減隊列單個實例內存存儲那今日份的講解就到此結束,具體的代碼請移步個人gitHub的mybot項目Master分支查閱,fork體驗一把,或者評論區留言探討,寫的很差,請多多指教~~