本篇博文是「Java秒殺系統實戰系列文章」的第十篇,本篇博文咱們將採用RabbitMQ的死信隊列的方式處理「用戶秒殺成功生成訂單後,卻遲遲沒有支付」的狀況,一塊兒來見識一下RabbitMQ死信隊列在實際業務環境下的強大之處!git
對於消息中間件RabbitMQ,Debug其實在前面的篇章中已經簡單分享介紹過了,在這裏就再也不贅述了!在本文咱們將採用RabbitMQ的死信隊列實現這樣的業務需求:「用戶在秒殺成功併成功建立一筆訂單記錄後,理論上應該是執行去支付的操做,可是卻存在着一種狀況是用戶遲遲不願去支付~至於緣由,不得而知!」算法
對於這種場景,各位小夥伴能夠在一些商城平臺體驗一下,即挑選完商品,加入購物車後,點擊去結算,這個時候會有個倒計時,提醒你須要在指定的時間內完成付款,不然訂單將失效!數據庫
對於這種業務邏輯的處理,傳統的作法是採用「定時器的方式」,定時輪詢獲取已經超過指定時間的訂單,而後執行一系列的處理措施(好比再爭取給用戶發送短信,提醒超過多長時間訂單就要失效了等等。。。),在這個秒殺系統中,咱們將藉助RabbitMQ死信隊列這一組件,對該訂單執行「失效」的措施!後端
「死信隊列」,顧明思議,是能夠延時、延遲必定的時間再處理消息的一種特殊隊列,它相對於「普通的隊列」而言,能夠實現「進入死信隊列的消息不當即處理,而是能夠等待必定的時間再進行處理」的功能!而普通的隊列則不行,即進入隊列後的消息會當即被對應的消費者監聽消費,以下圖所示爲普通隊列的基本消息模型:tomcat
而對於「死信隊列」,它的構成以及使用相對而言比較複雜一點,在正常狀況,死信隊列由三大核心組件組成:死信交換機+死信路由+TTL(消息存活時間~非必需的),而死信隊列又能夠由「面向生產者的基本交換機+基本路由」綁定而成,故而生產者首先是將消息發送至「基本交換機+基本路由」所綁定而成的消息模型中,即間接性地進入到死信隊列中,當過了TTL,消息將「掛掉」,從而進入下一個中轉站,即「面下那個消費者的死信交換機+死信路由」所綁定而成的消息模型中。以下圖所示:bash
下面,咱們以實際的代碼來構建死信隊列的消息模型,並將此消息模型應用到秒殺系統的上述功能模塊中。服務器
(1)首先,須要在RabbitmqConfig配置類建立死信隊列的消息模型,其完整的源代碼以下所示:微信
//構建秒殺成功以後-訂單超時未支付的死信隊列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}
//基本交換機
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//建立基本交換機+基本路由 -> 死信隊列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的隊列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交換機
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交換機+死信路由->真正隊列 的綁定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
複製代碼
其中,環境變量對象實例env讀取的變量是配置在application.properties配置文件中的,取值以下所示:app
#訂單超時未支付自動失效-死信隊列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#單位爲ms
mq.kill.item.success.kill.expire=20000
複製代碼
(2)成功建立了消息模型以後,緊接着,咱們須要在通用的RabbitMQ發送消息服務類RabbitSenderService中開發「發送消息入死信隊列」的功能,在該功能方法中,咱們指定了消息的存活時間TTL,取值爲配置的變量:mq.kill.item.success.kill.expire 的值,即20s;其完整的源代碼以下所示:dom
//秒殺成功後生成搶購訂單-發送信息入死信隊列,等待着必定時間失效超時未支付的訂單
public void sendKillSuccessOrderExpireMsg(final String orderCode){
try {
if (StringUtils.isNotBlank(orderCode)){
KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
if (info!=null){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
//TODO:動態設置TTL(爲了測試方便,暫且設置20s)
mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
return message;
}
});
}
}
}catch (Exception e){
log.error("秒殺成功後生成搶購訂單-發送信息入死信隊列,等待着必定時間失效超時未支付的訂單-發生異常,消息爲:{}",orderCode,e.fillInStackTrace());
}
}
複製代碼
從該「發送消息入死信隊列」的代碼中,咱們能夠看到,消息首先是先入到「基本交換機+基本路由」所綁定的死信隊列的消息模型中的!當消息到了TTL,天然會從死信隊列中出來(即「解脫了」),而後進入下一個中轉站,即:「死信交換機+死信路由」 所綁定而成的真正隊列的消息模型中,最終真正被消費者監聽消費!
此時,能夠將整個項目、系統運行在外置的tomcat服務器中,而後打開RabbitMQ後端控制檯應用,找到該死信隊列,能夠看到該死信隊列的詳細信息,以下圖所示:
(3)最後,是須要在RabbitMQ通用的消息監聽服務類RabbitReceiverService 中監聽「真正隊列」中的消息並進行處理:在這裏咱們是對該訂單進行失效處理(前提是還沒付款的狀況下!),其完整的源代碼以下所示:
//用戶秒殺成功後超時未支付-監聽者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
try {
log.info("用戶秒殺成功後超時未支付-監聽者-接收消息:{}",info);
if (info!=null){
ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
if (entity!=null && entity.getStatus().intValue()==0){
itemKillSuccessMapper.expireOrder(info.getCode());
}
}
}catch (Exception e){
log.error("用戶秒殺成功後超時未支付-監聽者-發生異常:",e.fillInStackTrace());
}
}
複製代碼
其中,失效更新訂單的記錄的操做由 itemKillSuccessMapper.expireOrder(info.getCode()); 來實現,其對應的動態Sql的寫法以下所示:
<!--失效更新訂單信息-->
<update id="expireOrder">
UPDATE item_kill_success
SET status = -1
WHERE code = #{code} AND status = 0
</update>
複製代碼
(4)至此,關於RabbitMQ死信隊列消息模型的代碼實戰已經完畢了!最後我只須要在「用戶秒殺成功建立訂單的那一刻,發送消息入死信隊列」的地方調用便可,其調用代碼以下所示:
/**
* 通用的方法-記錄用戶秒殺成功後生成的訂單-並進行異步郵件消息的通知
* @param kill
* @param userId
* @throws Exception
*/
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
//TODO:記錄搶購成功後生成的秒殺訂單記錄
ItemKillSuccess entity=new ItemKillSuccess();
String orderNo=String.valueOf(snowFlake.nextId());
//entity.setCode(RandomUtil.generateOrderCode()); //傳統時間戳+N位隨機數
entity.setCode(orderNo); //雪花算法
entity.setItemId(kill.getItemId());
entity.setKillId(kill.getId());
entity.setUserId(userId.toString());
entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
entity.setCreateTime(DateTime.now().toDate());
//TODO:學以至用,觸類旁通 -> 仿照單例模式的雙重檢驗鎖寫法
if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
int res=itemKillSuccessMapper.insertSelective(entity);
if (res>0){
//TODO:進行異步郵件消息的通知=rabbitmq+mail
rabbitSenderService.sendKillSuccessEmailMsg(orderNo);
//TODO:入死信隊列,用於 「失效」 超過指定的TTL時間時仍然未支付的訂單
rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
}
}
}
複製代碼
最後,是進行自測:點擊「搶購」按鈕,用戶秒殺成功後,會發送一條消息入死信隊列(這一點能夠在RabbitMQ後端控制檯中能夠看到一條正Ready好的消息),等待20s,便可看到消息轉移到真正的隊列,並被真正的消費者監聽消費,以下所示:
好了,關於「RabbitMQ死信隊列」的介紹以及應用實戰本文就暫且介紹到這裏了,此種方式能夠很靈活對「超時未支付的訂單」,進行很好的處理,並且整個過程是「自動、天然」的,而無需人爲去手動點擊按鈕觸發了!固然啦,萬事萬物都並不是十全十美的,死信隊列也是如此,在一篇文章中咱們將介紹此種方式的瑕疵之處,並採用相應的解決方案進行處理!
一、目前,這一秒殺系統的總體構建與代碼實戰已經所有完成了,完整的源代碼數據庫地址能夠來這裏下載:gitee.com/steadyjack/… 記得Fork跟Star啊!!
二、最後,關注一下Debug的技術微信公衆號唄: