1)7天自動收貨java
a、用戶支付完成之後,把訂單ID插入到內存的一個DelayQueue中,同時插入到Redis中。web
b、7天以內,用戶點擊了確認收貨,則從DelayQueue中刪除,從Redis中刪除。redis
c、超過7天,DelayQueue中的訂單ID出隊,查詢數據庫,改狀態爲自動收貨,刪除redis。spring
d、若是7天以內,web服務器重啓過,則web服務器啓動之後,從redis中讀取待收貨的訂單,插入到DelayQueue。數據庫
2)30分鐘未付款自動取消訂單apache
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.List; import java.util.Set; import javax.annotation.PostConstruct; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.aspectj.weaver.patterns.ThisOrTargetPointcut; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.aqh.util.MyProperties; import com.sun.org.glassfish.external.statistics.Statistic; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * jedis緩存工具 */ @Service("jedisUtil") public class JedisUtil { private JedisPool pool; @Autowired private MyProperties properties; private static Log log = LogFactory.getLog(JedisUtil.class); private JedisUtil() { } @SuppressWarnings("unused") @PostConstruct // 指定spring實例化對象以後調用的方法 private void init() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxActive(Integer.parseInt(properties.getJedisMaxActive())); config.setMaxIdle(Integer.parseInt(properties.getJedisMaxIdle())); config.setMaxWait(Long.parseLong(properties.getJedisMaxWait())); config.setTestOnBorrow(false); pool = new JedisPool(new JedisPoolConfig(), properties.getJedisHost(), Integer.parseInt(properties.getJedisPort()), Integer.parseInt(properties.getJedisTimeout())); } public void set(String key, String value) { Jedis jedis = this.getResource(); try { jedis.set(key, value); } finally { this.returnResource(jedis); } } public String get(String key) { Jedis jedis = this.getResource(); try { return jedis.get(key); } finally { this.returnResource(jedis); } } public void setObject(String key, Object obj) { Jedis jedis = this.getResource(); try { jedis.set(key.getBytes(), serialize(obj)); } finally { this.returnResource(jedis); } } public Object getObject(String key) { Jedis jedis = this.getResource(); try { if(jedis.get(key.getBytes()) == null) { return null; } else { return unserialize(jedis.get(key.getBytes())); } } finally { this.returnResource(jedis); } } /** * 刪除key * @param key */ public void delkey(String...keys) { Jedis jedis = this.getResource(); try { jedis.del(keys); } finally { this.returnResource(jedis); } } /** * 設置hash的值 * @param key hash中的key * @param field hash中的域 * @param obj 值 */ public void setHash(String key,String field,Object obj) { Jedis jedis = this.getResource(); try { jedis.hset(key.getBytes(), field.getBytes(), serialize(obj)); } finally { this.returnResource(jedis); } } /** * 查找redis中hash的value值 * @param key hash中的key * @param field hash中的域 * @return 返回對象 */ public Object getHash(String key,String field) { Jedis jedis = this.getResource(); try { if (jedis.hget(key, field) == null) { return null; } return unserialize(jedis.hget(key.getBytes(), field.getBytes())); } finally { this.returnResource(jedis); } } /** * 刪除hash中的指定域 * @param key * @param fields * @return */ public Long removeHash(String key,String fields) { Jedis jedis = this.getResource(); try { return jedis.hdel(key.getBytes(),fields.getBytes()); } finally { this.returnResource(jedis); } } /** * 返回hash中的全部域 * @param key */ public Set<String> hKeys(String key) { Jedis jedis = this.getResource(); try { Set<String> hkeys = jedis.hkeys(key); return hkeys; } finally { this.returnResource(jedis); } } /** * 序列化 * @param object * @return */ private static byte[] serialize(Object object) { ObjectOutputStream oos = null; ByteArrayOutputStream baos = null; try { // 序列化 baos = new ByteArrayOutputStream(); oos = new ObjectOutputStream(baos); oos.writeObject(object); byte[] bytes = baos.toByteArray(); return bytes; } catch (Exception e) { e.printStackTrace(); log.error("jedis序列化異常....."); } return null; } /** * 反序列化 * * @param bytes * @return */ private static Object unserialize(byte[] bytes) { ByteArrayInputStream bais = null; try { // 反序列化 bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais); return ois.readObject(); } catch (Exception e) { e.printStackTrace(); log.info("jedis反序列化異常....."); } return null; } /** * 獲取jedis * @return */ private Jedis getResource() { Jedis jedis = pool.getResource(); jedis.auth(properties.getJedisPassword()); return jedis; } /** * 設置生命週期(過時時間) * @param key * @param second */ public void setExpireByKey(String key, int seconds) { Jedis jedis = null; try { jedis = this.getResource(); jedis.expire(key, seconds); } catch (Exception e) { log.error(e); } finally { this.returnResource(jedis); } } /** * 獲取某個Key的餘下存活時間(秒)。 * @param key * @return 存活時間(秒) */ public long getTimeToLive(String key) { Jedis jedis = null; long sec = -2; try { jedis = this.getResource(); sec = jedis.ttl(key); } catch (Exception e) { log.error(e); } finally { this.returnResource(jedis); } return sec; } /** * jedis放回鏈接池 * @param jedis */ private void returnResource(Jedis jedis) { pool.returnResource(jedis); } /** * 釋放Redis資源池。 */ public void destroy() { if(pool != null) { pool.destroy(); } log.info("Redis池已銷燬"); } }
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ThreadPoolUtils { private final ExecutorService executor; private static ThreadPoolUtils instance = new ThreadPoolUtils(); private ThreadPoolUtils() { this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); } public static ThreadPoolUtils getInstance() { return instance; } public static <T> Future<T> execute(final Callable<T> runnable) { return getInstance().executor.submit(runnable); } public static Future<?> execute(final Runnable runnable) { return getInstance().executor.submit(runnable); } }
package com.aqh.util; import java.io.Serializable; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import javax.print.attribute.standard.MediaSize.Other; import sun.util.logging.resources.logging; /** * 訂單隊列對象 * @author Administrator * */ public class DshOrder implements Delayed,Serializable{ private String orderNo;//訂單號 private long startTime; // 超時時間 /** * 構造方法 */ public DshOrder() {} public DshOrder(String orderNo, long timeout) { this.orderNo = orderNo; this.startTime = System.currentTimeMillis() + timeout; } @Override public int compareTo(Delayed other) { if (other == this) { return 0; } if (other instanceof DshOrder) { DshOrder otherRequest = (DshOrder)other; long otherStartTime = otherRequest.getStartTime(); return (int)(this.startTime - otherStartTime); } return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } public String getOrderNo() { return orderNo; } public void setOrderNo(String orderNo) { this.orderNo = orderNo; } public long getStartTime() { return startTime; } public void setStartTime(long startTime) { this.startTime = startTime; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; DshOrder other = (DshOrder) obj; if (orderNo.equals(other.getOrderNo())) return false; if (startTime != other.startTime) return false; return true; } @Override public int hashCode() { int result = 17; result = result * 31 + (int)orderNo.hashCode(); result = result * 31 + (int)startTime; return result; } @Override public String toString() { return "DSHOrder [orderNo=" + orderNo + ", startTime=" + startTime + "]"; } }
import java.util.concurrent.DelayQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.aqh.bean.btc.BtcConstant; import com.aqh.bean.btc.BtcOrder; import com.aqh.dao.IBtcMemberDao; import com.aqh.service.IBtcMemberService; import com.aqh.util.DshOrder; import com.aqh.util.JedisUtil; /** * 延時隊列service * @author Administrator * */ @Service public class DelayService { private boolean start;//判斷是否啓動隊列 private OnDelayedListener listener;//內部接口監聽器 private DelayQueue<DshOrder> delayQueue = new DelayQueue<DshOrder>(); //隊列集合 private Log log = LogFactory.getLog(DelayService.class); @Autowired private JedisUtil jedisUtil; @Autowired private IBtcMemberService btcMemberService; public static interface OnDelayedListener{ public void onDelayedArrived(DshOrder order); } public void start(OnDelayedListener listener) { if (start) { log.error(">>>>>>>>>>>>DelayService已經在啓動狀態"); return; } log.info(">>>>>>>>>>>>DelayService 啓動"); start = true; this.listener = listener; new Thread(new Runnable() { @Override public void run() { try { while(true) { log.info("*********準備獲取延遲隊列裏面將要取消的隊列*******"); /* 延時隊列會將加入隊列中的元素按照過時時間的前後順序排序,先過時的在隊首,該take方法會判斷隊首 * 元素是否過時,若是沒過時,會阻塞等待,直到隊首元素過時,纔會取出來,往下執行邏輯 */ DshOrder order = delayQueue.take(); log.info("*********訂單"+order.getOrderNo()+"已經超過30分鐘,被自動取消*******"); //修改訂單狀態 //根據訂單號查詢訂單,判斷狀態是否已經完成 BtcOrder btcOrder = btcMemberService.getOrderByNo(order.getOrderNo()); if (btcOrder.getStatus() == 1 ) { //取消訂單改變狀態並對相應的庫存進行相加 btcMemberService.updateOrderAndStock(order.getOrderNo(),0); } /* 這裏的類名.this是爲了區分那個類的this,通常在內部類中,須要調用外部類的this的時候使用,不加類名, * 直接this表明當前類,內部類中表明內部類,外部類中調用表明外部類,這裏再也不內部類中,也能夠顯示的指明是 * 哪一個類的this*/ if (DelayService.this.listener != null) { DelayService.this.listener.onDelayedArrived(order); } } } catch (Exception e) { e.printStackTrace(); } } }).start(); } public void add(DshOrder order){ //寫入隊列 delayQueue.put(order); //存入redis jedisUtil.setHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo(), order); log.info("**************訂單號:" + order.getOrderNo() + "被寫入訂單成功!*************"); } /** * 重載主要是爲了業務中只須要寫入延時隊列,而不須要寫入redis的狀況 * @param order 延時訂單 * @param type null */ public void add(DshOrder order,String type){ //寫入隊列 delayQueue.put(order); //存入redis //jedisUtil.setHash(BtcConstant.ORDER_SHIP, order.getOrderNo(), order); } public boolean remove(DshOrder order){ //從redis中刪除 jedisUtil.removeHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo()); log.info("**************訂單號:" + order.getOrderNo() + "被刪除成功!*************"); //從隊列裏面刪除 return delayQueue.remove(order); } public void remove(String orderNo){ DshOrder[] array = delayQueue.toArray(new DshOrder[]{}); if(array == null || array.length <= 0){ return; } DshOrder target = null; for(DshOrder order : array){ if(order.getOrderNo().equals(orderNo)){ target = order; break; } } if(target != null){ this.remove(target); } } }
1)啓動延時隊列的服務線程,去循環取要過時的隊首元素。(調用延時隊列的take阻塞方法)緩存
2)線程池中運行一個線程,在每次啓動時從redis中將未過時的對象從新加入到延時隊列中,由於延時隊列是基於內存的,宕機後延時隊列就不存在了,因此須要redis等數據庫配合使用,每次加入延時隊列中的對象,都須要加入redis中,從延時隊列中刪除的對象,也最好從redis中刪除,這樣宕機後服務器
未過時的延時隊列中的對象就在redis中,每次啓動服務器,線程就會從redis中將全部對象從新加入到延時隊列中。app
import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Service; import com.aqh.bean.btc.BtcConstant; import com.aqh.service.delayQueue.DelayService.OnDelayedListener; import com.aqh.util.DshOrder; import com.aqh.util.JedisUtil; /** * 用於監聽延時隊列的類 * @author Administrator * spring監聽器必須加上@Service,注入到bean對象中 */ @Service public class StartupListener implements ApplicationListener<ContextRefreshedEvent>{ private static final Log log = LogFactory.getLog(StartupListener.class); @Autowired private DelayService delayService; @Autowired private JedisUtil jedisUtil; @Override public void onApplicationEvent(ContextRefreshedEvent evt) { log.info(">>>>>>>>>>>>系統啓動完成,onApplicationEvent"); /* applicationontext和使用MVC以後的webApplicationontext會兩次調用監聽器的方法, * 這樣能夠解決,applicationontext是父容器,因此沒有父級元素,這句表明父容器(applicationontext)直接返回,不執行 * 監聽器方法,子容器(springMVC的)纔會執行後面的監聽器方法,這樣就不會兩次調用了*/ if (evt.getApplicationContext().getParent() == null) { return; } delayService.start(new OnDelayedListener() { @Override public void onDelayedArrived(final DshOrder order) { ThreadPoolUtils.execute(new Runnable() { @Override public void run() { String orderNo = order.getOrderNo(); //查庫判斷是否須要進行刪除 log.info("30分鐘自動取消訂單,onDelayedArrived():" + orderNo); delayService.remove(order); } }); } }); //查找須要入隊的訂單 ThreadPoolUtils.execute(new Runnable() { @Override public void run() { log.info("查找須要入隊的訂單"); Set<String> orderNos = jedisUtil.hKeys(BtcConstant.ORDER_CONFIRM); log.info("30分鐘未支付須要入隊的訂單:" + orderNos); if (orderNos == null || orderNos.size() <= 0) { return; } //寫到DelayQueue for (String str : orderNos) { //經過redis取key中的str域的value DshOrder dshOrder = (DshOrder) jedisUtil.getHash(BtcConstant.ORDER_CONFIRM, str); //存入延時隊列裏面 delayService.add(dshOrder, null); } } }); } }
以上的步驟已經將延時隊列寫完了,會根據傳入延時隊列的對象過時時間(雖然上面寫的日誌都是30分鐘,可是過時時間是根據加入隊列時加的時間決定的),自動到期後出隊列,執行操做;ide
具體調用的地方:
1)下單後須要加入延時隊列,添加過時時間爲30分鐘,30分鐘後未付款自動取消訂單
2)發貨後,須要加入延時隊列,添加過時時間爲7天。
7天內用戶點擊確認收貨按鈕,調用延時隊列服務類的remove方法,從DelayQueue中刪除,從Redis中刪除;
超過7天,DelayQueue中的訂單ID出隊,查詢數據庫,改狀態爲自動收貨,刪除redis。
注意:在延時隊列的邏輯操做中,兩種狀況能夠在延時對象中加入標誌判斷,是30天自動取消,仍是7天自動確認收貨,對應的執行不一樣的邏輯,而後從redis中刪除 ;
調用的代碼以下:
//加入延時隊列和redis緩存中 /* 建立延時對象時,傳入訂單號和過時時間(單位爲毫秒) */ DshOrder dshOrder = new DshOrder(orderNo,BtcConstant.ORDER_CONFIRM_TIMEOUT); delayService.add(dshOrder);
參考連接:https://blog.csdn.net/goldenfish1919/article/details/50923450