Mq的使用中,延遲隊列是不少業務都須要用到的,最近我也是剛在項目中用到,就在跟你們講講吧。git
延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而通常的隊列,消息一旦入隊了以後就會被消費者立刻消費。github
延遲隊列能作什麼?最多見的是如下兩種場景:web
好比:用戶生成訂單以後,須要過一段時間校驗訂單的支付狀態,若是訂單仍未支付則須要及時地關閉訂單;用戶註冊成功以後,須要過一段時間好比一週後校驗用戶的使用狀況,若是發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。spring
好比消費者從隊列裏消費消息時失敗了,可是想要延遲一段時間後自動重試。網絡
若是不使用延遲隊列,那麼咱們只能經過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便作成統一的服務便於開發人員使用。可是使用延遲隊列的話,咱們就能夠垂手可得地完成。svg
在介紹具體思路錢,先介紹RabbitMQ的兩個特性:Time-To-Live Extensions(消息存活時間) 和 Dead Letter Exchanges(死信交換機)學習
RabbitMQ容許咱們爲消息或者隊列設置TTL(time to live),也就是過時時間。TTL代表了一條消息可在隊列中存活的最大時間,單位爲毫秒。當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在通過TTL秒後「死亡」,成爲Dead Letter(死信)。若是既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。測試
設置了TTL的消息在過時後會成爲Dead Letter。其實在RabbitMQ中,一共有三種消息的「死亡」形式:ui
消息被拒絕。經過調用basic.reject或者basic.nack而且設置的requeue參數爲false。code
消息由於設置了TTL而過時。
消息進入了一條已經達到最大長度的隊列。
若是隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被從新publish(推送)到Dead Letter Exchange,經過Dead Letter Exchange路由到其餘隊列。
延遲消費是延遲隊列最爲經常使用的使用模式。以下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。經過RabbitMQ提供的TTL擴展,這些消息會被設置過時時間,等消息過時以後,這些消息會經過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。
延遲重試本質上也是延遲消費的一種。
以下圖所示,消費者發現該消息處理出現了異常,好比是由於網絡波動引發的異常。那麼若是不等待一段時間,直接就重試的話,極可能會致使在這期間內一直沒法成功,形成必定的資源浪費。那麼咱們能夠將其先放在緩衝隊列中(圖中紅色隊列),等消息通過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時因爲已通過了「較長」的時間了,異常的一些波動一般已經恢復,這些消息能夠被正常地消費。
這裏只貼上最主要的代碼,所有的代碼可查看github
Mq隊列與交換機實例建立
/** * 緩衝隊列 */ private String DELAY_BUFFER_QUEUE = "delay_buffer_queue"; /** * 實際消費交換機(DLX) */ private String DELAY_SERVICE_EXCHANGE = "delay_service_exchange"; /** * 實際消費隊列 */ private String DELAY_SERVICE_QUEUE = "delay_service_queue"; /** * 消息過時時間 3秒 */ private Integer QUEUE_EXPIRATION = 3 * 1000; /** * 實際消費隊列 * @return */ @Bean Queue delayServiceQueue(){ return QueueBuilder.durable(DELAY_SERVICE_QUEUE).build(); } /** * 實際消費交換機 * @return */ @Bean DirectExchange delayServiceExchange() { return new DirectExchange(DELAY_SERVICE_EXCHANGE); } /** * 實際消費隊列綁定實際消費交換機(DLX) * @param delayServiceQueue * @param delayServiceExchange * @return */ @Bean Binding delayBinding(Queue delayServiceQueue, DirectExchange delayServiceExchange) { return BindingBuilder.bind(delayServiceQueue) .to(delayServiceExchange) .with(DELAY_SERVICE_QUEUE); } /** * 緩衝隊列配置 * @return */ @Bean Queue delayBufferQueue(){ return QueueBuilder.durable(DELAY_BUFFER_QUEUE) // 死信交換機 DLX .withArgument("x-dead-letter-exchange", DELAY_SERVICE_EXCHANGE) // 目標routing key .withArgument("x-dead-letter-routing-key", DELAY_SERVICE_QUEUE) // 設置隊列的過時時間 .withArgument("x-message-ttl", QUEUE_EXPIRATION) .build(); }
監聽實際消費隊列
@Component public class DelayMsgListener { @RabbitListener(queues="delay_service_queue") public void listenServiceMsg(Message message){ System.out.println(new Date()+ "收到延遲消息啦:"+new String(message.getBody())); } }
測試:發送消息到緩衝隊列
@Test public void send1(){ System.out.println(new Date() +"發送延遲消息!!!"); amqpTemplate.convertAndSend("delay_buffer_queue","Hello!Delay Message!"); }
結果以下
能夠看到,在發消息後3秒(TTL),實際消費隊列接收到了消息並被消費
Mq隊列與交換機實例建立
/** * 緩衝隊列 */ private String RETRY_BUFFER_QUEUE = "retry_buffer_queue"; /** * 緩衝交換機 */ private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange"; /** * 實際消費交換機(DLX) */ private String RETRY_SERVICE_EXCHANGE = "retry_service_exchange"; /** * 實際消費隊列 */ private String RETRY_SERVICE_QUEUE = "retry_service_queue"; /** * 實際消費隊列 * @return */ @Bean Queue retryServiceQueue(){ return QueueBuilder.durable(RETRY_SERVICE_QUEUE).build(); } /** * 實際消費交換機 * @return */ @Bean DirectExchange retryServiceExchange() { return new DirectExchange(RETRY_SERVICE_EXCHANGE); } /** * 實際消費隊列綁定實際消費交換機(DLX) * @param retryServiceQueue * @param retryServiceExchange * @return */ @Bean Binding retryBinding(Queue retryServiceQueue, DirectExchange retryServiceExchange) { return BindingBuilder.bind(retryServiceQueue) .to(retryServiceExchange) .with(RETRY_SERVICE_QUEUE); } /** * 緩衝隊列配置 * @return */ @Bean Queue retryBufferQueue(){ return QueueBuilder.durable(RETRY_BUFFER_QUEUE) // 死信交換機 DLX .withArgument("x-dead-letter-exchange", RETRY_SERVICE_EXCHANGE) // 目標routing key .withArgument("x-dead-letter-routing-key", RETRY_SERVICE_QUEUE) // 設置隊列的過時時間 .withArgument("x-message-ttl", QUEUE_EXPIRATION) .build(); } /** * 緩衝交換機 * @return */ @Bean DirectExchange retryBufferExchange() { return new DirectExchange(RETRY_BUFFER_EXCHANGE); } /** * 緩衝隊列綁定緩衝交換機 * @param retryBufferQueue * @param retryBufferQueue * @return */ @Bean Binding bufferBinding(Queue retryBufferQueue, DirectExchange retryBufferExchange) { return BindingBuilder.bind(retryBufferQueue) .to(retryBufferExchange) .with(RETRY_BUFFER_QUEUE); }
監聽實際消費隊列
@Component public class RetryMsgListener { /** * 緩衝隊列 */ private String RETRY_BUFFER_QUEUE = "retry_buffer_queue"; /** * 緩衝交換機 */ private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange"; @Autowired private MessagePropertiesConverter messagePropertiesConverter; @RabbitListener(queues="retry_service_queue") public void listenServiceMsg(@Payload Message message, Channel channel){ try { System.out.println(new Date() + "收到消息:" + new String(message.getBody())); //TODO 業務邏輯 //忽然出現異常 throw new RuntimeException("特殊異常"); }catch (Exception e){ Map<String,Object> headers = message.getMessageProperties().getHeaders(); try{ Long retryCount = getRetryCount(headers); //重試3次 if(retryCount < 3){ retryCount += 1; System.out.println("消費異常,準備重試,第"+retryCount+"次"); //轉換爲RabbitMQ 的Message Properties對象 AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties( message.getMessageProperties(), "UTF-8"); //設置headers rabbitMQProperties.builder().headers(headers); //程序異常重試 //這裏必須把rabbitMQProperties也傳進來,不然死信隊列沒法識別是不是同一條信息,致使重試次數沒法遞增 channel.basicPublish(RETRY_BUFFER_EXCHANGE,RETRY_BUFFER_QUEUE,rabbitMQProperties, message.getBody()); }else { //TODO 重試失敗,須要人工處理 (發送到失敗隊列或發郵件/信息) System.out.println("已重試3次,需人工處理!"); } }catch (IOException ioe){ System.out.println("消息重試失敗!"); ioe.printStackTrace(); } } } /** * 獲取重試次數 * 若是這條消息是死信,header中會有一個x-death的記錄相關信息 * 其中包含死亡次數 * @param headers * @return */ private long getRetryCount(Map<String, Object> headers) { long retryCount = 0; if(null != headers) { if(headers.containsKey("x-death")) { List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death"); if(!deathList.isEmpty()) { Map<String, Object> deathEntry = deathList.get(0); retryCount = (Long)deathEntry.get("count"); } } } return retryCount; } }
測試:發送消息到實際消費隊列
@Test public void send2(){ System.out.println(new Date() +"發送延遲重試消息!!!"); //直接發消息到實際消費隊列 amqpTemplate.convertAndSend("retry_service_queue","Hello!Retry Message!"); }
結果以下:
能夠看到,消費異常後,重試了3次
延遲隊列在實際業務中是常常被用到的,同窗們最好都學學哦,代碼已上傳github
https://github.com/zhangwenkang0/springcloud-learning-from-0-to-1/tree/master/rabbitmq-demo
若是以爲不錯,分享給你的朋友!
一個立志成大腿而天天努力奮鬥的年輕人
伴學習伴成長,成長之路你並不孤單!