最近項目中有個業務,須要對用戶新增任務到期後進行業務處理。使用定時任務定時掃描過時時間,浪費資源,且不實時。只能使用延時隊列處理。java
第一想到的是java自帶的延時隊列delayqueue。算法
首先實現一個Delyed類。數據庫
實現兩個最重要方法。第一個是隊列裏面的消息排序。DelayQueue底層使用的是阻塞隊列。隊列的消費端會去take隊列的頭部元素,沒有元素就阻塞在那裏。所以,延遲隊列中的元素必須按執行時間順序排列。數據結構
@Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; }
第二個方法是剩餘時間延遲時間。每加入一個元素時將延遲時間傳入,獲得一個預期執行時間。每當執行此方法的時候,使用預期時間減去當前時間,即時剩餘延遲時間。換句話說,還有多長時間執行。爲0時當即執行。框架
@Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); }
所有代碼:dom
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Message implements Delayed{ private Integer id; private String content; private long delay;//延遲時間 private long exceptTime;//執行時間 public Message() {} public Message(Integer id, String content, long delay) { this.id = id; this.content = content; this.delay = delay; this.exceptTime = System.nanoTime() + delay; } @Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; } @Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public long getDelay() { return delay; } public void setDelay(long delay) { this.delay = delay; } public long getExceptTime() { return exceptTime; } public void setExceptTime(long exceptTime) { this.exceptTime = exceptTime; } }
而後初始化一個DelayQueue,加入任務。並建立一個線程異步執行。異步
DelayQueue<Message> delayqueue = new DelayQueue<>(); Random random = new Random(); for (int i = 0; i < 10; i++) { Message message = new Message(i, "content" + i, random.nextInt(1000000)); delayqueue.add(message); } new Thread(new Runnable() { @Override public void run() { while (true) { Message message; try { message = delayqueue.take(); System.out.println("message = " + message.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start();
缺陷分佈式
1.畢竟是jdk級別的,不可能作過多的封裝。不少API並非那麼好直接使用。好比直接傳入一個延遲時間是並不能自動實現的,須要手動封裝。ide
2.DelayQueue並無長度限制。有內存佔用的風險。post
3.效率,穩定性方面,在DelayQueue自己確定是沒有問題的,可是在項目中使用,勢必須要作一些封裝,直接上生產環境內心並無底。
netty畢竟是一個大名鼎鼎的框架,普遍使用於業界。它有許多心跳檢測等定時任務,使用延時隊列來實現。HashedWheelTimer底層數據結構依然是使用DelayedQueue。加上一種叫作時間輪的算法來實現。
關於時間輪算法,有點相似於HashMap。在new 一個HashedWheelTimer實例的時候,能夠傳入幾個參數。
第一,一個時間長度,這個時間長度跟具體任務什麼時候執行沒有關係,可是跟執行精度有關。這個時間能夠看做手錶的指針循環一圈的長度。
而後第二,刻度數。這個能夠看做手錶的刻度。好比第一個參數爲24小時,刻度數爲12,那麼每個刻度表示2小時。時間精度只能到兩小時。時間長度/刻度數值越大,精度越大。
而後添加一個任務的時候,根據hash算法獲得hash值並對刻度數求模獲得一個下標,這個下標就是刻度的位置。
然而有一些任務的執行週期超過了第一個參數,好比超過了24小時,就會獲得一個圈數round。
簡點說,添加一個任務時會根據任務獲得一個hash值,並根據時間輪長度和刻度獲得一個商值round和模index,好比時間長度24小時,刻度爲12,延遲時間爲32小時,那麼round=1,index=8。時間輪從開啓之時起每24/12個時間走一個指針,即index+1,第一圈round=0。當走到第7個指針時,此時index=7,此時剛纔的任務並不能執行,由於剛纔的任務round=1,必需要等到下一輪index=7的時候才能執行。
如圖所示
對於Delayed兩個重要實現方法,第一排序,實際上是經過hash求商和模決定放入哪一個位置。這些位置自己就已經按照時間順序排序了。第二,延遲時間,已經被封裝好了,傳入一個延遲的時間就行了。
代碼實例:
獲得一個延遲隊列實例
HashedWheelTimer timer = new HashedWheelTimer(24, //時間輪一圈的長度 TimeUnit.SECONDS, 12);//時間輪的度刻
建立一個任務
TimerTask task = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("任務執行"); } };
將任務加入延遲隊列
timer.newTimeout(task, 1000, TimeUnit.SECONDS);
以上兩種方案都沒有實現持久化和分佈式。持久化能夠藉助數據庫來達到。分佈式的話仍是使用消息中間件吧。RabbitMq據說已經能夠藉助某些參數實現。