RabbitMQ-04-消費消息

一、引入依賴web

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<optional>true</optional>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
	<version>1.2.62</version>
</dependency>

二、配置參數spring

spring.rabbitmq.addresses=10.10.60.65
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 消費手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual

三、配置消費隊列json

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    public static final String MOON_FANOUT_EXCHANGE = "moon.fanout.exchange";
    public static final String MOON_FANOUT_QUEUE = "moon.fanout.queue";

    @Bean
    public FanoutExchange moonFanoutExchange() {
        return new FanoutExchange(MOON_FANOUT_EXCHANGE);
    }

    @Bean
    public Queue moonFanoutQueue() {
        return new Queue(MOON_FANOUT_QUEUE);
    }

    @Bean
    public Binding bindExchangeAndQueue() {
        return BindingBuilder.bind(moonFanoutQueue()).to(moonFanoutExchange());
    }
    
}

四、具體消費代碼spring-boot

模擬消費消息處理失敗,若是第一次失敗放回隊列尾部,再次處理失敗,則放入死信隊列(若是配置,下個小結添加死信隊列配置)ui

import com.alibaba.fastjson.JSON;
import com.moon.democonsumer.config.MQConfig;
import com.moon.democonsumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MQConsumer {

    /**
     * 拋出異常的數據,會放到消息的隊尾
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = MQConfig.MOON_FANOUT_QUEUE)
    //public void onMessage(User user, Message message, Channel channel) throws Exception {
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("***** MQConsumer onMessage 開始 *****");
        User user = null;
        try {
            String body = new String(message.getBody(), "utf-8");
            log.info("***** MQConsumer onMessage 接收到消息, body={}", body);
            user = JSON.parseObject(body, User.class);

            log.info("***** MQConsumer onMessage 接收到消息 name = {}", user.getName());
            // 模擬執行任務
            Thread.sleep(1000);

            if (user.getId() == 1) {
                throw new RuntimeException("id=1拋出異常");
            }

            // 確認收到消息,false只確認當前consumer一個消息收到,true確認全部consumer得到的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //e.printStackTrace();
            if (message.getMessageProperties().getRedelivered()) {
                log.info("***** MQConsumer onMessage 消息已重複處理失敗,拒絕再次接收 name = {}", user.getName());
                // 拒絕消息,requeue=false 表示再也不從新入隊,若是配置了死信隊列則進入死信隊列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.info("***** MQConsumer onMessage 消息即將再次返回隊列處理 name = {}", user.getName());
                // requeue爲是否從新回到隊列,true從新入隊
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
        log.info("***** MQConsumer onMessage 結束 *****");
    }
}
相關文章
相關標籤/搜索