Java延時隊列DelayQueue的使用

問題背景

最近的某個業務中,遇到一個問題,一個用戶動做,會產生A和B兩個行爲,分別經過對應的esb消息總線發出。java

咱們的業務線對AB兩條esb消息隊列進行監聽(兩個進程),作數據的同步更新操做。redis

 

在正常過程當中,A行爲比B行爲先產生,而且A行爲優先級高於B行爲,數據最終會根據A行爲作更新。sql

可是在實際應用中,出現了併發問題,數據最終根據B行爲作了更新,覆蓋了A行爲。緩存

 

最開始經過redis緩存進行上鎖,在收到A消息時,在redis中添加一個key,處理完畢後刪除key 。處理過程當中收到B消息,直接返回。併發

但測試的時候發現並不可用,可能先收到B消息,後收到A消息, 可是先更新A數據,再更新B數據,仍是進行了覆蓋。app

還有一種方法是修改底層代碼,經過自定義sql的方法,先比較再update 。ide

 

問題分析

除此以外,還在考慮是否還有別的辦法,問題的產生緣由就是A和B的消息隊列基本都在同一時間點拿到數據,對程序來講形成了併發操做。測試

若是咱們能夠把B的消息隊列的都延遲一個時間點,保證兩個消息隊列不在同一時間點得到數據,基本上就能夠解決這個問題。ui

 

因而就上網開始搜索,查到了延遲隊列DelayQueue。this

雖然咱們不能讓公司的消息隊列延遲發送,可是咱們能夠延遲處理。當收到消息時先不處理,放入延遲消息隊列中,另一個線程再從延遲隊列中得到數據進行處理。

 

類介紹

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>

DelayQueue 是 Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿後保存時間最長的 Delayed 元素。若是延遲都尚未期滿,則隊列沒有頭部,而且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即便沒法使用 take 或 poll 移除未到期的元素,也不會將這些元素做爲正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此隊列不容許使用 null 元素。

 

放入DelayQueue的對象須要實現Delayed接口。

 

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

 

測試demo

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/6/21
 */
public class DelayQueueTest {

    public static void main(String[] args) {
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();

        //生產者
        producer(delayQueue);

        //消費者
        consumer(delayQueue);

        while (true){
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 每100毫秒建立一個對象,放入延遲隊列,延遲時間1毫秒
     * @param delayQueue
     */
    private static void producer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    DelayedElement element = new DelayedElement(1000,"test");
                    delayQueue.offer(element);
                }
            }
        }).start();

        /**
         * 每秒打印延遲隊列中的對象個數
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("delayQueue size:"+delayQueue.size());
                }
            }
        }).start();
    }

    /**
     * 消費者,從延遲隊列中得到數據,進行處理
     * @param delayQueue
     */
    private static void consumer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    DelayedElement element = null;
                    try {
                        element =  delayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(System.currentTimeMillis()+"---"+element);
                }
            }
        }).start();
    }
}


class DelayedElement implements Delayed {

    private final long delay; //延遲時間
    private final long expire;  //到期時間
    private final String msg;   //數據
    private final long now; //建立時間

    public DelayedElement(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期時間 = 當前時間+延遲時間
        now = System.currentTimeMillis();
    }

    /**
     * 須要實現的接口,得到延遲時間   用過時時間-當前時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 用於延遲隊列內部比較排序   當前時間的延遲時間 - 比較對象的延遲時間
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delay);
        sb.append(", expire=").append(expire);
        sb.append(", msg='").append(msg).append('\'');
        sb.append(", now=").append(now);
        sb.append('}');
        return sb.toString();
    }
}

 

補充說明

1.參考網上一些的例子,有些   compareTo  方法就是錯的, 要麼形成隊列中數據積壓,要麼不能起到延遲的效果。因此必定要通過本身的用例測試確保沒有問題。

2.樓主的使用場景,須要考慮,若是進程關閉時,要先等本地延遲隊列中的數據被處理完後,再結束進程。

相關文章
相關標籤/搜索