Spring Boot RabbitMQ - 優先級隊列

Docker With RabbitMQ

官方 Docker 鏡像倉庫地址html

本地運行 RabbitMQspring

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

訪問可視化面板docker

Spring Boot With RabbitMQ

Spring Boot 集成 RabbitMQide

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

基本參數配置spring-boot

# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672

Queue / Exchange / Routing 配置fetch

/**
 * RabbitMQ 配置
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * 定義優先級隊列
     */
    @Bean
    Queue queue() {
        Map<String, Object> args= new HashMap<>();
        args.put("x-max-priority", 100);
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * 定義交換器
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

}
priority queue 定義參考官方文檔: https://www.rabbitmq.com/priority.html

Spring Boot 應用啓動後,會自動建立 Queue 和 Exchange ,並相互綁定,優先級隊列會有如圖所示標識。

RabbitMQ Publisher

Spring Boot 相關配置ui

# 是否開啓消息發送到交換器(Exchange)後觸發回調
spring.rabbitmq.publisher-confirms=false
# 是否開啓消息發送到隊列(Queue)後觸發回調
spring.rabbitmq.publisher-returns=false
# 消息發送失敗重試相關配置
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1

發送消息編碼

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * 發送設置有優先級的消息
     *
     * @param priority 優先級
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }

}

RabbitMQ Consumer

Spring Boot 相關配置spa

# 消息接收確認,可選模式:NONE(不確認)、AUTO(自動確認)、MANUAL(手動確認)
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# 最小線程數量
spring.rabbitmq.listener.simple.concurrency=10
# 最大線程數量
spring.rabbitmq.listener.simple.max-concurrency=10
# 每一個消費者可能未完成的最大未確認消息數量
spring.rabbitmq.listener.simple.prefetch=1
消費者執行耗時較長的話,建議 spring.rabbitmq.listener.simple.prefetch 設置爲較小數值,讓優先級越高的消息更快加入到消費者線程。

監聽消息線程

@Slf4j
@Component
public class MessageListener {

    /**
     * 處理消息
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }

}

番外補充

一、自定義消息發送確認的回調

  • 配置以下:
# 開啓消息發送到交換器(Exchange)後觸發回調
spring.rabbitmq.publisher-confirms=true
# 開啓消息發送到隊列(Queue)後觸發回調
spring.rabbitmq.publisher-returns=true
  • 自定義 RabbitTemplate.ConfirmCallback 實現類
@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息惟一標識: {}", correlationData);
        log.info("確認狀態: {}", ack);
        log.info("形成緣由: {}", cause);
    }

}
  • 自定義 RabbitTemplate.ConfirmCallback 實現類
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息主體: {}", message);
        log.info("回覆編碼: {}", replyCode);
        log.info("回覆內容: {}", replyText);
        log.info("交換器: {}", exchange);
        log.info("路由鍵: {}", routingKey);
    }

}
  • 配置 rabbitTemplate
@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() {
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }
    
}

二、RabbitMQ Exchange 類型

相關文章
相關標籤/搜索