SpringBoot 中使用RabbitMQ(二)消息確認

消息確認機制

  • 發送端確認 生產者能知道本身的消息是否成功發送到交換機或者隊列
  • 消費端確認 消費者成功消費消息後,發送確認標識使消息從MQ中刪除

如何判斷消息發送成功或失敗 ?

  • 確認消息不能路由到任何隊列時,確認發送失敗
  • 消息能夠路由到隊列時,當須要發送的隊列都發送成功後,進行消息確認成功.對於持久化的隊列,意味着已經寫入磁盤,對於鏡像隊列,意味着全部鏡像都接受成功.

消費端如何告知rabbitmq消息消費成功或失敗?

  • 自動確認會在消息發送給消費者後當即確認,但存在丟失消息的可能,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息
  • 若是手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確承認以在業務失敗後進行一些操做,若是消息未被 ACK 則會發送到下一個消費者
  • 若是某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,由於 RabbitMQ 認爲該服務的處理能力有限

發送端確認實例

  • 添加配置
# 消息發送到交換器確認
spring.rabbitmq.publisher-confirms=true
# 消息發送到隊列確認
spring.rabbitmq.publisher-returns=true
複製代碼
  • 建立兩個監聽類分別實現RabbitTemplate的ConfirmCallback和ReturnCallback接口 實現ConfirmCallback接口,當消息發送到交換機的回調 實現ReturnCallback接口,當消息路由不到指定隊列時回調
消息發送到交換機監聽類
@Slf4j
@Component
public class SendConfirmCallback implements RabbitTemplate.ConfirmCallback {
   
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Success... 消息成功發送到交換機! correlationData:{}", correlationData);
        } else {
            log.info("Fail... 消息發送到交換機失敗! correlationData:{}", correlationData);
        }
    }
}

/**
 * 消息未路由到隊列監聽類
 * @author by peng
 * @date in 2019-06-01 21:32
 */
@Slf4j
@Component
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Fail... message:{},從交換機exchange:{},以路由鍵routingKey:{}," +
                        "未找到匹配隊列,replyCode:{},replyText:{}",
                message, exchange, routingKey, replyCode, replyText);
    }
}
複製代碼
  • 從新注入RabbitTemplate,並設置兩個監聽類
@Configuration
public class RabbitConfig {
    
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback( new SendConfirmCallback());
        rabbitTemplate.setReturnCallback( new SendReturnCallback());
        return rabbitTemplate;
    }
}
複製代碼
  • 定義生產者和消費者
生產者
@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendConfirmSuccess() {
        String content = "Message sent to exist exchange!";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "exist", content);
        System.out.println("########### SendConfirmSuccess : " + content);
    }
    
    public void sendConfirmError() {
        String content = "Message sent to not exist exchange!";
        this.rabbitTemplate.convertAndSend("notExistExchange", "exist", content);
        System.out.println("########### SendConfirmError : " + content);
    }
    
    public void sendReturn() {
        String content = "Message sent to exist exchange! But no queue to routing to";
        this.rabbitTemplate.convertAndSend("directConfirmExchange", "not-exist", content);
        System.out.println("########### SendWReturn : " + content);
    }
   
}

// 消費者
@Component
@RabbitListener(queues = "existQueue")
public class Receiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### Receiver Msg:" + message);
    }
}
複製代碼
  • 測試類
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConfirmTest {
    @Autowired
    private Sender sender;
    @Test
    public void sendConfirmSuccess() {
        sender.sendConfirmSuccess();
        
        結果:成功發送並消費了消息,並輸出監聽日誌
        ########### SendConfirmSuccess : Message sent to exist exchange!
        Success... 消息成功發送到交換機! correlationData:null
        ########### Receiver Msg:Message sent to exist exchange!
    }
    @Test
    public void sendConfirmError() {
        sender.sendConfirmError();
        結果:消息發送失敗,並輸入監聽日誌
        ########### SendConfirmError : Message sent to not exist exchange!
        Fail... 消息發送到交換機失敗! correlationData:null
        Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'notExistExchange' in vhost '/', class-id=60, method-id=40)
    }
    @Test
    public void sendReturn() {
        sender.sendReturn();
        結果:消息發送到交換機,但路由不到隊列(不存在隊列匹配路由鍵)
        ########### SendWReturn : Message sent to exist exchange! But no queue to routing to
        Success... 消息成功發送到交換機! correlationData:null
        Fail... message:(Body:'Message sent to exist exchange! But no queue to routing to' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),從交換機exchange:directConfirmExchange,以路由鍵routingKey:not-exist,未找到匹配隊列,replyCode:312,replyText:NO_ROUTE
    }
    
}
複製代碼

消費端確認

  • 添加配置
# 消費者消息確認--手動 ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
複製代碼
  • 消費者代碼
@Component
@RabbitListener(queues = "existQueue")
public class AckReceiver {
    
    @RabbitHandler
    public void process(String content, Channel channel, Message message) {
        try {
            System.out.println("########### message:" + message);
            // 業務處理成功後調用,消息會被確認消費
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 業務處理失敗後調用
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("########### Receiver Msg:" + content);
    }
}
複製代碼
相關文章
相關標籤/搜索