7.RabbitMQ--消息確認機制(confirm)

RabbitMQ--消息確認機制(confirm)

Confirm模式

RabbitMQ爲了解決生成者不知道消息是否真正到達broker這個問題,採用經過AMQP協議層面爲咱們提供了事務機制方案,可是採用事務機制實現會下降RabbitMQ的消息吞吐量,那麼有沒有更加高效的解決方式呢?答案是採用Confirm模式。編程

producer端confirm模式的實現原理

  • 生產者將信道設置成confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也能夠設置basic.ack的multiple域,表示到這個序列號以前的全部消息都已經獲得了處理。
  • confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。
  • 在channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack 。

編程優化邏輯服務器

  對於固定消息體大小和線程數,若是消息持久化,生產者confirm(或者採用事務機制),消費者ack,那麼對性能有很大的影響。消息持久化的優化沒有太好方法,用更好的物理存儲(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程序的優化之上。概括起來,客戶端實現生產者confirm有三種編程方式:異步

  • 普通confirm模式:每發送一條消息後,調用waitForConfirms()方法,等待服務器端confirm。其實是一種串行confirm了。
  • 批量confirm模式:每發送一批消息後,調用waitForConfirms()方法,等待服務器端confirm。
  • 異步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息後Client端會回調這個方法。

第1種 
普通confirm模式最簡單,publish一條消息後,等待服務器端confirm,若是服務端返回false或者超時時間內未返回,客戶端進行消息重傳。 
關鍵代碼以下:ide

1 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
2 if(!channel.waitForConfirms()){
3     System.out.println("send message failed.");
4 }

 

第二種
批量confirm模式稍微複雜一點,客戶端程序須要按期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish消息,而後等待服務器端confirm, 相比普通confirm模式,批量極大提高confirm效率,可是問題在於一旦出現confirm返回false或者超時的狀況時,客戶端須要將這一批次的消息所有重發,這會帶來明顯的重複消息數量,而且,當消息常常丟失時,批量confirm性能應該是不升反降的。
關鍵代碼:性能

1 channel.confirmSelect();
2 for(int i=0;i<batchCount;i++){
3     channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
4 }
5 if(!channel.waitForConfirms()){
6     System.out.println("send message failed.");
7 }

 

第三種
異步confirm模式的編程實現最複雜,Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Chanel發出的消息序號),咱們須要本身爲每個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構。實際上,SDK中的waitForConfirms()方法也是經過SortedSet維護消息序號的。
關鍵代碼:優化

 1 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
 2  channel.confirmSelect();
 3         channel.addConfirmListener(new ConfirmListener() {
 4             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
 5                 if (multiple) {
 6                     confirmSet.headSet(deliveryTag + 1).clear();
 7                 } else {
 8                     confirmSet.remove(deliveryTag);
 9                 }
10             }
11             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
12                 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
13                 if (multiple) {
14                     confirmSet.headSet(deliveryTag + 1).clear();
15                 } else {
16                     confirmSet.remove(deliveryTag);
17                 }
18             }
19         });
20 
21         while (true) {
22             long nextSeqNo = channel.getNextPublishSeqNo();
23             channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
24             confirmSet.add(nextSeqNo);
25 }

消息確認(Consumer端)
爲了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(和磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它。spa

採用消息確認機制後,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題,由於RabbitMQ會一直持有消息直到消費者顯式調用basicAck爲止。命令行

當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分紅了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,可是尚未收到消費者ack信號的消息。若是服務器端一直沒有收到消費者的ack信號,而且消費此消息的消費者已經斷開鏈接,則服務器端會安排該消息從新進入隊列,等待投遞給下一個消費者(也可能仍是原來的那個消費者)。線程

RabbitMQ不會爲未ack的消息設置超時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開。這麼設計的緣由是RabbitMQ容許消費者消費一條消息的時間能夠好久好久。設計

RabbitMQ管理平臺界面上能夠看到當前隊列中Ready狀態和Unacknowledged狀態的消息數,分別對應上文中的等待投遞給消費者的消息數和已經投遞給消費者可是未收到ack信號的消息數。也能夠經過命令行來查看上述信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged

代碼示例:

 1  public static void main(String[] args) throws IOException {
 2         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
 3         Connection connection = connectionFactory.createConnection();
 4         Channel channel = connection.createChannel(false);
 5         /**
 6          * 建立隊列申明
 7          */
 8         boolean durable = true;
 9         channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null);
10 
11         /**
12          * 綁定隊列到交換機
13          */
14         channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*");
15 
16 
17         /**
18          * 改變分發規則
19          */
20         channel.basicQos(1);
21         DefaultConsumer consumer = new DefaultConsumer(channel) {
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 super.handleDelivery(consumerTag, envelope, properties, body);
25                 System.out.println("[2] 接口數據 : " + new String(body, "utf-8"));
26                 try {
27                     Thread.sleep(200);
28                 } catch (InterruptedException e) {
29                     e.printStackTrace();
30                 } finally {
31                     System.out.println("[2] done! ");
32                     //消息應答:手動回執,手動確認消息
33                     channel.basicAck(envelope.getDeliveryTag(), false);
34                 }
35             }
36         };
37         //監聽隊列
38         /**
39          * autoAck 消息應答
40          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
41          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
42          */
43         boolean autoAck = false;
44         channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer);
45     }

broker將在下面的狀況中對消息進行confirm:

  • broker發現當前消息沒法被路由到指定的queues中(若是設置了mandatory屬性,則broker會發送basic.return)
  • 非持久屬性的消息到達了其所應該到達的全部queue中(和鏡像queue中)
  • 持久消息到達了其所應該到達的全部queue中(和鏡像中),並被持久化到了磁盤(fsync)
  • 持久消息從其所在的全部queue中被consume了(若是必要則會被ack)

basicRecover:是路由不成功的消息可使用recovery從新發送到隊列中。 basicReject:是接收端告訴服務器這個消息我拒絕接收,不處理,能夠設置是否放回到隊列中仍是丟掉,並且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,爲解決批量拒絕消息纔有了basicNack。 basicNack:能夠一次拒絕N條消息,客戶端能夠設置basicNack方法的multiple參數爲true,服務器會拒絕指定了delivery_tag的全部未確認的消息(tag是一個64位的long值,最大值是9223372036854775807)。

相關文章
相關標籤/搜索