本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種消息的確認模式,若是查詢詳細的確認模式設置,請閱讀:RabbitMQ的三種消息確認模式
同時消費端也採起了限流的措施,若是對限流細節有興趣請參照以前的文章閱讀:消費端限流
java
首先引入 maven 依賴spring
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.4.RELEASE</version> </dependency>
Application.properties 中進行設置,開啓 confirm 確認機制,開啓 return 確認模式,設置 mandatory
屬性 爲 true,當設置爲 true 的時候,路由不到隊列的消息不會被自動刪除,從而才能夠被 return 消息模式監聽到。json
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 #開啓 confirm 確認機制 spring.rabbitmq.publisher-confirms=true #開啓 return 確認機制 spring.rabbitmq.publisher-returns=true #設置爲 true 後 消費者在消息沒有被路由到合適隊列狀況下會被return監聽,而不會自動刪除 spring.rabbitmq.template.mandatory=true
建立隊列和交換機,此處不該該建立 ConnectionFactory 和 RabbitAdmin,應該在 application.properties 中設置用戶名、密碼、host、端口、虛擬主機便可。併發
import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { // @Bean // public ConnectionFactory connectionFactory(){ // return new CachingConnectionFactory(); // } // // @Bean // public RabbitAdmin rabbitAdmin(){ // return new RabbitAdmin(connectionFactory()); // } @Bean public Exchange bootExchange(){ return new TopicExchange("BOOT-EXCHANGE-1", true, false); } @Bean public Queue bootQueue(){ return new Queue("boot.queue1", true); } }
若是程序有特殊的設置要求,追求更靈活的設置能夠參考如下方式進行編碼設置,從而不用在application.properties 指定。例如咱們在測試環境和生產環境中配置的虛擬主機、密碼不一樣、咱們能夠在程序中判斷處於哪一種環境,靈活切換設置。app
@Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); if("生產環境"){ connectionFactory.set..... } else { ...... } connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(){ RabbitAdmin rabbitAdmin = new RabbitAdmin(); rabbitAdmin.setAutoStartup(true); return new RabbitAdmin(connectionFactory()); }
MQSender代碼以下,包含發送消息以及添加 confirm 監聽、添加 return 監聽。若是消費端要設置爲手工 ACK ,那麼生產端發送消息的時候必定發送 correlationData ,而且全局惟一,用以惟一標識消息。maven
import com.anqi.mq.bean.User; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; @Component public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; final RabbitTemplate.ConfirmCallback confirmCallback= new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData: " + correlationData); System.out.println("ack: " + ack); if(!ack){ System.out.println("異常處理...."); } } }; final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; //發送消息方法調用: 構建Message消息 public void send(Object message, Map<String, Object> properties) throws Exception { MessageProperties mp = new MessageProperties(); //在生產環境中這裏不用Message,而是使用 fastJson 等工具將對象轉換爲 json 格式發送 Message msg = new Message(message.toString().getBytes(),mp); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局惟一 CorrelationData correlationData = new CorrelationData("1234567890"+new Date()); rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", msg, correlationData); } //發送消息方法調用: 構建Message消息 public void sendUser(User user) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局惟一 CorrelationData correlationData = new CorrelationData("1234567890"+new Date()); rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", user, correlationData); } }
在實際生產環境中,生產端和消費端通常都是兩個系統,咱們在此也將拆分紅兩個項目。spring-boot
如下爲消費端的 application.properties 中的配置,首先配置手工確認模式,用於 ACK 的手工處理,這樣咱們能夠保證消息的可靠性送達,或者在消費端消費失敗的時候能夠作到重回隊列、根據業務記錄日誌等處理。咱們也能夠設置消費端的監聽個數和最大個數,用於控制消費端的併發狀況。咱們要開啓限流,指定每次處理消息最多隻能處理兩條消息。工具
spring.rabbitmq.host=localhost spring.rabbitmq.virtual-host=/ spring.rabbitmq.username=guest spring.rabbitmq.password=guest #設置消費端手動 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual #消費者最小數量 spring.rabbitmq.listener.simple.concurrency=1 #消費之最大數量 spring.rabbitmq.listener.simple.max-concurrency=10 #在單個請求中處理的消息個數,他應該大於等於事務數量(unack的最大數量) spring.rabbitmq.listener.simple.prefetch=2
咱們可使用 @RabbitListener
和@RabblitHandler
組合來監聽隊列,固然@RabbitListener
也能夠加在方法上。咱們這裏是建立了兩個方法用來監聽同一個隊列,具體調用哪一個方法是經過匹配方法的入參來決定的,自定義類型的消息須要標註@Payload
,類要實現序列化接口。單元測試
package com.anqi.mq.receiver; import com.anqi.mq.bean.User; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "boot.queue1", durable = "true"), exchange = @Exchange(value = "BOOT-EXCHANGE-1", type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "boot.*" ) ) @Component public class MQReceiver { @RabbitHandler public void onMessage(Message message, Channel channel) throws IOException { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工ack channel.basicAck(deliveryTag,true); System.out.println("receive--1: " + new String(message.getBody())); } @RabbitHandler public void onUserMessage(@Payload User user, Channel channel, @Headers Map<String,Object> headers) throws IOException { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ack channel.basicAck(deliveryTag,true); System.out.println("receive--11: " + user.toString()); } }
消息的序列化與反序列化由內部轉換器完成,若是咱們要採用其餘類型的消息轉換器,咱們能夠對其進行設置SimpleMessageListenerContainer
。
@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setMessageConverter(new Jackson2JsonMessageConverter()); // 默認採用下面的這種轉換器 // container.setMessageConverter(new SimpleMessageConverter()); return container; }
單元測試類
import com.anqi.mq.bean.User; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class MQSenderTest { @Autowired private MQSender mqSender; @Test public void send() { String msg = "hello spring boot"; try { for (int i = 0; i < 15; i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //mqSender.send(msg + ":" + i, null); mqSender.sendUser(new User("anqi", 25)); } } catch (Exception e) { e.printStackTrace(); } } }
測試結果以下,咱們在消費方法使用了Thread.sleep(5000)
來模擬消息的處理過程,故意的延長了消息的處理時間,從而更好的觀察限流效果。咱們能夠發現Unacked
一直是 2, 表明正在處理的消息數量爲 2,這與咱們限流的數量一致,說明了限流的目的已經實現。