實戰|我仍是很建議你用DelayQueue搞定超時訂單的-(1)

1、用三根雞毛作引言

  • 真的! 不騙大家的喔~ 相信你們都遇到相似於:訂單30min後未支付自動取消的開發任務
  • 那麼今日份就來了解一下怎麼用延時隊列 DelayQueue搞定單機版的超時訂單

2、延時隊列使用場景

那麼何時須要用延時隊列呢?常見的延時任務場景 舉栗子:java

  1. 訂單在30分鐘以內未支付則自動取消。
  2. 重試機制實現,把調用失敗的接口放入一個固定延時的隊列,到期後再重試。
  3. 新建立的店鋪,若是在十天內都沒有上傳過商品,則自動發送消息提醒。
  4. 用戶發起退款,若是三天內沒有獲得處理則通知相關運營人員。
  5. 預約會議後,須要在預約的時間點前十分鐘通知各個與會人員參加會議。
  6. 關閉空閒鏈接,服務器中,有不少客戶端的鏈接,空閒一段時間以後須要關閉之。
  7. 清理過時數據業務。好比緩存中的對象,超過了空閒時間,須要從緩存中移出。
  8. 多考生考試,到期所有考生必須交卷,要求時間很是準確的場景。

3、解決辦法多如雞毛

  1. 按期輪詢(數據庫等)
  2. JDK DelayQueue
  3. JDK Timer
  4. ScheduledExecutorService 週期性線程池
  5. 時間輪(kafka)
  6. 時間輪(Netty的HashedWheelTimer)
  7. Redis有序集合(zset)
  8. zookeeper之curator
  9. RabbitMQ
  10. Quartz,xxljob等定時任務框架
  11. Koala(考拉)
  12. JCronTab(仿crontab的java調度器)
  13. SchedulerX(阿里)
  14. 有贊延遲隊列
  15. .....(雞毛)
  • 解決問題方法真是不勝枚舉,正所謂一呼百應,一千個讀者眼裏有一千個哈姆雷特

🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱git

  • 那咱們第一篇先來實戰JDK的DelayQueue,萬祖歸宗,萬法同源,學會了最基礎的Queue,就不愁其餘的了
  • 後續再寫幾篇使用Redis,Zk,MQ的一些機制,實戰分佈式狀況下的使用

4、先認親

延時隊列,首先,它是一種隊列,隊列意味着內部的元素是有序的,元素出隊入隊是有方向性的,元素從一端進入,從另外一端取出。github

其次,延時隊列,最重要的特性就體如今它的延時屬性上,跟普通的隊列不同的是,普通隊列中的元素老是等着但願被早點取出處理,而延時隊列中的元素則是但願被在指定時間獲得取出和處理,因此延時隊列中的元素是都是帶時間屬性的,一般來講是須要被處理的消息或者任務。數據庫

一言以蔽之曰 : 延時隊列就是用來存放須要在指定時間被處理的元素的隊列。編程

1) DelayQueue 是誰,上族譜 數組

看的出來到 DelayQueue這一代已經第五代傳人了,

要知道 DelayQueue自幼生在八戒家,長大就往外面拉,熊熊烈火它不怕,水是水來渣是渣。緩存

不過它真的是文韜武略,有一把ReentrantLock就是它的九齒釘耙,抗的死死の捍衛着本身的PriorityQueue.安全

有典故曰:服務器

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 用於控制併發的 可重入 全局 鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 根據Delay時間排序的 無界的 優先級隊列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用於優化阻塞通知的線程元素leader,標記當前是否有線程在排隊(僅用於取元素時)
private Thread leader = null;
// 條件,用於阻塞和通知的Condition對象,表示如今是否有可取的元素
private final Condition available = lock.newCondition();

       /** * 省洛方法代碼..... 大家懂個人省洛嗎? */
複製代碼
  • 註釋的已經很清楚他們的意思了,也具有了併發編程之藝術的 鎖,隊列,狀態(條件)
  • 他的幾個方法也是經過 鎖-->維護隊列-->出隊,入隊-->根據Condition進行條件的判斷-->進行線程之間的通訊和喚起
  • 以支持優先級無界隊列的PriorityQueue做爲一個容器,容器裏面的元素都應該實現Delayed接口,在每次往優先級隊列中添加元素時以元素的過時時間做爲排序條件,最早過時的元素放在優先級最高。
  • DelayQueue是一個沒有大小限制的隊列,所以往隊列中插入數據的操做(生產者)永遠不會被阻塞,而只有獲取數據的操做(消費者)纔會被阻塞。

2) 優先級隊列 PriorityQueue多線程

由於咱們的DelayQueue裏面維護了一個優先級的隊列PriorityQueue 簡單的看下:

//默認容量11
     private static final int DEFAULT_INITIAL_CAPACITY = 11;
    //存儲元素的地方 數組
    transient Object[] queue; // non-private to simplify nested class access
    //元素個數
    private int size = 0;
    //比較器
    private final Comparator<? super E> comparator;
複製代碼
  1. 默認容量是11;
  2. queue,元素存儲在數組中,這跟咱們以前說的堆通常使用數組來存儲是一致的;
  3. comparator,比較器,在優先級隊列中,也有兩種方式比較元素,一種是元素的天然順序,一種是經過比較器來比較;
  4. modCount,修改次數,有這個屬性表示PriorityQueue也是fast-fail的;
  5. PriorityQueue不是有序的,只有堆頂存儲着最小的元素;
  6. PriorityQueue 是非線程安全的;

3) DelayQueue的方法簡介

  • 入隊方法 : 若添加的元素是隊首(堆頂)元素,就把leader置爲空,並喚醒等待在條件available上的線程;
public boolean add(E e) {    return offer(e);}
public void put(E e) {    offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}
public boolean offer(E e) {    
    final ReentrantLock lock = this.lock;    
    lock.lock();   //加鎖 由於優先隊列線程不安全
    try {
        q.offer(e);  //判斷優先級 進行入隊 
    if (q.peek() == e) {    //-----[1]
        //leader記錄了被阻塞在等待隊列頭生效的線程 新增一個元素到隊列頭,
        //表示等待原來隊列頭生效的阻塞的線程已經失去了阻塞的意義
        //,此時須要獲取新的隊列頭進行返回了
        leader = null;      
        //獲取隊列頭的線程被喚起,主要有兩種場景:
        //1. 以前隊列爲空,致使被阻塞的線程
        //2. 以前隊列非空,可是隊列頭沒有生效(到期)致使被阻塞的線程
        available.signal();     
    }        
        return true; //由於是無界隊列 因此添加元素確定成功 直到OOM
    } finally {    
        lock.unlock();   //釋放鎖
    }
}
複製代碼

offer()方法,首先獲取獨佔鎖,而後添加元素到優先級隊列,因爲q是優先級隊列,因此添加元素後,peek並不必定是當前添加的元素,若是[1]爲true,說明當前元素e的優先級最小也就即將過時的,這時候激活avaliable變量條件隊列裏面的線程,通知他們隊列裏面有元素了。

  • 出隊方法 take()

請看我詳細的註釋,毫不是走馬觀花

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock; //獲取鎖 
    lock.lockInterruptibly(); //可中斷鎖 併發類裏面凡是調用了await的方法獲取鎖時候都是使用的lockInterruptibly方法而不是Lock. 
    //也是一種fail-fast思想吧,await()方法會在中斷標誌設置後拋出InterruptedException異常後退出 不至於死死的等待
    try {
        for (;;) {//會寫死循環的都是高手
            E first = q.peek();//get隊頭元素
            if (first == null)
                // 隊列頭爲空,則阻塞,直到新增一個入隊爲止(1)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);//獲取剩餘時間
                if (delay <= 0)
                    // 若隊列頭元素已生效,則直接返回(2)
                    return q.poll();
                first = null; // don't retain ref while waiting 等待的時候不能引用,表示釋放當前引用的(3)
                if (leader != null)
                    // leader 非空時,表示有其餘的一個線程在出隊阻塞中 (4.1)
                    // 此時掛住當前線程,等待另外一個線程出隊完成
                    available.await();
                else {
                    //標識當前線程處於等待隊列頭生效的阻塞中 (4.2.1)
                    Thread thisThread = Thread.currentThread(); 
                    leader = thisThread;
                    try {
                        // 等待隊列頭元素生效(4.2.2)
                        available.awaitNanos(delay);
                    } finally {
                        //最終釋放當前的線程 設置leader爲null (4.2.3)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }     //(5)
    } finally {
        if (leader == null && q.peek() != null)
            // 當前線程出隊完成,通知其餘出隊阻塞的線程繼續執行(6)
            available.signal();
            lock.unlock();//解鎖結束
    }
}
複製代碼

那麼,下面的結論肉眼可見:

  1. 若是隊列爲空,則阻塞,直到有個線程(生產者投遞數據)完成入隊操做
  2. 獲取隊列頭,若隊列頭已生效,則直接返回
  3. 未生效則釋放當前引用
  4. 當隊列頭部沒有生效時候:
    1. 如有另外一個線程已經處於等待隊列頭生效的阻塞過程當中,則阻塞當前線程,直到另外一個線程完成出隊操做
    2. 若沒有其餘線程阻塞在出隊過程當中,即當前線程爲第一個獲取隊列頭的線程
      • 標識當前線程處於等待隊列頭生效的阻塞中(leader = thisThread
      • 阻塞當前線程,等待隊列頭生效
      • 隊列頭生效以後,清空標識(leader=null)
  5. 再次進入循環,獲取隊列頭並返回
  6. 最後,當前線程出隊完成,通知其餘出隊阻塞的線程繼續執行

4) Leader/Follower模式

  1. 若是不是隊首節點,根本不須要喚醒操做!
  2. 假設取值時,延時時間尚未到,那麼須要等待,但這個時候,隊列中新加入了一個延時更短的,並放在了隊首,那麼 此時,for循環由開始了,取得是新加入的元素,那以前的等待就白等了,明顯能夠早點退出等待!
  3. 還有就是若是好多線程都在此等待,若是時間到了,同時好多線程會充等待隊列進入鎖池中,去競爭鎖資源,但結果只能是一個成功, 多了寫無畏的競爭!(屢次的等待和喚醒)

5)Delayed

public interface Delayed extends Comparable<Delayed> { 
    long getDelay(TimeUnit unit);
}
複製代碼

據情報顯示:Delayed是一個繼承自Delayed的接口,而且定義了一個Delayed方法,用於表示還有多少時間到期,到期了應返回小於等於0的數值。

很簡答就是定義了一個,一個哈,一個表延遲的接口,就是個規範接口,目的就是騙咱們去實現它的方法.哼~

5、再實戰

說了那麼多廢話,讓我想起了那句名言:一切沒有代碼實操的講解都是耍流氓 至今深深的烙在我心中,因此我必定要實戰給大家看,顯得我不是流氓...

  • 實戰以 訂單下單後三十分鐘內未支付則自動取消 爲業務場景

  • 該場景的代碼邏輯分析以下:

    1. 下單後將訂單直接放入未支付的延時隊列中
    2. 若是超時未支付,則從隊列中取出,進行修改成取消狀態的訂單
    3. 若是支付了,則不去進行取消,或者取消的時候作個狀態篩選,便可避免更新
    4. 或者支付完成後,作個主動出隊
    5. 還有就是用戶主動取消訂單,也作個主動出隊
  • 那麼咱們寫代碼必定要通用,先來寫個通用的Delayed 通用...嗯! 泛型的

import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** * @author LiJing * @ClassName: ItemDelayed * @Description: 數據延遲實現實例 用以包裝具體的實例轉型 * @date 2019/9/16 15:53 */

@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /**默認延遲30分鐘*/
    private final static long DELAY = 30 * 60 * 1000L;
    /**數據id*/
    private Long dataId;
    /**開始時間*/
    private long startTime;
    /**到期時間*/
    private long expire;
    /**建立時間*/
    private Date now;
    /**泛型data*/
    private T data;
    
    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
複製代碼
  • 再寫個通用的接口,用於規範和方便統一實現 這樣任何類型的訂單均可以實現這個接口 進行延時任務的處理
public interface DelayOrder<T> {


    /** * 添加延遲對象到延時隊列 * * @param itemDelayed 延遲對象 * @return boolean */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /** * 根據對象添加到指定延時隊列 * * @param data 數據對象 * @return boolean */
    boolean addToDelayQueue(T data);

    /** * 移除指定的延遲對象從延時隊列中 * * @param data */
    void removeToOrderDelayQueue(T data);
}
複製代碼
  • 來具體的任務,具體的邏輯具體實現
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /** * 初始化時加載數據庫中需處理超時的訂單 * 系統啓動:掃描數據庫中未支付(要在更新時:加上已支付就不用更新了),未過時的的訂單 */
    @PostConstruct
    public void init() {
        log.info("系統啓動:掃描數據庫中未支付,未過時的的訂單");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("系統啓動:掃描數據庫中未支付的訂單,總共掃描了" + orderList.size() + "個訂單,推入檢查隊列,準備到期檢查...");

        /*啓動一個線程,去取延遲訂單*/
        delayOrderExecutor.execute(() -> {
            log.info("啓動處理的訂單線程:" + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //處理超時訂單
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (Exception e) {
                    log.error("執行自營超時訂單的_延遲隊列_異常:" + e);
                }
            }
        });
    }

    /** * 加入延遲消息隊列 **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** * 加入延遲消息隊列 **/
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** * 從延遲隊列中移除 主動取消就主動從隊列中取出 **/
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }
}
複製代碼

解釋一番上面的寫的東東

  1. delayOrderExecutor是注入的一個專門處理出隊的一個線程
  2. @PostConstruct是啥呢,是在容器啓動後只進行一次初始化動做的一個註解,至關實用
  3. 啓動後呢,咱們去數據庫掃描一遍,防止有漏網之魚,由於單機版嗎,隊列的數據是在內存中的,重啓後確定原先的數據會丟失,因此爲保證服務質量,咱們可能會錄音.....因此爲保證重啓後數據的恢復,咱們須要從新掃描數據庫把未支付的數據從新裝載到內存的隊列中
  4. 接下來就是用這個線程去一直不停的訪問隊列的take()方法,當隊列無數據就一直阻塞,或者數據沒到期繼續阻塞着,直到到期出隊,而後獲取訂單的信息,去處理訂單的更新操做

6、後總結

  • 這就是單機的很差處,也是一個痛點,因此確定是不太適合訂單量特別大的場景 你們也要酌情考慮和運用
  • 相對於同等量級的數據庫輪詢操做來講,真是節省了很多數據庫的壓力和鏈接,仍是值得一用的,咱們能夠只保存訂單的id到延時實例中,這樣縮減隊列單個實例內存存儲
  • 那還有技巧就是更新的時候注意控制好冪等性,控制好冪等性,會讓你輕鬆不少,順暢不少,可是數據量大了,要蛀牙的哦

那今日份的講解就到此結束,具體的代碼請移步個人gitHub的mybot項目Master分支查閱,fork體驗一把,或者評論區留言探討,寫的很差,請多多指教~~

相關文章
相關標籤/搜索