在這個電子商務流行年代,24小時隨時隨地網購已經成爲咱們習覺得常的生活習慣。看到不錯的商品,咱們會當即下單,徹底不受時間、空間的限制,剁手,而後在家坐等收快遞,那種感受。。。。
javascript
細心的你是否發現,咱們好像不多去主動點擊‘’確認收貨」,畢竟拿到了貨,我總要先體驗幾天,誰知道它會不會壞,但時間一拖就忘了操做確認收貨。而擔保交易,若是買家不確認收貨,交易訂單沒法完結,那商家是收不到貨款的。有什麼解決辦法?css
上面示圖是淘寶APP的訂單詳情頁,左上方的自動確認時間起到了關鍵做用。也就是說,到了目標時間系統會自動觸發代替買家執行確認收貨。對上述的任務,咱們給一個專業的名字,那就是延遲任務。那麼這裏你可能會問,這個延遲任務和定時任務的區別究竟在哪裏呢?java
一、定時任務有固定的觸發時間(好比天天的凌晨2點執行),延遲任務的執行時間不固定,嚴格依賴於業務事件的觸發時間(好比:自動確認收貨是在賣家發貨那個時刻日後延15天)mysql
二、定時任務有執行週期,而延遲任務在某事件觸發後一段時間內執行,通常是一次性的,沒有執行週期git
三、定時任務通常執行的是批處理操做多個任務,而延遲任務通常是單個任務程序員
延遲任務的一些業務場景:github
一、當你下了一筆訂單後,一直沒有付款,通常超過30分鐘後,系統會自動關閉訂單並退還庫存redis
二、購買一件商品,若是你不喜歡會申請退款,當賣家超過3天未處理,系統會自動退款成功算法
三、生成訂單60秒後,自動給用戶發短信sql
延遲任務不只僅適用於電商業務,對於預先設定目標執行時間,當時間到了須要自動觸發執行的業務場景均可以參考該設計方案。下面咱們具體講一講延遲任務常見的技術實現,後面工做中你可能會用的上。。。
1、JDK 延遲隊列
經過JDK提供的DelayQueue 類來實現。DelayQueue 是一個無界阻塞隊列,支持延時獲取元素,隊列中的元素必須實現 Delayed 接口,並重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,代碼示例以下:
public class DelayQueueTest { public static void main(String[] args) { DelayQueue<DelayTask> dq = new DelayQueue<DelayTask>(); //生產者生產一個2秒的延時任務 new Thread(new ProducerDelay(dq, 2000)).start(); //開啓消費者輪詢 new Thread(new ConsumerDelay(dq)).start(); }
}
class ProducerDelay implements Runnable { DelayQueue<DelayTask> delayQueue; int delaySecond; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ProducerDelay(DelayQueue<DelayTask> delayQueue, int delaySecond) { this.delayQueue = delayQueue; this.delaySecond = delaySecond; }
@Override public void run() {
for (int i = 1; i < 6; i++) { delayQueue.add(new DelayTask(delaySecond, i + "")); System.out.println(sdf.format(new Date()) + " Thread " + Thread.currentThread() + " 添加了一個延遲任務,id=" + i); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }}
class ConsumerDelay implements Runnable { DelayQueue<DelayTask> delayQueue; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ConsumerDelay(DelayQueue<DelayTask> delayQueue) { this.delayQueue = delayQueue; }
@Override public void run() { while (true) { DelayTask delayTask = null; try { delayTask = delayQueue.take(); } catch (Exception e) { e.printStackTrace(); } //若是Delay元素存在,則任務到達超時時間 if (delayTask != null) { //處理任務 System.out.println(sdf.format(new Date()) + " Thread " + Thread.currentThread() + " 消費了一個延遲任務,id=" + delayTask.getId()); } else { try { Thread.sleep(200); } catch (InterruptedException e) { } } } }}
@Data@AllArgsConstructorclass DelayTask implements Delayed { String id;
// 延遲截止時間(單位:毫秒) long delayTime = System.currentTimeMillis();
public DelayTask(long delayTime, String id) { this.delayTime = (this.delayTime + delayTime); this.id = id; }
@Override // 獲取剩餘時間 public long getDelay(TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override // 隊列裏元素的排序依據 public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } else { return 0; } }
@Override public String toString() { return DateFormat.getDateTimeInstance().format(new Date(delayTime)); }}
點評:
一、因爲採用無界阻塞隊列,佔用本地內存,若是任務太多的話,很容易產生內存溢出(OOM)的風險;
二、另外該實現是單機版玩法,若是發生系統重啓等狀況會致使內存數據丟失,須要考慮將數據從新預熱到緩存的操做,有額外實現成本
固然有人提過使用基於調度的線程池ScheduledExecutorService來實現,裏面提三種維度的方法實現
一、schedule。單次延遲任務。
二、scheduleAtFixedRate。基於固定時間間隔進行循環延遲任務。若是上一次任務尚未結束,會等它結束後,才執行下一次任務,取間隔時間和任務執行時間的最大值。
三、scheduleWithFixedDelay。取決於每次任務執行的時間長短,是基於不固定時間間隔進行循環延遲任務,每次執行時間爲上一次任務結束起向後推一個時間間隔,即每次執行時間爲:initialDelay, initialDelay+executeTime+delay, initialDelay+2executeTime+2delay
點評:
一、上面提到的第2、三種,都是循環執行任務,區別在與執行的時間調度上有區別。不適合本文的業務場景
二、方法一,也就是schedule,屬於單次執行,且時間支持靈活計算,本文業務場景的時間=(目標執行時間-當前時間)
MyScheduledRunnable runnable = new MyScheduledRunnable();// 業務運行2秒runnable.setBizCostTime(2000L);
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();// 單次任務,延遲1秒,開始執行任務service.schedule(runnable, 1, TimeUnit.SECONDS);
2、數據庫輪詢掃描
按業務需求,咱們會建立一個超時記錄表,當業務執行時會插入一條記錄到mysql表,並指定目標執行時間。
而後會啓動一個定時任務,通常會採用 Quartz 框架來實現, 無限循環掃描該表記錄,若是發現目標執行時間小於當前時間,會提取記錄執行並修改狀態。爲何要先修改狀態呢?主要是考慮多線程併發問題,畢竟執行超時任務(如:自動確認收貨)也要花費時間,待超時任務執行結束後,再修改狀態標記爲「已完成」。
缺點:採用主動發現機制,執行時間嚴重依賴掃描頻率,若是定時任務配置的時間週期太長,那麼任務真正執行時間可能會有較大延遲。反之,若是掃描週期時間過短,掃描頻率過快,數據庫的壓力會比較大,還存在較大的系統資源浪費。
若是表的數據量過大,每次掃描任務負擔會很重,咱們會考慮採用分庫分表機制,每張物理表都有獨立的掃描線程,提升處理速度。另外,當任務已經執行完成,該記錄基本沒什麼業務價值,會有歸檔任務,對歷史數據按期清理。
優勢: 實現簡單、無技術難點、異常恢復、支持分佈式/集羣環境
3、Redis 有序集合實現延遲任務
Redis提供了豐富的數據存儲結構,其中Zset支持按score對value值排序,這裏的score能夠採用超時記錄的目標執行時間。也就是說集合列表中的記錄是按執行時間排好序,咱們只須要取小於當前時間的便可。
爲了不一次拉取的記錄過多,致使程序處理壓力過大,在調用 redisTemplate.opsForZSet().rangeByScoreWithScores(key, 0, max, 0, count); 通常咱們會限制拉取的條數,好比一次只拉取最小的50條,下降單次處理的RT時長。
public void test() throws InterruptedException {
//清理數據 cacheService.delKey(keyPrefix);
// 模擬插入10條超時記錄 for (int i = 1; i <= 10; i++) { long delayTime = Instant.now().plusSeconds(i + 4).getEpochSecond(); boolean result = cacheService.addData(keyPrefix, "v" + i, delayTime); if (result) { System.out.println("記錄:" + i + " 插入成功!"); } }
// 啓動延遲隊列掃描 while (true) { long nowtTime = Instant.now().getEpochSecond(); // 一次掃描出小於當前時間且按時間排序的最小兩條記錄 List<String> result = cacheService.scanData(keyPrefix, nowtTime, 3); if (result != null) { for (String record : result) { // 對ZREM的返回值進行判斷,只有大於0的時候,才消費數據 // 防止多個線程消費同一個資源的狀況 long affectRow = cacheService.removeData(keyPrefix, record); if (affectRow > 0) { // 模擬業務處理 System.out.println("處理超時記錄:" + record); } } } Thread.sleep(800); }}
優勢: 解耦、異常恢復、支持分佈式/集羣環境;
4、pulsa 消息實現延遲任務
當前公司使用 pulsa 消息中間件,咱們來看下如何藉助現成的消息框架來實現延遲任務。
當producer發出一個延遲消息,訂閱方並不會當即收到消息,消息存儲在BookKeeper中,DelayedDeliveryTracker將時間索引(time-> messageId)保存在內存中,一旦延遲時間到了,消息會被髮布到一臺broker,而後傳遞給訂閱者。
延遲消息傳遞僅在共享訂閱模式下有效。在「獨佔」和「故障轉移」訂閱模式下,延遲的消息會當即分派。
代碼示例:
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();
更多仍是藉助於pulsa消息框架自己機制來實現功能,你會發現調用的API很是簡單。
5、ActiveMQ 消息實現延遲任務
ActiveMQ做爲一個開箱即用的中間件,提供了擴展配置屬性支持延遲消息。
示例1:延遲60秒發送消息
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");long time = 60 * 1000;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);producer.send(message);
示例2:開始延遲30秒發送,重複發送10次,每次之間間隔10秒
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");long delay = 30 * 1000;long period = 10 * 1000;int repeat = 9;delay); period); repeat); producer.send(message);
示例3:使用Cron 表示式定時發送消息
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");producer.send(message);
6、Netty 實現延遲任務
因爲netty動輒管理10w+的鏈接,每個鏈接都會有不少超時任務。好比發送超時、心跳檢測間隔等,若是每個定時任務都啓動一個Timer,不只低效,並且會消耗大量的資源。
時間輪是一種高效來利用線程資源來進行批量化調度的一種調度模型。把大批量的調度任務所有都綁定到同一個的調度器上面,使用這一個調度器來進行全部任務的管理(manager),觸發(trigger)以及運行(runnable)。可以高效的管理各類延時任務,週期任務,通知任務等等。
缺點,時間輪調度器的時間精度可能不是很高,對於精度要求特別高的調度任務可能不太適合。由於時間輪算法的精度取決於,時間段「指針」單元的最小粒度大小,好比時間輪的格子是一秒跳一次,那麼調度精度小於一秒的任務就沒法被時間輪所調度。並且時間輪算法沒有作宕機備份,所以沒法再宕機以後恢復任務從新調度。
// 初始化netty時間輪HashedWheelTimer timer = new HashedWheelTimer(1, // 時間間隔 TimeUnit.SECONDS, 10); // 時間輪中的槽數
TimerTask task1 = new TimerTask() { public void run(Timeout timeout) throws Exception { System.out.println("已通過了" + costTime() + " 秒,task1 開始執行"); }};
TimerTask task2 = new TimerTask() { public void run(Timeout timeout) throws Exception { System.out.println("已通過了" + costTime() + " 秒,task2 開始執行"); }};
TimerTask task3 = new TimerTask() { public void run(Timeout timeout) throws Exception { System.out.println("已通過了" + costTime() + " 秒,task3 開始執行"); }};
// 將任務添加到延遲隊列timer.newTimeout(task1, 0, TimeUnit.SECONDS);timer.newTimeout(task2, 3, TimeUnit.SECONDS);timer.newTimeout(task3, 15, TimeUnit.SECONDS);
寫在最後
附上本文示例代碼:
https://github.com/aalansehaiyang/project-example
往期推薦
本文分享自微信公衆號 - 後端技術漫談(Rude3Knife)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。