RabbitMQ的消息確認有兩種。android
一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程當中,消息是否成功投遞。發送確認分爲兩步,一是確認是否到達交換器,二是確認是否到達隊列。spring
第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。app
(1)ConfirmCallbackspring-boot
經過實現ConfirmCallBack接口,消息發送到交換器Exchange後觸發回調。3d
使用該功能須要開啓確認,spring-boot中配置以下:blog
spring.rabbitmq.publisher-confirms = true接口
(2)ReturnCallbackrabbitmq
經過實現ReturnCallback接口,若是消息從交換器發送到對應隊列失敗時觸發(好比根據發送消息時指定的routingKey找不到隊列時會觸發)隊列
使用該功能須要開啓確認,spring-boot中配置以下:ip
spring.rabbitmq.publisher-returns = true
(1)確認模式
spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
(2)手動確認
上圖爲channel中未被消費者確認的消息數。
經過RabbitMQ的host地址加上默認端口號15672訪問管理界面。
(2.1)成功確認
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量. true:將一次性ack全部小於deliveryTag的消息。
消費者成功處理後,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。
(2.2)失敗確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
deliveryTag:該消息的index。
multiple:是否批量. true:將一次性拒絕全部小於deliveryTag的消息。
requeue:被拒絕的是否從新入隊列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:該消息的index。
requeue:被拒絕的是否從新入隊列。
channel.basicNack 與 channel.basicReject 的區別在於basicNack能夠批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
消息隊列的基礎知識能夠參考:消息隊列RabbitMQ基礎知識詳解
(1)手動確認模式,消息手動拒絕中若是requeue爲true會從新放入隊列,可是若是消費者在處理過程當中一直拋出異常,會致使入隊-》拒絕-》入隊的循環,該怎麼處理呢?
第一種方法是根據異常類型來選擇是否從新放入隊列。
第二種方法是先成功確認,而後經過channel.basicPublish()從新發布這個消息。從新發布的消息網上說會放到隊列後面,進而不會影響已經進入隊列的消息處理。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
(2)消息確認的做用是什麼?
爲了防止消息丟失。消息丟失分爲發送丟失和消費者處理丟失,相應的也有兩種確認機制。