原文發表於簡書DelayQueue之通用組件,本次將作部分優化調整。bash
在咱們產品中有這麼一個場景,在醫生關閉問診的3min後,患者將沒法繼續和醫生進行對話。我根據對業務的理解,和對技術實現成本的衡量,決定經過DelayQueue的方式來實現。app
關於DelayQueue的相關內容介紹和核心源碼解析已在上一篇DelayQueue系列(一):源碼分析說明了。異步
根據個人經驗,我認爲在生活中有以下場景能夠用獲得DelayQueue:
1.下單後一段時間(業內基本上都是30分鐘)內不付款,就自動取消訂單。
2.提交打車申請後,一段時間內(好比說30秒)沒有附近的司機接單,就自動將該請求發送給更多距離更遠的司機。ide
這類場景都有以下特色:
1.須要有一段時間的延遲,若是僅僅是爲了異步執行,那麼消息隊列顯然是是更優的選擇。
2.對執行時間的精確度有必定要求,固然異常情況下,也能夠對精確度適當放寬鬆。好比場景1的訂單取消,規則設置爲30分鐘不支付就取消,但實際場景中,精確到30分天然是最好結果,但假如出現故障,那麼在可容許的範圍內將訂單取消也是能夠接受的(好比說將取消時間在放寬到32分鐘)。
3.執行是高頻率的。這點須要和第2點結合起來看,若是僅僅是爲了低頻率的定時執行,我的認爲任務調度多是更優的選擇。函數
綜合來看,若是不須要延遲執行,那麼推薦用消息隊列;若是對執行時間的精確度不那麼在乎或執行頻率並不高,那麼推薦使用任務調度;若是須要延遲執行,且執行比較高頻,對執行時間的精確度有必定要求,能夠考慮使用延遲隊列。 以上這些是咱們爲什麼採用DelayQueue來實現這個業務場景的緣由。源碼分析
爲了方便使用DelayQueue,我封裝了組件對DelayQueue進行了擴展。
首先我定義了一個類TaskMessage,對Delayed進行了擴展,實現了compareTo和getDelay方法。 以下是TaskMessage類的核心代碼。post
public class TaskMessage implements Delayed {
private String body; //消息內容
private long executeTime;//執行時間
private Function function;//具體執行方式
public TaskMessage(Long delayTime,String body, Function function) {
this.body = body;
this.function = function;
this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
@Override
public int compareTo(Delayed delayed) {
TaskMessage msg = (TaskMessage) delayed;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
複製代碼
外部調用只須要TaskMessage m1 = new TaskMessage(delayTime, body, function)就能夠生成一個延遲任務的元素了,內部自動就根據延遲時間計算出這個延遲任務元素的預期執行時間。優化
Function是1.8版引入的函數式接口,主要方法是R apply(T t),功能是將Function對象應用到輸入的參數上,而後返回計算結果。 那麼達到延遲任務的預期執行時間時,只須要調用一下function.apply()方法就能夠了,不須要關心apply的具體實現。apply的具體實現方法是在調用時才明確的。ui
而後定義一個延遲任務的執行線程類TaskConsumer,實現了Runnable,重寫了run方法。由於延遲任務的執行,必然是須要從新起線程去執行的,不能阻礙主線程的操做。this
以下是TaskConsumer類的核心代碼。
public class TaskConsumer implements Runnable {
// 延時隊列
private DelayQueue<TaskMessage> queue;
//用於標記處理任務線程的id
private int threadId;
//信號量
private Boolean signal;
TaskConsumer(DelayQueue<TaskMessage> queue, int threadId) {
this.queue = queue;
this.threadId = threadId;
this.signal = Boolean.TRUE;
}
void finish() {
this.signal = Boolean.FALSE;
}
@Override
public void run() {
while (signal) {
try {
TaskMessage take = queue.take();
if (logger.isInfoEnabled()) {
logger.info("處理線程的id爲" + threadId + ",消費消息內容爲:" + take.getBody() + ",預計執行時間爲" +
DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
}
take.getFunction().apply(take.getBody());
} catch (InterruptedException e) {
if (logger.isInfoEnabled()) {
logger.info("id爲" + threadId + "的處理線程被強制中斷");
}
} catch (Exception e) {
logger.error("taskConsumer error", e);
}
}
if (logger.isInfoEnabled()) {
logger.info("id爲" + threadId + "的處理線程已中止");
}
}
}
複製代碼
這個類核心代碼就只有以下兩行。
TaskMessage take = queue.take();
獲取延遲隊列的隊首元素。前文已經解釋過,Queue的take方法會返回隊列的隊首元素,不然就會掛起線程。因此只要有返回值,必然就能獲取到當前須要執行的TaskMessage元素。
take.getFunction().apply(take.getBody());
執行延遲任務元素的apply方法。applay方法是在定義TaskMessage的時候肯定的,代表了到達預期執行時間所須要進行的一系列操做,那麼此時只須要執行對應的apply方法就能夠了。
最後是加載TaskConsumer的統一管理類TaskManager。
以下是TaskManager類的核心代碼。
public class TaskManager implements InitializingBean,DisposableBean{
@Override
public void afterPropertiesSet() {
for (int i = 0; i < threadCount; i++) {
TaskConsumer taskConsumer = new TaskConsumer(queue, i);
taskConsumerList.add(taskConsumer);
Thread thread = new Thread(taskConsumer);
threadList.add(thread);
thread.start();
}
}
@Override
public void destroy() {
for (int i = 0; i < threadList.size(); i++) {
threadList.get(i).interrupt();
taskConsumerList.get(i).finish();
}
}
}
複製代碼
這個類的做用在於初始化類後,就啓動線程不斷的去獲取延遲任務。而後在銷燬類後,先中斷消費者線程,而後設置信號量使得消費者線程的run方法能跳出死循環,從而使得消費線程正常結束。
最後是如何調用的示例。很簡單,就只有兩步:
一、生成延遲任務元素taskMessage
二、將taskMessage添加到延遲隊列中
TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);
複製代碼
ok,以上是如何擴展DelayQueue的功能構形成高可用的組件的方案,歡迎你們來一塊兒討論。
下一章我準備講一下咱們項目中運用DelayQueue的過程當中碰到的問題以及如何持久化的方案。