官方 Docker 鏡像倉庫地址html
本地運行 RabbitMQspring
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
訪問可視化面板docker
Spring Boot 集成 RabbitMQide
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
基本參數配置spring-boot
# host & port spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672
Queue / Exchange / Routing 配置fetch
/** * RabbitMQ 配置 */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE = "priority-exchange"; public static final String QUEUE = "priority-queue"; private static final String ROUTING_KEY = "priority.queue.#"; /** * 定義優先級隊列 */ @Bean Queue queue() { Map<String, Object> args= new HashMap<>(); args.put("x-max-priority", 100); return new Queue(QUEUE, false, false, false, args); } /** * 定義交換器 */ @Bean TopicExchange exchange() { return new TopicExchange(EXCHANGE); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
priority queue 定義參考官方文檔: https://www.rabbitmq.com/priority.html
Spring Boot 應用啓動後,會自動建立 Queue 和 Exchange ,並相互綁定,優先級隊列會有如圖所示標識。
Spring Boot 相關配置ui
# 是否開啓消息發送到交換器(Exchange)後觸發回調 spring.rabbitmq.publisher-confirms=false # 是否開啓消息發送到隊列(Queue)後觸發回調 spring.rabbitmq.publisher-returns=false # 消息發送失敗重試相關配置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=3000ms spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000ms spring.rabbitmq.template.retry.multiplier=1
發送消息編碼
@Component @AllArgsConstructor public class FileMessageSender { private static final String EXCHANGE = "priority-exchange"; private static final String ROUTING_KEY_PREFIX = "priority.queue."; private final RabbitTemplate rabbitTemplate; /** * 發送設置有優先級的消息 * * @param priority 優先級 */ public void sendPriorityMessage(String content, Integer priority) { rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content, message -> { message.getMessageProperties().setPriority(priority); return message; }); } }
Spring Boot 相關配置spa
# 消息接收確認,可選模式:NONE(不確認)、AUTO(自動確認)、MANUAL(手動確認) spring.rabbitmq.listener.simple.acknowledge-mode=AUTO # 最小線程數量 spring.rabbitmq.listener.simple.concurrency=10 # 最大線程數量 spring.rabbitmq.listener.simple.max-concurrency=10 # 每一個消費者可能未完成的最大未確認消息數量 spring.rabbitmq.listener.simple.prefetch=1
消費者執行耗時較長的話,建議
spring.rabbitmq.listener.simple.prefetch
設置爲較小數值,讓優先級越高的消息更快加入到消費者線程。
監聽消息線程
@Slf4j @Component public class MessageListener { /** * 處理消息 */ @RabbitListener(queues = "priority-queue") public void listen(String message) { log.info(message); } }
一、自定義消息發送確認的回調
# 開啓消息發送到交換器(Exchange)後觸發回調 spring.rabbitmq.publisher-confirms=true # 開啓消息發送到隊列(Queue)後觸發回調 spring.rabbitmq.publisher-returns=true
RabbitTemplate.ConfirmCallback
實現類@Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息惟一標識: {}", correlationData); log.info("確認狀態: {}", ack); log.info("形成緣由: {}", cause); } }
RabbitTemplate.ConfirmCallback
實現類@Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體: {}", message); log.info("回覆編碼: {}", replyCode); log.info("回覆內容: {}", replyText); log.info("交換器: {}", exchange); log.info("路由鍵: {}", routingKey); } }
rabbitTemplate
@Component @AllArgsConstructor public class RabbitTemplateInitializingBean implements InitializingBean { private final RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() { rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); } }
二、RabbitMQ Exchange 類型