RabbitMQ死信隊列 RabbitMQ死信隊列

RabbitMQ死信隊列

 

關於RabbitMQ死信隊列html

死信隊列 聽上去像 消息「死」了     其實也有點這個意思,死信隊列  是 當消息在一個隊列 由於下列緣由:前端

消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=falsejava

消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())git

隊列超載程序員

變成了 「死信」 後    被從新投遞(publish)到另外一個Exchange   該Exchange 就是DLX     而後該Exchange 根據綁定規則 轉發到對應的 隊列上  監聽該隊列  就能夠從新消費     說白了 就是  沒有被消費的消息  換個地方從新被消費github

生產者   -->  消息 --> 交換機  --> 隊列  --> 變成死信  --> DLX交換機 -->隊列 --> 消費者面試

 

什麼是死信呢?什麼樣的消息會變成死信呢?

消息被拒絕(basic.reject或basic.nack)而且requeue=false.算法

消息TTL過時spring

隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)編程

應用場景分析

在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了

死信隊列 聽上去像 消息「死」了 ,其實也有點這個意思,
死信隊列 是 當消息在一個隊列 由於下列緣由:
1.消息被拒絕(basic.reject或basic.nack)而且requeue=false.
2.消息TTL過時
3.隊列達到最大長度(隊列滿了,沒法再添加數據到mq中)
應用場景分析
在定義業務隊列的時候,能夠考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被髮送到該死信隊列上,這樣就方便咱們查看消息失敗的緣由了

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息

如何使用死信交換機呢?

定義業務(普通)隊列的時候指定參數

x-dead-letter-exchange: 用來設置死信後發送的交換機

x-dead-letter-routing-key:用來設置死信的routingKey

 

若是高併發狀況到來  某一個隊列好比郵件隊列滿了 或者異常  或者消息過時 或者消費者拒絕消息

 

 

 

郵件隊列 綁定一個死信交換機  一旦郵件隊列滿了的狀況下  爲了防止數據丟失狀況   消息再也不郵件隊列存放了 放到死信交換機 而後交給私信郵件隊列  最終交給 死信消費者

 

 步驟:

 一、 建立 死信交換機  死信隊列  以及綁定

    以前的隊列沒有綁定死信隊列和死信交換機 不能作更改綁定死信交互機

    以前建立好的郵件隊列 刪除掉  已經建立好的隊列不能作更改  交換機也清理掉

   config:

複製代碼
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

//Fanout 類型 發佈訂閱模式
@Component
public class FanoutConfig {

    /**
     * 定義死信隊列相關信息
     */
    public final static String deadQueueName = "dead_queue";
    public final static String deadRoutingKey = "dead_routing_key";
    public final static String deadExchangeName = "dead_exchange";
    /**
     * 死信隊列 交換機標識符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信隊列交換機綁定鍵標識符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    // 郵件隊列
    private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";

    // 短信隊列
    private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
    // fanout 交換機
    private String EXCHANGE_NAME = "fanoutExchange";

    // 1.定義郵件隊列
    @Bean
    public Queue fanOutEamilQueue() {
        // 將普通隊列綁定到死信隊列交換機上
        Map<String, Object> args = new HashMap<>(2);
        args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
        args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
        Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
        return queue;
    }

    // 2.定義短信隊列
    @Bean
    public Queue fanOutSmsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    // 2.定義交換機
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // 3.隊列與交換機綁定郵件隊列
    @Bean
    Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
    }

    // 4.隊列與交換機綁定短信隊列
    @Bean
    Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
    }

    /**
     * 建立配置死信郵件隊列
     * 
     * @return
     */
    @Bean
    public Queue deadQueue() {
        Queue queue = new Queue(deadQueueName, true);
        return queue;
    }
   /*
    * 建立死信交換機
    */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(deadExchangeName);
    }
   /*
    * 死信隊列與死信交換機綁定
    */
    @Bean
    public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
    }

}
複製代碼

 

 生產者  timestamp 設置爲0 

複製代碼
package com.itmayiedu.rabbitmq;

import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "xx@163.com");
        jsonObject.put("timestamp", 0);
        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);
        // 設置消息惟一id 保證每次重試消息id惟一  
        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭裏面 用UUID作全局ID 
        amqpTemplate.convertAndSend(queueName, message);
    }
}
複製代碼

 

 

 此時的消費者:

複製代碼
@RabbitListener(queues = "fanout_email_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // 獲取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId);
        
        JSONObject jsonObject = JSONObject.parseObject(msg);
        Integer timestamp = jsonObject.getInteger("timestamp");
        
        try {
            int result  = 1/timestamp;
            System.out.println("result"+result);
            // // 手動ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手動簽收
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //拒絕消費消息(丟失消息) 給死信隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
        
        System.out.println("執行結束....");
    }
複製代碼

 

 異常情況:

 

 

添加死信隊列的消費者,並啓動後:

 

複製代碼
package com.itmayiedu.rabbitmq;

import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

//死信隊列
@Component
public class FanoutDeadEamilConsumer {    
    
    @RabbitListener(queues = "dead_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // 獲取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("死信郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId);
        // // 手動ack
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
       // 手動簽收
       channel.basicAck(deliveryTag, false);
        
        System.out.println("執行結束....");
    }    
    
}
複製代碼

 

 

 
分類:  RabbitMQ
 
好文要頂  關注我  收藏該文   
0
0
 
 
 
« 上一篇: 自動補償機制(消費者)及冪等問題
» 下一篇: RabbitMQ解決分佈式事務
posted @  2019-01-18 16:17 toov5 閱讀(2811) 評論(0) 編輯 收藏
 
 
 
相關文章
相關標籤/搜索