使用netty HashedWheelTimer構建簡單延遲隊列

背景

最近項目中有個業務,須要對用戶新增任務到期後進行業務處理。使用定時任務定時掃描過時時間,浪費資源,且不實時。只能使用延時隊列處理。java

 

DelayQueue

第一想到的是java自帶的延時隊列delayqueue。算法

首先實現一個Delyed類。數據庫

實現兩個最重要方法。第一個是隊列裏面的消息排序。DelayQueue底層使用的是阻塞隊列。隊列的消費端會去take隊列的頭部元素,沒有元素就阻塞在那裏。所以,延遲隊列中的元素必須按執行時間順序排列。數據結構

@Override
    public int compareTo(Delayed delayed) {
        Message message = (Message) delayed;
        return this.exceptTime > message.getExceptTime() ? 1 : 0;
    }

第二個方法是剩餘時間延遲時間。每加入一個元素時將延遲時間傳入,獲得一個預期執行時間。每當執行此方法的時候,使用預期時間減去當前時間,即時剩餘延遲時間。換句話說,還有多長時間執行。爲0時當即執行。框架

@Override
    public long getDelay(TimeUnit unit) {
        System.out.println(exceptTime - System.nanoTime());
        return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS);
    }

所有代碼:dom

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Message implements Delayed{
    
    private Integer id;
    private String content;
    private long delay;//延遲時間
    private long exceptTime;//執行時間
    
    public Message() {}
    
    public Message(Integer id, String content, long delay) {
        this.id = id;
        this.content = content;
        this.delay = delay;
        this.exceptTime = System.nanoTime() + delay;
    }

    @Override
    public int compareTo(Delayed delayed) {
        Message message = (Message) delayed;
        return this.exceptTime > message.getExceptTime() ? 1 : 0;
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        System.out.println(exceptTime - System.nanoTime());
        return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS);
    }


    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public long getDelay() {
        return delay;
    }

    public void setDelay(long delay) {
        this.delay = delay;
    }

    public long getExceptTime() {
        return exceptTime;
    }

    public void setExceptTime(long exceptTime) {
        this.exceptTime = exceptTime;
    }


}
View Code

而後初始化一個DelayQueue,加入任務。並建立一個線程異步執行。異步

        DelayQueue<Message> delayqueue = new DelayQueue<>();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            Message message = new Message(i, "content" + i, random.nextInt(1000000));
            delayqueue.add(message);
        }
        
 

        new Thread(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    Message message;
                    try {
                        message = delayqueue.take();
                        System.out.println("message = " + message.getId());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

    
View Code

 

缺陷分佈式

1.畢竟是jdk級別的,不可能作過多的封裝。不少API並非那麼好直接使用。好比直接傳入一個延遲時間是並不能自動實現的,須要手動封裝。ide

2.DelayQueue並無長度限制。有內存佔用的風險。post

3.效率,穩定性方面,在DelayQueue自己確定是沒有問題的,可是在項目中使用,勢必須要作一些封裝,直接上生產環境內心並無底。

 

HashedWheelTimer

netty畢竟是一個大名鼎鼎的框架,普遍使用於業界。它有許多心跳檢測等定時任務,使用延時隊列來實現。HashedWheelTimer底層數據結構依然是使用DelayedQueue。加上一種叫作時間輪的算法來實現。

關於時間輪算法,有點相似於HashMap。在new 一個HashedWheelTimer實例的時候,能夠傳入幾個參數。

第一,一個時間長度,這個時間長度跟具體任務什麼時候執行沒有關係,可是跟執行精度有關。這個時間能夠看做手錶的指針循環一圈的長度。

而後第二,刻度數。這個能夠看做手錶的刻度。好比第一個參數爲24小時,刻度數爲12,那麼每個刻度表示2小時。時間精度只能到兩小時。時間長度/刻度數值越大,精度越大。

 

而後添加一個任務的時候,根據hash算法獲得hash值並對刻度數求模獲得一個下標,這個下標就是刻度的位置。

然而有一些任務的執行週期超過了第一個參數,好比超過了24小時,就會獲得一個圈數round。

簡點說,添加一個任務時會根據任務獲得一個hash值,並根據時間輪長度和刻度獲得一個商值round和模index,好比時間長度24小時,刻度爲12,延遲時間爲32小時,那麼round=1,index=8。時間輪從開啓之時起每24/12個時間走一個指針,即index+1,第一圈round=0。當走到第7個指針時,此時index=7,此時剛纔的任務並不能執行,由於剛纔的任務round=1,必需要等到下一輪index=7的時候才能執行。

如圖所示

 

 

對於Delayed兩個重要實現方法,第一排序,實際上是經過hash求商和模決定放入哪一個位置。這些位置自己就已經按照時間順序排序了。第二,延遲時間,已經被封裝好了,傳入一個延遲的時間就行了。

 

代碼實例:

獲得一個延遲隊列實例

HashedWheelTimer timer = new HashedWheelTimer(24, //時間輪一圈的長度
                TimeUnit.SECONDS,
                12);//時間輪的度刻

建立一個任務

TimerTask task = new TimerTask() {
            
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("任務執行");
            }
        };

將任務加入延遲隊列

timer.newTimeout(task, 1000, TimeUnit.SECONDS);

 

總結

以上兩種方案都沒有實現持久化和分佈式。持久化能夠藉助數據庫來達到。分佈式的話仍是使用消息中間件吧。RabbitMq據說已經能夠藉助某些參數實現。

相關文章
相關標籤/搜索