一、引入依賴web
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency>
二、配置參數spring
spring.rabbitmq.addresses=10.10.60.65 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 消費手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual
三、配置消費隊列json
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange"; public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue"; @Bean public FanoutExchange moonFanoutExchange() { return new FanoutExchange(MOON_FANOUT_EXCHANGE); } @Bean public Queue moonFanoutQueue() { return new Queue(MOON_FANOUT_QUEUE); } @Bean public Binding bindExchangeAndQueue() { return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange()); } }
四、具體消費代碼spring-boot
模擬消費消息處理失敗,若是第一次失敗放回隊列尾部,再次處理失敗,則放入死信隊列(若是配置,下個小結添加死信隊列配置)ui
import com.alibaba.fastjson.JSON; import com.moon.democonsumer.config.MQConfig; import com.moon.democonsumer.entity.User; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class MQConsumer { /** * 拋出異常的數據,會放到消息的隊尾 * @param message * @param channel * @throws Exception */ @RabbitListener(queues = MQConfig.MOON_FANOUT_QUEUE) //public void onMessage(User user, Message message, Channel channel) throws Exception { public void onMessage(Message message, Channel channel) throws Exception { log.info("***** MQConsumer onMessage 開始 *****"); User user = null; try { String body = new String(message.getBody(), "utf-8"); log.info("***** MQConsumer onMessage 接收到消息, body={}", body); user = JSON.parseObject(body, User.class); log.info("***** MQConsumer onMessage 接收到消息 name = {}", user.getName()); // 模擬執行任務 Thread.sleep(1000); if (user.getId() == 1) { throw new RuntimeException("id=1拋出異常"); } // 確認收到消息,false只確認當前consumer一個消息收到,true確認全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //e.printStackTrace(); if (message.getMessageProperties().getRedelivered()) { log.info("***** MQConsumer onMessage 消息已重複處理失敗,拒絕再次接收 name = {}", user.getName()); // 拒絕消息,requeue=false 表示再也不從新入隊,若是配置了死信隊列則進入死信隊列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { log.info("***** MQConsumer onMessage 消息即將再次返回隊列處理 name = {}", user.getName()); // requeue爲是否從新回到隊列,true從新入隊 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } log.info("***** MQConsumer onMessage 結束 *****"); } }