RabbitMQ-05-消費消息 有死信隊列

一、引入依賴java

參考跟前一篇代碼。spring

二、配置文件添加配置項ui

參考跟前一篇代碼。設計

三、配置死信隊列code

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQDeadConfig {

    public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange";
    public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue";

    @Bean
    public FanoutExchange moonFanoutExchange() {
        return new FanoutExchange(MOON_FANOUT_EXCHANGE);
    }

    /**
     * 死信隊列設計思路
     *  生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
     *
     * 進入死信隊列:
     * 1. 消息被拒絕,而且requeue = false
     * 2. 消息ttl過時
     * 3. 隊列達到最大的長度
     *
     * @return
     */
    @Bean
    public Queue moonFanoutQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // 聲明死信交換器
        args.put("x-dead-letter-exchange", MOON_DEAD_FANOUT_EXCHANGE);
        // 聲明死信路由鍵
        args.put("x-dead-letter-routing-key", "DelayKey");
        // 聲明隊列消息過時時間,若是超過這個時間尚未消費則發送到死信交換機
        //args.put("x-message-ttl", 10000);
        return new Queue(MOON_FANOUT_QUEUE, true, false, false, args);
    }

    @Bean
    public Binding bindExchangeAndQueue() {
        return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange());
    }

    /************ 如下是死信交換機和隊列配置信息 ************/

    // 死信交換機
    public static final String MOON_DEAD_FANOUT_EXCHANGE = "moon.dead.fanout.exchange";
    // 死信交換機綁定的queue
    public static final String MOON_DEAD_FANOUT_QUEUE = "moon.dead.fanout.queue";

    @Bean
    public FanoutExchange moonDeadFanoutExchange() {
        return new FanoutExchange(MOON_DEAD_FANOUT_EXCHANGE);
    }

    @Bean
    public Queue moonDeadFanoutQueue() {
        return new Queue(MOON_DEAD_FANOUT_QUEUE);
    }

    @Bean
    public Binding bindDeadExchangeAndQueue() {
        return BindingBuilder.bind(moonDeadFanoutQueue()).to(moonDeadFanoutExchange());
    }

四、正常消費代碼rabbitmq

參考跟前一篇代碼。隊列

五、死信隊列消費代碼utf-8

import com.moon.democonsumer.config.MQDeadConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消費綁定了死信exchange的queue裏的數據
 */
@Slf4j
@Component
public class MQDeadConsumer {

    @RabbitListener(queues = MQDeadConfig.MOON_DEAD_FANOUT_QUEUE)
    public void onMessage(Message message, Channel channel) {
        log.info("***** 死信隊列收到信息 處理開始 *****");
        try {
            String body = new String(message.getBody(), "utf-8");
            log.info("***** 死信隊列收到信息, body={}", body);

            // 確認收到消息,false只確認當前consumer一個消息收到,true確認全部consumer得到的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("死信隊列收到信息處理異常", e);
        }
        log.info("***** 死信隊列收到信息 處理結束 *****");
    }
}
相關文章
相關標籤/搜索