消息確認機制
- 發送端確認 生產者能知道本身的消息是否成功發送到交換機或者隊列
- 消費端確認 消費者成功消費消息後,發送確認標識使消息從MQ中刪除
如何判斷消息發送成功或失敗 ?
- 確認消息不能路由到任何隊列時,確認發送失敗
- 消息能夠路由到隊列時,當須要發送的隊列都發送成功後,進行消息確認成功.對於持久化的隊列,意味着已經寫入磁盤,對於鏡像隊列,意味着全部鏡像都接受成功.
消費端如何告知rabbitmq消息消費成功或失敗?
- 自動確認會在消息發送給消費者後當即確認,但存在丟失消息的可能,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息
- 若是手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確承認以在業務失敗後進行一些操做,若是消息未被 ACK 則會發送到下一個消費者
- 若是某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,由於 RabbitMQ 認爲該服務的處理能力有限
發送端確認實例
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
複製代碼
- 建立兩個監聽類分別實現RabbitTemplate的ConfirmCallback和ReturnCallback接口 實現ConfirmCallback接口,當消息發送到交換機的回調 實現ReturnCallback接口,當消息路由不到指定隊列時回調
消息發送到交換機監聽類
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("Success... 消息成功發送到交換機! correlationData:{}", correlationData);
} else {
log.info("Fail... 消息發送到交換機失敗! correlationData:{}", correlationData);
}
}
}
/**
* 消息未路由到隊列監聽類
* @author by peng
* @date in 2019-06-01 21:32
*/
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("Fail... message:{},從交換機exchange:{},以路由鍵routingKey:{}," +
"未找到匹配隊列,replyCode:{},replyText:{}",
message, exchange, routingKey, replyCode, replyText);
}
}
複製代碼
- 從新注入RabbitTemplate,並設置兩個監聽類
@Configuration
public class RabbitConfig {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
rabbitTemplate.setReturnCallback( new SendReturnCallback());
return rabbitTemplate;
}
}
複製代碼
生產者
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendConfirmSuccess() {
String content = "Message sent to exist exchange!";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
System.out.println("########### SendConfirmSuccess : " + content);
}
public void sendConfirmError() {
String content = "Message sent to not exist exchange!";
this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
System.out.println("########### SendConfirmError : " + content);
}
public void sendReturn() {
String content = "Message sent to exist exchange! But no queue to routing to";
this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
System.out.println("########### SendWReturn : " + content);
}
}
// 消費者
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {
@RabbitHandler
public void process(String message) {
System.out.println("########### Receiver Msg:" + message);
}
}
複製代碼
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
@Autowired
private Sender sender;
@Test
public void sendConfirmSuccess() {
sender.sendConfirmSuccess();
結果:成功發送並消費了消息,並輸出監聽日誌
Success... 消息成功發送到交換機! correlationData:null
}
@Test
public void sendConfirmError() {
sender.sendConfirmError();
結果:消息發送失敗,並輸入監聽日誌
Fail... 消息發送到交換機失敗! correlationData:null
Channel shutdown: channel error; protocol method:
}
@Test
public void sendReturn() {
sender.sendReturn();
結果:消息發送到交換機,但路由不到隊列(不存在隊列匹配路由鍵)
Success... 消息成功發送到交換機! correlationData:null
Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),從交換機exchange:directConfirmExchange,以路由鍵routingKey:not-exist,未找到匹配隊列,replyCode:312,replyText:NO_ROUTE
}
}
複製代碼
消費端確認
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
複製代碼
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
@RabbitHandler
public void process(String content, Channel channel, Message message) {
try {
System.out.println("########### message:" + message);
// 業務處理成功後調用,消息會被確認消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 業務處理失敗後調用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("########### Receiver Msg:" + content);
}
}
複製代碼