文章末尾有源代碼地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章節主要實現消息的延遲消費,在學習延遲消費以前必須先了解RabbitMQ兩個基本概念,消息的TTL和死信Exchange,經過這二者的組合來實現消息的延遲消費。
不想看原理講解的,直接經過標題6看代碼實現html
消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。app
這裏咱們以最熟悉的12306購票爲例進行案例場景的分析,12306購票步驟以下:學習
在RabbitMQConfig中建立隊列、交換機以及綁定關係測試
@Configuration
public class RabbitMQConfig {ui
/** * 測試發送消息到MQ * @return
*/ @Bean public Queue testHello() { return new Queue(SysConstant.QUEUE_TEST_HELLO); } /** * 死信交換機 * @return */ @Bean public DirectExchange sysOrderDelayExchange() { return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE); } /** * 死信隊列 * @return */ @Bean public Queue sysOrderDelayQueue() { Map<String, Object> map = new HashMap<String, Object>(16); map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交換機 map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map); } /** * 給死信隊列綁定死信交換機 * @return */ @Bean public Binding sysOrderDelayBinding() { return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY); } /** * 死信接收交換機,用於接收死信隊列的消息 * @return */ @Bean public DirectExchange sysOrderReceiveExchange() { return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); } /** * 死信接收隊列 * @return */ @Bean public Queue sysOrderReceiveQueue() { return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE); } /** * 死信接收交換機綁定接收死信隊列消費隊列 * @return */ @Bean public Binding sysOrdeReceiveBinding() { return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY); } }
發送延時消息到死信交換器方法spa
@Service
public class MsgService {日誌
@Autowired private RabbitTemplate rabbitTemplate; /** * 發送延時消息到mq * @param exchange 死信交換機 * @param routeKey 路由key * @param data 發送數據 * @param delayTime 過時時間,單位毫秒
*/ public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) { rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> { message.getMessageProperties().setExpiration(delayTime + ""); return message; }); } }
監聽隊列消息ReceiveMsgListener類code
/**orm
* 獲取到的延時消息 * 這裏接收到消息進行對應的業務處理(例如:取消訂單,關閉支付,回滾庫存等 ...) * @param msg
*/ @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE) @RabbitHandler public void getdelayMsg(String msg) { log.info("MQ接收消息時間:{},消息內容:{}", DateUtil.formatDateTime(DateUtil.date()),msg); log.info("------->這裏實現訂單關閉、支付關閉、回滾庫存業務邏輯..."); }
建立Controller向隊列發送消息,設置過時時間10秒htm
@RestController
@RequestMapping("mq")
@Slf4j
public class MQController {
@Autowired private MsgService msgService; @GetMapping("sendMsg") public String sendMsg() { log.info("發送延時消息時間:" + DateUtil.formatDateTime(DateUtil.date())); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(IdUtil.fastSimpleUUID()); orderInfo.setOrderState("待支付"); orderInfo.setPayMoney(999.88); msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分鐘 return JSONUtil.toJsonStr("發送延時消息成功"); }
}
https://www.sunnyblog.top/index.html?tagId=1264009609236971520
詳細開發技術文檔盡在 點擊這裏查看技術文檔 ;更多技術文章: https://www.sunnyblog.top;任何疑問加QQ羣諮詢:534073451