java延時隊列

應用場景

1)7天自動收貨java

  a、用戶支付完成之後,把訂單ID插入到內存的一個DelayQueue中,同時插入到Redis中。web

  b、7天以內,用戶點擊了確認收貨,則從DelayQueue中刪除,從Redis中刪除。redis

  c、超過7天,DelayQueue中的訂單ID出隊,查詢數據庫,改狀態爲自動收貨,刪除redis。spring

  d、若是7天以內,web服務器重啓過,則web服務器啓動之後,從redis中讀取待收貨的訂單,插入到DelayQueue。數據庫

2)30分鐘未付款自動取消訂單apache

1、寫一個JedisUtil,用來操做redis

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.aspectj.weaver.patterns.ThisOrTargetPointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.aqh.util.MyProperties;
import com.sun.org.glassfish.external.statistics.Statistic;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * jedis緩存工具
 */
@Service("jedisUtil")
public class JedisUtil {
    private JedisPool pool;

    @Autowired
    private MyProperties properties;
    
    private static Log log = LogFactory.getLog(JedisUtil.class);
    
    private JedisUtil() {
    }
    
    @SuppressWarnings("unused")
    @PostConstruct  // 指定spring實例化對象以後調用的方法
    private void init() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(Integer.parseInt(properties.getJedisMaxActive()));
        config.setMaxIdle(Integer.parseInt(properties.getJedisMaxIdle()));
        config.setMaxWait(Long.parseLong(properties.getJedisMaxWait()));
        config.setTestOnBorrow(false);
        pool = new JedisPool(new JedisPoolConfig(), properties.getJedisHost(), 
                Integer.parseInt(properties.getJedisPort()), 
                Integer.parseInt(properties.getJedisTimeout()));
    }
    
    public void set(String key, String value) {
        Jedis jedis = this.getResource();
        try {
            jedis.set(key, value);
        } finally {
            this.returnResource(jedis);
        }
    }
    
    public String get(String key) {
        Jedis jedis = this.getResource();
        try {
            return jedis.get(key);
        } finally {
            this.returnResource(jedis);
        }
    }
    
    public void setObject(String key, Object obj) {
        Jedis jedis = this.getResource();
        try {
            jedis.set(key.getBytes(), serialize(obj));
        } finally {
            this.returnResource(jedis);
        }
    }
    public Object getObject(String key) {
        
        Jedis jedis = this.getResource();
        try {
            if(jedis.get(key.getBytes()) == null) {
                return null;
            } else {
                return unserialize(jedis.get(key.getBytes()));
            }
        } finally {
            this.returnResource(jedis);
        }
    }
    /**
     * 刪除key
     * @param key
     */
    public void delkey(String...keys) {
        Jedis jedis = this.getResource();
        try {
            jedis.del(keys);
        } finally {
            this.returnResource(jedis);
        }
    }
    
    /**
        * 設置hash的值
        * @param key   hash中的key
        * @param field hash中的域
        * @param obj   值
        */
        public void setHash(String key,String field,Object obj) {
            Jedis jedis = this.getResource();
            try {
                jedis.hset(key.getBytes(), field.getBytes(), serialize(obj));
            } finally {
                this.returnResource(jedis);
            }
        }
        /**
         * 查找redis中hash的value值
         * @param key  hash中的key
         * @param field hash中的域
         * @return 返回對象
         */
        public Object getHash(String key,String field) {
            Jedis jedis = this.getResource();
            try {
                if (jedis.hget(key, field) == null) {
                    return null;
                }
                return unserialize(jedis.hget(key.getBytes(), field.getBytes()));
            } finally {
                this.returnResource(jedis);
            }
        }
        
        /**
         * 刪除hash中的指定域
         * @param key
         * @param fields
         * @return
         */
        public Long removeHash(String key,String fields) {
            Jedis jedis = this.getResource();
            try {
                
                return jedis.hdel(key.getBytes(),fields.getBytes());
                
            } finally {
                this.returnResource(jedis);
            }
        }
    
        /**
         * 返回hash中的全部域
         * @param key
         */
        public Set<String> hKeys(String key) {
            Jedis jedis = this.getResource();
            try {
                Set<String> hkeys = jedis.hkeys(key);
                return hkeys;
            } finally {
                this.returnResource(jedis);
            }
        }
    /**
     * 序列化
     * @param object
     * @return
     */
    private static byte[] serialize(Object object) {
        ObjectOutputStream oos = null;
        ByteArrayOutputStream baos = null;
        try {
            // 序列化
            baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            byte[] bytes = baos.toByteArray();
            return bytes;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("jedis序列化異常.....");
        }
        return null;
    }
     
    /**
     * 反序列化
     * 
     * @param bytes
     * @return
     */
    private static Object unserialize(byte[] bytes) {
        ByteArrayInputStream bais = null;
        try {
            // 反序列化
            bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bais);
            return ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            log.info("jedis反序列化異常.....");
        }
        return null;
    }
    
    /**
     * 獲取jedis
     * @return
     */
    private Jedis getResource() {
        Jedis jedis = pool.getResource();
        jedis.auth(properties.getJedisPassword());
        return jedis;
    }

    /**
     * 設置生命週期(過時時間)
     * @param key
     * @param second
     */
    public void setExpireByKey(String key, int seconds) {
        Jedis jedis = null;
        try {
            jedis = this.getResource();
            jedis.expire(key, seconds);
        } catch (Exception e) {
            log.error(e);
        } finally {
            this.returnResource(jedis);
        }
    }
    
    /**
     * 獲取某個Key的餘下存活時間(秒)。
     * @param key
     * @return 存活時間(秒)
     */
    public long getTimeToLive(String key) {
        Jedis jedis = null;
        long sec = -2;
        try {
            jedis = this.getResource();
            sec = jedis.ttl(key);
        } catch (Exception e) {
            log.error(e);
        } finally {
            this.returnResource(jedis);
        }
        return sec;
    }
    
    /**
     * jedis放回鏈接池
     * @param jedis
     */
    private void returnResource(Jedis jedis) {
        pool.returnResource(jedis);
    }
    
    /**
     * 釋放Redis資源池。
     */
    public void destroy() {
        if(pool != null) {
            pool.destroy();
        }
        log.info("Redis池已銷燬");
    }
}
    
View Code

2、線程池的工具類

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolUtils {
 
    private final ExecutorService executor;
 
    private static ThreadPoolUtils instance = new ThreadPoolUtils();
 
    private ThreadPoolUtils() {
        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    }
 
    public static ThreadPoolUtils getInstance() {
        return instance;
    }
 
    public static <T> Future<T> execute(final Callable<T> runnable) {
        return getInstance().executor.submit(runnable);
    }
 
    public static Future<?> execute(final Runnable runnable) {
        return getInstance().executor.submit(runnable);
    }
}
View Code

3、要加入延時隊列的對象,須要實現Delayed類

package com.aqh.util;

import java.io.Serializable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import javax.print.attribute.standard.MediaSize.Other;

import sun.util.logging.resources.logging;

/**
 * 訂單隊列對象
 * @author Administrator
 *
 */
public class DshOrder implements Delayed,Serializable{
   
    private String orderNo;//訂單號
    
    private long startTime; // 超時時間
    
    
    /**
     * 構造方法
     */
    public DshOrder() {} 

    public DshOrder(String orderNo, long timeout) {
        this.orderNo = orderNo;
        this.startTime = System.currentTimeMillis() + timeout;
    }
    
    
    
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DshOrder) {
            DshOrder otherRequest = (DshOrder)other;
            long otherStartTime = otherRequest.getStartTime();
            return (int)(this.startTime - otherStartTime);
        }
        return 0;
    }
   
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    
    
     
    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public long getStartTime() {
        return startTime;
    }

    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }
    
    
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DshOrder other = (DshOrder) obj;
        if (orderNo.equals(other.getOrderNo()))
            return false;
        if (startTime != other.startTime)
            return false;
        return true;
    }
    
    @Override
    public int hashCode() {
        int result = 17;  
        result = result * 31 + (int)orderNo.hashCode();  
        result = result * 31 + (int)startTime;
        return result;  
    }
    
    @Override
    public String toString() {
        return "DSHOrder [orderNo=" + orderNo + ", startTime=" + startTime + "]";
    }
}
View Code

4、延時隊列服務類

import java.util.concurrent.DelayQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.aqh.bean.btc.BtcConstant;
import com.aqh.bean.btc.BtcOrder;
import com.aqh.dao.IBtcMemberDao;
import com.aqh.service.IBtcMemberService;
import com.aqh.util.DshOrder;
import com.aqh.util.JedisUtil;
/**
 * 延時隊列service
 * @author Administrator
 *
 */
@Service
public class DelayService {
    private boolean start;//判斷是否啓動隊列
    
    private OnDelayedListener listener;//內部接口監聽器
    
    private DelayQueue<DshOrder> delayQueue = new DelayQueue<DshOrder>(); //隊列集合
    
    private Log log = LogFactory.getLog(DelayService.class);
    
    @Autowired
    private JedisUtil jedisUtil;
    
    @Autowired
    private IBtcMemberService btcMemberService;
    
    public static interface OnDelayedListener{
          public void onDelayedArrived(DshOrder order);
    }

    
    public void start(OnDelayedListener listener) {
        if (start) {
           log.error(">>>>>>>>>>>>DelayService已經在啓動狀態");
            return;
        }
       log.info(">>>>>>>>>>>>DelayService 啓動");
       start = true;
       this.listener = listener;
       new Thread(new Runnable() {
            
            @Override
            public void run() {
                try {
                    while(true) {
                        log.info("*********準備獲取延遲隊列裏面將要取消的隊列*******");
                        /* 延時隊列會將加入隊列中的元素按照過時時間的前後順序排序,先過時的在隊首,該take方法會判斷隊首
                         * 元素是否過時,若是沒過時,會阻塞等待,直到隊首元素過時,纔會取出來,往下執行邏輯 */
                        DshOrder order = delayQueue.take();
                        log.info("*********訂單"+order.getOrderNo()+"已經超過30分鐘,被自動取消*******");
                        //修改訂單狀態
                        //根據訂單號查詢訂單,判斷狀態是否已經完成
                        BtcOrder btcOrder = btcMemberService.getOrderByNo(order.getOrderNo());
                        if (btcOrder.getStatus() == 1 ) {
                            //取消訂單改變狀態並對相應的庫存進行相加
                            btcMemberService.updateOrderAndStock(order.getOrderNo(),0);
                        }
                        /* 這裏的類名.this是爲了區分那個類的this,通常在內部類中,須要調用外部類的this的時候使用,不加類名,
                         * 直接this表明當前類,內部類中表明內部類,外部類中調用表明外部類,這裏再也不內部類中,也能夠顯示的指明是
                          * 哪一個類的this*/
                        if (DelayService.this.listener != null) {
                            DelayService.this.listener.onDelayedArrived(order);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            }
        }).start();
    }
    
    public void add(DshOrder order){
           //寫入隊列
        delayQueue.put(order);
        //存入redis
        jedisUtil.setHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo(), order);
        log.info("**************訂單號:" + order.getOrderNo() + "被寫入訂單成功!*************");
    }
    /**
     * 重載主要是爲了業務中只須要寫入延時隊列,而不須要寫入redis的狀況
     * @param order 延時訂單
     * @param type null
     */
    public void add(DshOrder order,String type){
        //寫入隊列
         delayQueue.put(order);
         //存入redis
         //jedisUtil.setHash(BtcConstant.ORDER_SHIP, order.getOrderNo(), order);
     }
 
    public boolean remove(DshOrder order){
        //從redis中刪除
        jedisUtil.removeHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo());
        
        log.info("**************訂單號:" + order.getOrderNo() + "被刪除成功!*************");
        //從隊列裏面刪除
        return delayQueue.remove(order);

    }    
    
    public void remove(String orderNo){
        DshOrder[] array = delayQueue.toArray(new DshOrder[]{});
        if(array == null || array.length <= 0){
            return;
        }
        DshOrder target = null;
        for(DshOrder order : array){
            if(order.getOrderNo().equals(orderNo)){
                target = order;
                break;
            }
        }
        if(target != null){
            this.remove(target);
        }
    }
}
View Code

 

 5、須要寫一個spring監聽器,系統啓動完須要執行以下兩個操做

1)啓動延時隊列的服務線程,去循環取要過時的隊首元素。(調用延時隊列的take阻塞方法)緩存

2)線程池中運行一個線程,在每次啓動時從redis中將未過時的對象從新加入到延時隊列中,由於延時隊列是基於內存的,宕機後延時隊列就不存在了,因此須要redis等數據庫配合使用,每次加入延時隊列中的對象,都須要加入redis中,從延時隊列中刪除的對象,也最好從redis中刪除,這樣宕機後服務器

未過時的延時隊列中的對象就在redis中,每次啓動服務器,線程就會從redis中將全部對象從新加入到延時隊列中。app

import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

import com.aqh.bean.btc.BtcConstant;
import com.aqh.service.delayQueue.DelayService.OnDelayedListener;
import com.aqh.util.DshOrder;
import com.aqh.util.JedisUtil;


/**
 * 用於監聽延時隊列的類
 * @author Administrator
 * spring監聽器必須加上@Service,注入到bean對象中
 */
@Service
public class StartupListener implements ApplicationListener<ContextRefreshedEvent>{
    
    private static final Log log = LogFactory.getLog(StartupListener.class);
    
    @Autowired
    private DelayService delayService;
    
    @Autowired
    private JedisUtil jedisUtil;
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent evt) {
        log.info(">>>>>>>>>>>>系統啓動完成,onApplicationEvent");
        /* applicationontext和使用MVC以後的webApplicationontext會兩次調用監聽器的方法,
         * 這樣能夠解決,applicationontext是父容器,因此沒有父級元素,這句表明父容器(applicationontext)直接返回,不執行
          * 監聽器方法,子容器(springMVC的)纔會執行後面的監聽器方法,這樣就不會兩次調用了*/
        if (evt.getApplicationContext().getParent() == null) {
            return;
        }
        
        delayService.start(new OnDelayedListener() {
            
            @Override
            public void onDelayedArrived(final DshOrder order) {
                ThreadPoolUtils.execute(new Runnable() {
                    
                    @Override
                    public void run() {
                        String orderNo = order.getOrderNo();
                        //查庫判斷是否須要進行刪除
                        log.info("30分鐘自動取消訂單,onDelayedArrived():" + orderNo);
                        delayService.remove(order);
                    }
                });
                
            }
        });
        
        //查找須要入隊的訂單
        ThreadPoolUtils.execute(new Runnable() {
            
            @Override
            public void run() {
                log.info("查找須要入隊的訂單");
                Set<String> orderNos = jedisUtil.hKeys(BtcConstant.ORDER_CONFIRM);
                log.info("30分鐘未支付須要入隊的訂單:" + orderNos);
                if (orderNos == null || orderNos.size() <= 0) {
                    return;
                }
                
                //寫到DelayQueue
                for (String str : orderNos) {
                    //經過redis取key中的str域的value
                    DshOrder dshOrder = (DshOrder) jedisUtil.getHash(BtcConstant.ORDER_CONFIRM, str);
                    //存入延時隊列裏面
                    delayService.add(dshOrder, null);
                }
            }
        });
        
    }

}
View Code

 

以上的步驟已經將延時隊列寫完了,會根據傳入延時隊列的對象過時時間(雖然上面寫的日誌都是30分鐘,可是過時時間是根據加入隊列時加的時間決定的),自動到期後出隊列,執行操做;ide

具體調用的地方:

1)下單後須要加入延時隊列,添加過時時間爲30分鐘,30分鐘後未付款自動取消訂單

2)發貨後,須要加入延時隊列,添加過時時間爲7天。

7天內用戶點擊確認收貨按鈕,調用延時隊列服務類的remove方法,從DelayQueue中刪除,從Redis中刪除;

超過7天,DelayQueue中的訂單ID出隊,查詢數據庫,改狀態爲自動收貨,刪除redis。

注意:在延時隊列的邏輯操做中,兩種狀況能夠在延時對象中加入標誌判斷,是30天自動取消,仍是7天自動確認收貨,對應的執行不一樣的邏輯,而後從redis中刪除 ;

調用的代碼以下:

//加入延時隊列和redis緩存中
            /* 建立延時對象時,傳入訂單號和過時時間(單位爲毫秒) */
            DshOrder dshOrder = new DshOrder(orderNo,BtcConstant.ORDER_CONFIRM_TIMEOUT);
            delayService.add(dshOrder);
View Code

 參考連接:https://blog.csdn.net/goldenfish1919/article/details/50923450

相關文章
相關標籤/搜索