一、引入依賴java
參考跟前一篇代碼。spring
二、配置文件添加配置項ui
參考跟前一篇代碼。設計
三、配置死信隊列code
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQDeadConfig { public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange"; public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue"; @Bean public FanoutExchange moonFanoutExchange() { return new FanoutExchange(MOON_FANOUT_EXCHANGE); } /** * 死信隊列設計思路 * 生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者 * * 進入死信隊列: * 1. 消息被拒絕,而且requeue = false * 2. 消息ttl過時 * 3. 隊列達到最大的長度 * * @return */ @Bean public Queue moonFanoutQueue() { Map<String, Object> args = new HashMap<>(3); // 聲明死信交換器 args.put("x-dead-letter-exchange", MOON_DEAD_FANOUT_EXCHANGE); // 聲明死信路由鍵 args.put("x-dead-letter-routing-key", "DelayKey"); // 聲明隊列消息過時時間,若是超過這個時間尚未消費則發送到死信交換機 //args.put("x-message-ttl", 10000); return new Queue(MOON_FANOUT_QUEUE, true, false, false, args); } @Bean public Binding bindExchangeAndQueue() { return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange()); } /************ 如下是死信交換機和隊列配置信息 ************/ // 死信交換機 public static final String MOON_DEAD_FANOUT_EXCHANGE = "moon.dead.fanout.exchange"; // 死信交換機綁定的queue public static final String MOON_DEAD_FANOUT_QUEUE = "moon.dead.fanout.queue"; @Bean public FanoutExchange moonDeadFanoutExchange() { return new FanoutExchange(MOON_DEAD_FANOUT_EXCHANGE); } @Bean public Queue moonDeadFanoutQueue() { return new Queue(MOON_DEAD_FANOUT_QUEUE); } @Bean public Binding bindDeadExchangeAndQueue() { return BindingBuilder.bind(moonDeadFanoutQueue()).to(moonDeadFanoutExchange()); }
四、正常消費代碼rabbitmq
參考跟前一篇代碼。隊列
五、死信隊列消費代碼utf-8
import com.moon.democonsumer.config.MQDeadConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消費綁定了死信exchange的queue裏的數據 */ @Slf4j @Component public class MQDeadConsumer { @RabbitListener(queues = MQDeadConfig.MOON_DEAD_FANOUT_QUEUE) public void onMessage(Message message, Channel channel) { log.info("***** 死信隊列收到信息 處理開始 *****"); try { String body = new String(message.getBody(), "utf-8"); log.info("***** 死信隊列收到信息, body={}", body); // 確認收到消息,false只確認當前consumer一個消息收到,true確認全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error("死信隊列收到信息處理異常", e); } log.info("***** 死信隊列收到信息 處理結束 *****"); } }