RabbitMQ系列之消息確認機制

  1. 上一篇介紹了 RabbitMQ 中的一些基本概念,並經過 SpringBoot 工程整合 RabbitMQ,作了一個小 demo;
  2. 那麼 RabbitMQ 是怎麼知道消息到底有沒有被消費者消費,生產者是怎麼知道本身發送的消息時真的已經發送到 RabbitMQ 中了呢?
  3. 本篇經過實例演示去介紹一下 RabbitMQ 的消息確認機制,閱讀完本篇內容上面的這些疑問就迎刃而解,並且也有助於後邊咱們理解 RabbitMQ 的消息爲何會出現重複消費的問題。
  4. 文中實例會在文末提供下載地址。

一. 爲何要有消息確認

  1. 因爲網絡可能以不可預知的方式出現故障,且檢測故障可能須要耗費一些時間;
  2. 所以不能保證發送的消息可以到達對等方或由它成功地處理。

二. 消息確認流程

RabbitMQ 的消息確認機制以下:
RabbitMQ消息確認.pngjava

從圖中咱們能夠看出:git

  • 生產者發送消息到 RabbitMQ Server 後,RabbitMQ Server 須要對生產者進行消息 Confirm 確認;
  • 消費者消費消息後須要對 RabbitMQ Server 進行消息 ACK 確認。

這兩個機制都是收到 TCP 協議的啓發,它們對於數據安全相當重要。
下面就分別從生產者、消費者兩個方面結合實例來認識消息確認機制。github

備註:web

  1. 在 RabbitMQ 中 有兩種事務機制來確保消息的安全送達,分別是事務機制和確認機制;
  2. 事務機制須要每一個消息或一組消息發佈、提交的通道設置爲事務性的,所以會很是耗費性能,下降了 Rabbitmq 的消息吞吐量;
  3. 所以咱們在實際生產中一般採用確認機制,下面的實例演示就採用確認機制來進行編碼。

三. 生產者確認

1. 消息投遞和消息確認鏈路

咱們先來看一下RabbitMQ 消息投遞和接收的一個完整鏈路以下:
RabbitMQ消息推送到接收.pngspring

2. 消息投遞可靠性保證

消息投遞的鏈路用文字表示:
producer->rabbitmq broker cluster->exchange->queue->consumersegmentfault

因爲:安全

  1. 生產者向 RabbitMQ Server 發出的消息可能會在發送途中丟失或者須要通過必定的延遲後才能成功發送到 RabbitMQ Server;
  2. 所以,須要 RabbitMQ 告訴生產者,生產者才能知道本身發佈的消息是否已經送達。

在編碼時咱們能夠用兩個選項用來控制消息投遞的可靠性:服務器

  • 消息從 producer 到 RabbitMQ broker cluster 成功,則會返回一個 confirmCallback
  • 消息從 exchange 到 queue 投遞失敗,則會返回一個 returnCallback

咱們能夠利用這兩個 callback 接口來控制消息的一致性和處理一部分的異常狀況。微信

3. 開啓 confirm 和 return 確認

server.port=10420

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

# 開啓發送確認

spring.rabbitmq.publisher-confirms=true

# 開啓發送失敗退回(消息有沒有找到合適的隊列)

spring.rabbitmq.publisher-returns=true

4. 使用 callback 接口來確保消息投遞狀態

在 RabbitConfig 配置類裏,定義 RabbitTemplate Bean,使用 callback 接口:網絡

/**

 * RabbitMQ配置

 *

 * @author lyf

 * @公衆號 全棧在路上

 * @GitHub https://github.com/liuyongfei1

 * @date 2020-05-17 17:20

**/

@Slf4j

@Configuration

public class RabbitConfig {

@Autowired

CachingConnectionFactory cachingConnectionFactory;

@Bean

RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);

// 消息只要被 rabbitmq broker 接收到就會執行 confirmCallback
// 若是是 cluster 模式,須要全部 broker 接收到纔會調用 confirmCallback
// 被 broker 接收到只能表示 message 已經到達服務器,並不能保證消息必定會被投遞到目標 queue 裏

rabbitTemplate.setConfirmCallback((data, ack, cause) -> {

String msgId = data.getId();

if (ack) {

log.info(msgId + ": 消息發送成功");

} else {

log.info(msgId + ": 消息發送失敗");

}

});

// confirm 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 裏。
// 在有些業務場景下,咱們須要保證消息必定要投遞到目標 queue 裏,此時就須要用到 return 退回模式
// 這樣若是未能投遞到目標 queue 裏將調用 returnCallback,能夠記錄下詳細到投遞數據,按期的巡檢或者自動糾錯都須要這些數據
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

log.info(MessageFormat.format("消息發送失敗,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode,

replyText, exchange, routingKey));

// TODO 作消息發送失敗時的處理邏輯

});

return rabbitTemplate;

}

/**

 * 聲明隊列

 * 參數說明:

 * durable 是否持久化,默認是false(持久化隊列則數據會被存儲在磁盤上,當消息代理重啓時數據不會丟失;暫存隊列只對當前鏈接有效)

 * exclusive 默認是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable

 * autoDelete 默認是false,是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。

 * 通常設置一下隊列的持久化就好,其他兩個就是默認false

 *

 * @return Queue

**/

@Bean

Queue myQueue() {

return new Queue(QueueConstants.QUEUE\_NAME, true);

}

// 設置交換機,類型爲 direct

@Bean

DirectExchange myExchange() {

return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false);

}

// 綁定:將交換機和隊列綁定,並設置路由匹配鍵

@Bean

Binding queueBinding() {

return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME);

}

5. 消息生產端

在 ProducerController 裏,主要乾了如下幾件事:

  • 提供了一個 Rest接口sendDirectMessage,經過請求該接口,能夠實現生產者發送消息的功能;
  • 在該接口內部使用了 CorrelationData,該對象內部只有一個 id 屬性,用來表示消息的惟一性;
  • 使用 rabbitTemplate.convertAndSend 像 RabbitMQ 發送消息(這裏使用的rabbitTemplate 就是在 RabbitConfig 裏被重寫的 RabbitTemplate)。
/**

 * 消息生產端

 * @公衆號 全棧在路上

 * @GitHub https://github.com/liuyongfei1

 * @author lyf

 * @date 2020-05-17 18:30

**/

@RestController

public class ProducerController {

/\*\*

\* RabbitTemplate提供了發送/接收消息的方法

\*/

@Autowired

RabbitTemplate rabbitTemplate;

/**

 * 生產消息

 *

 * @Author Liuyongfei

 * @Date 上午12:12 2020/5/20

 * @param test

 * @param test2

 * @return java.lang.String

**/

@GetMapping("/sendDirectMessage")

public String sendDirectMessage(String test,Integer test2) {

// 生成消息的惟一id

String msgId = UUID.randomUUID().toString();

String messageData = "hello,this is rabbitmq demo message";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

// 定義要發送的消息對象

Map<String,Object\> messageObj = new HashMap<>();

messageObj.put("msgId",msgId);

messageObj.put("messageData",messageData);

messageObj.put("createTime",createTime);

rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME,

messageObj,new CorrelationData(msgId));

return "message send ok";

}

}

6. 生產消息

  1. 保存代碼,在 RabbitConfig 裏的 setConfirmCallback方法內部打上斷點;
  2. 重啓服務後,使用 PostMan 請求生產消息接口:http://你的域名:10420/sendDirectMessage,生產消息,並將消息發送給 RabbitMQ:
    postman.png
  3. 而後打開 RabbitMQ 管理界面,找到對應的隊列,會發現:
    rabbitmq-web1.png
    關於 Read,Total 的狀態表明什麼意思,能夠翻看上一篇文章。
  4. 在 IDEA 裏,服務在啓動後直接停在斷點處:
    confirmBack.png
    也就說明咱們生產的消息已經成功的到達了 RabbitMQ Server 裏。
  5. 繼續執行斷點調試的綠色箭頭,發現 setReturnCallback 方法裏的斷點沒有執行到,也就說明了咱們生產的消息已經被交換機順利的投遞到隊列裏去了

總結

至此,生產者消息確認結束,且經過運行的實例,咱們可以得出結論:本次生產的消息已經正確無誤的投遞到了隊列中去。

四. 消費者確認

消費者確認指的就是 RabbitMQ 須要確認消息到底有沒有被收到,來肯定要不要將該條消息從隊列中刪除掉。這就須要消費者來告訴 RabbitMQ,有如下兩種方式:

1. 自動應答

消費者在消費消息的時候,若是設定應答模式爲自動,則消費者收到消息後,消息就會當即被 RabbitMQ 從 隊列中刪除掉。
所以,在實際開發者,咱們基本上是將消費應答模式設置爲手動確認更爲穩當一些。

2. 手動應答

消費者在收到消息後:

  • 能夠在既定的正常狀況下進行確認(告訴 RabbitMQ,我已經消費過該消息了,你能夠刪除該條數據了);
  • 能夠在既定的異常狀況下不進行確認(RabbitMQ 會繼續保留該條數據),這樣下一次能夠繼續消費該條數據。

3. 開啓手動應答

server.port=10421

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

# 開啓 ACK(消費者接收到消息時手動確認)

spring.rabbitmq.listener.simple.acknowledge-mode=manual

4. 消息消費者

ConsumerController 裏主要乾了如下幾件事兒:

  1. 使用 @RabbitListener 來監聽隊列;
  2. 從消息頭裏拿到消息的惟一表示 deliveryTag
  3. 使用 channel.basicAck 來確認消息已經消費;
  4. 若是有異常,使用 channel.basicNack 把消費失敗的消息從新放入到隊列中去。
/**

 * 消息消費端

 * @公衆號 全棧在路上

 * @GitHub https://github.com/liuyongfei1

 * @author Liuyongfei

 * @date 2020-05-21 18:00

**/

@Component

public class ConsumerController {

@RabbitListener(queues = {QueueConstants.QUEUE\_NAME})

public void handler(Message message, Channel channel) throws IOException {

System.out.println("收到消息:" + message.toString());

MessageHeaders headers = message.getHeaders();

Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG);

try {

// 手動確認消息已消費

channel.basicAck(tag,false);

} catch (IOException e) {

// 把消費失敗的消息從新放入到隊列

channel.basicNack(tag, false, true);

e.printStackTrace();

}

}

}

5. 消費消息

  1. 重啓消費端服務,停在斷點處:

    consumer1.png

  2. 查看 RabbitMQ 管理界面會發現 隊列的 Ready 和 Total 仍然是 1,說明咱們的手動應答設置生效:
  3. 點擊 Debug 的綠色箭頭繼續像下執行,查看 RabbitMQ 管理界面:

    rabbitmq-web3.png

  4. 幾秒後再次查看 RabbitMQ 管理界面:
    rabbitmq-web4.png
    會發現:Ready 變爲0,Unacked 爲 0,Total 爲 0。 說明該條數據已經被成功消費。

總結

至此,消費者消息確認結束。你們能夠在 ConsumerController 裏添加一些測試代碼來觸發異常,體驗一下 channel.basicNack 的做用。這裏我就再也不一一測試。

五. demo下載地址

  • https://github.com/liuyongfei1/blog-demo
  • 在本篇實例中,我將消息生產端和消費端部署爲兩個單獨的服務,你們克隆完畢後請切換到 feature/rabbitmq-confirm 分支進行啓動測試。
  • 歡迎你們關注掃描二維碼或 添加微信公衆號:全棧在路上
    微信公衆號二維碼.jpg
相關文章
相關標籤/搜索