RabbitMq學習(二)RabbitMQ的消息確認機制

一. 爲何有消息確認機制git

 在RabbitMq中,一個消息從產生到最終的消息接受,中間大體會有三個環節,首先是消息到達交換機、而後是消息經過交換機到達隊列,最後消費者消費綁定的隊列消息。github

 可是在這個過程當中,若是出現網絡或者系統的異常,就會致使消息不能被正常消費。若是不能正常消費消息,會形成兩方面的問題。spring

 1.1 在服務端

消息到達隊列,可是沒有消費者去消費,就會形成消息積壓,被積壓的消息會存入緩存,直到有消費者進行消費。若是一直沒有消費者進行消費,那麼就會直接將內存佔滿,影響服務器性能。緩存

1.2 消費端

一個消息一旦被消費後,那麼就會從隊列刪除。若是說消息已經到達消費者,可是消費者處理消息以前系統出現了異常,那麼就至關於這條消息丟失了,是個很大的問題。springboot

 

因此RabbitMq纔會出現消息確認機制。對應的也是服務端客客戶端兩個方面解決服務器

2、 怎麼使用消息確認機制

2.1 消息發送確認

發送的確認也是分爲兩個步驟:到交換機的確認 ConfirmCallback 和到隊列的確認 ReturnCallback網絡

       這些確認機制默認都是不開啓的,在SpringBoot 項目中,咱們能夠在配置文件中開啓:app

spring.rabbitmq.publisher-confirms = true
spring.rabbitmq.publisher-returns = true

或者 在配置鏈接工廠的時候開啓:dom

@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/");  //開啓到交換機的確認
        connectionFactory.setPublisherConfirms(true);  //開啓到隊列的確認
        connectionFactory.setPublisherReturns(true); return connectionFactory; }

 

在代碼中實現 RabbitTemplate.ConfirmCallback 接口,若是消息被交換機正常接受,就會回調confirm 方法,參數的含義經過代碼能夠知曉。ide

實現 RabbitTemplate.ReturnCallback 接口,若是消息不能被髮送到隊列,就會調用ReturnedMessage 方法。

注意:一個是接收成功調用,一個是接收失敗調用

@Component public class ASender implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate rabbitTemplate; /** * 回調 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回調id:" + correlationData); if (ack) { logger.info("消息成功消費"); } else { logger.info("消息消費失敗:" + cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息內容:{}", new String(message.getBody())); logger.info("回覆文本:{},回覆代碼:{}", replyText, replyCode); logger.info("交換器名稱:{},路由鍵:{}", exchange, routingKey); } @PostConstruct public void init(){ rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(EXCHANGE_C, "aa.apple.big", content, correlationId); } }

2.2 消息接受確認

 消費端消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠手動去 ACK 或自動 ACK

  ACK 確認模式分爲三種:

  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據狀況確認
  • AcknowledgeMode.MANUAL:手動確認

默認是自動確認,開啓手動確認的方式也是兩種方式:

配置文件配置:

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
另外一種是在RabbitListenerContainerFactory配置:
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啓手動 ack
    return factory; }

 


在客戶端接受消息:
@RabbitHandler public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { System.out.println(message); try { channel.basicAck(tag, false);            // 確認消息
        logger.info("消費者成功確認" + message); } catch (IOException e) { e.printStackTrace(); } }


確認 basicAck 參數解釋:
  • deliveryTag(惟一標識 ID):當一個消費者向 RabbitMQ 註冊後,會創建起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它表明了 RabbitMQ 向該 Channel 投遞的這條消息的惟一標識 ID,是一個單調遞增的正整數,delivery tag 的範圍僅限於 Channel
  • multiple:爲了減小網絡流量,手動確承認以被批處理,當該參數爲 true 時,則能夠一次性確認 delivery_tag 小於等於傳入值的全部消息


部分參考: https://www.jianshu.com/p/2c5eebfd0e95
相關文章
相關標籤/搜索