RabbitMQ死信隊列
關於RabbitMQ死信隊列html
死信隊列 聽上去像 消息「死」了 其實也有點這個意思,死信隊列 是 當消息在一個隊列 由於下列緣由:前端
消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=falsejava
消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())git
隊列超載程序員
變成了 「死信」 後 被從新投遞(publish)到另外一個Exchange 該Exchange 就是DLX 而後該Exchange 根據綁定規則 轉發到對應的 隊列上 監聽該隊列 就能夠從新消費 說白了 就是 沒有被消費的消息 換個地方從新被消費github
生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者面試
什麼是死信呢?什麼樣的消息會變成死信呢?
消息被拒絕(basic.reject或basic.nack)而且requeue=false.算法
消息TTL過時spring
隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)編程
應用場景分析
在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了
死信隊列 聽上去像 消息「死」了 ,其實也有點這個意思,
死信隊列 是 當消息在一個隊列 由於下列緣由:
1.消息被拒絕(basic.reject或basic.nack)而且requeue=false.
2.消息TTL過時
3.隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)
應用場景分析
在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息
如何使用死信交換機呢?
定義業務(普通)隊列的時候指定參數
x-dead-letter-exchange: 用來設置死信後發送的交換機
x-dead-letter-routing-key:用來設置死信的routingKey
若是高併發狀況到來 某一個隊列好比郵件隊列滿了 或者異常 或者消息過時 或者消費者拒絕消息
郵件隊列 綁定一個死信交換機 一旦郵件隊列滿了的狀況下 爲了防止數據丟失狀況 消息再也不郵件隊列存放了 放到死信交換機 而後交給私信郵件隊列 最終交給 死信消費者
步驟:
一、 建立 死信交換機 死信隊列 以及綁定
以前的隊列沒有綁定死信隊列和死信交換機 不能作更改綁定死信交互機
以前建立好的郵件隊列 刪除掉 已經建立好的隊列不能作更改 交換機也清理掉
config:
import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; //Fanout 類型 發佈訂閱模式 @Component public class FanoutConfig { /** * 定義死信隊列相關信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信隊列 交換機標識符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信隊列交換機綁定鍵標識符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 郵件隊列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信隊列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交換機 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定義郵件隊列 @Bean public Queue fanOutEamilQueue() { // 將普通隊列綁定到死信隊列交換機上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定義短信隊列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定義交換機 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.隊列與交換機綁定郵件隊列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.隊列與交換機綁定短信隊列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 建立配置死信郵件隊列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } /* * 建立死信交換機 */ @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } /* * 死信隊列與死信交換機綁定 */ @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
生產者 timestamp 設置爲0
package com.itmayiedu.rabbitmq; import java.util.UUID; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; @Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put("email", "xx@163.com"); jsonObject.put("timestamp", 0); String jsonString = jsonObject.toJSONString(); System.out.println("jsonString:" + jsonString); // 設置消息惟一id 保證每次重試消息id惟一 Message message = MessageBuilder.withBody(jsonString.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭裏面 用UUID作全局ID amqpTemplate.convertAndSend(queueName, message); } }
此時的消費者:
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1/timestamp; System.out.println("result"+result); // // 手動ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收 channel.basicAck(deliveryTag, false); } catch (Exception e) { //拒絕消費消息(丟失消息) 給死信隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } System.out.println("執行結束...."); }
異常情況:
添加死信隊列的消費者,並啓動後:
package com.itmayiedu.rabbitmq; import java.util.Map; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; //死信隊列 @Component public class FanoutDeadEamilConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { // 獲取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId); // // 手動ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收 channel.basicAck(deliveryTag, false); System.out.println("執行結束...."); } }
暱稱:
退出 訂閱評論
[Ctrl+Enter快捷鍵提交]
【前端】SpreadJS表格控件,可嵌入系統開發的在線Excel
【培訓】阿里P8面試官:什麼樣的人能進阿里
【推薦】程序員問答平臺,解決您開發中遇到的技術難題
· Rabbitmq消費失敗死信隊列
· rabbitmq實現延時隊列(死信隊列)
· RabbitMQ:僞延時隊列
· 關於 RabbitMQ 的 Dead-Letters-Queue 「死信隊列」
· RabbitMQ Dead Lettering(死信)
· 你可能忽略的iPadOS六個新改進 碾壓第三方
· 大疆的「小坦克」,是教學工具仍是玩具?
· 太陽幾千年發一次脾氣 仍會爆發「超級耀斑」
· 2018安卓應用安全白皮書:超98%安卓應用存有安全風險
· 投資人呼籲索尼剝離半導體賣掉金融 專心作娛樂巨頭
» 更多新聞...