前面學習了 RabbitMQ 基礎,如今主要記錄下學習 Spring Boot 整合 RabbitMQ ,調用它的 API ,以及中間使用的相關功能的記錄。css
相關的能夠去個人博客/RabbitMQjava
我這裏測試都是使用的是 topic
交換器,Spring Boot 2.0.0, jdk 1.8git
Spring Boot 版本 2.0.0
在 pom.xml
文件中引入 AMQP 的依賴github
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在系統配置文件中加入鏈接屬性spring
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: k.wuwii.com
port: 5672
username: kronchan
password: 123456
#virtual-host: test
publisher-confirms: true # 開啓確認消息是否到達交換器,須要設置 true
publisher-returns: true # 開啓確認消息是否到達隊列,須要設置 true
新增一個消費者類:springboot
@Log
public class MessageReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
byte[] body = message.getBody();
log.info(">>>>>>> receive: " + new String(body));
} finally {
// 確認成功消費,不然消息會轉發給其餘的消費者,或者進行重試
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
新增 RabbitMQ 的配置類,主要是對消費者的隊列,交換器,路由鍵的一些設置:markdown
@Configuration
public class RabbitMQConfig {
public final static String QUEUE_NAME = "springboot.demo.test1";
public final static String ROUTING_KEY = "route-key";
public final static String EXCHANGES_NAME = "demo-exchanges";
@Bean
public Queue queue() {
// 是否持久化
boolean durable = true;
// 僅建立者可使用的私有隊列,斷開後自動刪除
boolean exclusive = false;
// 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
boolean autoDelete = false;
return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
}
/** * 設置交換器,這裏我使用的是 topic exchange */
@Bean
public TopicExchange exchange() {
// 是否持久化
boolean durable = true;
// 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
boolean autoDelete = false;
return new TopicExchange(EXCHANGES_NAME, durable, autoDelete);
}
/** * 綁定路由 */
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(receiver());
//container.setMaxConcurrentConsumers(1);
//container.setConcurrentConsumers(1); 默認爲1
//container.setExposeListenerChannel(true);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設置爲手動,默認爲 AUTO,若是設置了手動應答 basicack,就要設置manual
return container;
}
@Bean
public MessageReceiver receiver() {
return new MessageReceiver();
}
}
@Component
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/** * logger */
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
public void send() {
// public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
// exchange: 交換機名稱
// routingKey: 路由關鍵字
// object: 發送的消息內容
// correlationData:消息ID
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
// ConfirmListener是當消息沒法發送到Exchange被觸發,此時Ack爲False,這時cause包含發送失敗的緣由,例如exchange不存在時
// 須要在系統配置文件中設置 publisher-confirms: true
if (!rabbitTemplate.isConfirmListener()) {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info(">>>>>>> 消息id:{} 發送成功", correlationData.getId());
} else {
log.info(">>>>>>> 消息id:{} 發送失敗", correlationData.getId());
}
});
}
// ReturnCallback 是在交換器沒法將路由鍵路由到任何一個隊列中,會觸發這個方法。
// 須要在系統配置文件中設置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息id:{} 發送失敗", message.getMessageProperties().getCorrelationId());
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);
log.info("Already sent message.");
}
}
先啓動系統啓動類,消費者開始訂閱,啓動測試類發送消息。app
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {
@Autowired
private MessageSender sender;
@Test
public void testReceiver() {
sender.send();
}
}
能夠在消費者接收到信息,而且發送端將打出日誌 成功發送消息的記錄,也能夠測試下 Publisher Confirms and Returns機制
主要是測試 ConfirmCallback
和 ReturnCallback
這兩個方法。
* ConfirmCallback
,確認消息是否到達交換器,例如咱們發送一個消息到一個你沒有建立過的 交換器上面去,看看狀況,
* ReturnCallback
,確認消息是否到達隊列,咱們能夠這樣測試,定義一個路由鍵,不會被任何隊列訂閱到,最後查看結果就能夠了。dom
學習源碼ide
跟文章第一步的配置同樣的。
@Component
@Log
public class MessageReceiver {
/** * 無返回消息的 * * @param message */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),
exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),
key = Constant.ROUTING_KEY))
public void receive(byte[] message) {
log.info(">>>>>>>>>>> receive:" + new String(message));
}
/** * 設置有返回消息的 * 須要注意的是, * 1. 在消息的在生產者(發送消息端)必定要使用 SendAndReceive(……) 這種帶有 receive 的方法,不然會拋異常,不捕獲會死循環。 * 2. 該方法調用時會鎖定當前線程,而且有可能會形成MQ的性能降低或者服務端/客戶端出現死循環現象,請謹慎使用。 * * @param message * @return */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),
exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),
key = Constant.ROUTING_REPLY_KEY))
public String receiveAndReply(byte[] message) {
log.info(">>>>>>>>>>> receive:" + new String(message));
return ">>>>>>>> I got the message";
}
}
主要是使用到 @RabbitListener
,雖然看起來參數不少,仔細的你會發現這個和寫配置類裏面的基本屬性是一摸同樣的,沒有任何區別。
須要注意的是我在這裏多作了個有返回值的消息,這個使用異常的話,會不斷重試消息,從而阻塞了線程。並且使用它的時候只能使用帶有 receive
的方法給它發送消息。
生產者沒什麼變化。
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/** * logger */
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
private RabbitTemplate rabbitTemplate;
/** * 注入 RabbitTemplate */
@Autowired
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
}
/** * 測試無返回消息的 */
public void send() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(Constant.EXCHANGES_NAME, Constant.ROUTING_KEY, ">>>>>> Hello World".getBytes(), correlationData);
log.info(">>>>>>>>>> Already sent message");
}
/** * 測試有返回消息的,須要注意一些問題 */
public void sendAndReceive() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
Object o = rabbitTemplate.convertSendAndReceive(Constant.EXCHANGES_NAME, Constant.ROUTING_REPLY_KEY, ">>>>>>>> Hello World Second".getBytes(), correlationData);
log.info(">>>>>>>>>>> {}", Objects.toString(o));
}
/** * Confirmation callback. * * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info(">>>>>>> 消息id:{} 發送成功", correlationData.getId());
} else {
log.info(">>>>>>> 消息id:{} 發送失敗", correlationData.getId());
}
}
/** * Returned message callback. * * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息id:{} 發送失敗", message.getMessageProperties().getCorrelationId());
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootAnnotationApplicationTests {
@Autowired
private MessageSender sender;
@Test
public void send() {
sender.send();
}
@Test
public void sendAndReceive() {
sender.sendAndReceive();
}
}