RabbitMQ爲了解決生成者不知道消息是否真正到達broker這個問題,採用經過AMQP協議層面爲咱們提供了事務機制方案,可是採用事務機制實現會下降RabbitMQ的消息吞吐量,那麼有沒有更加高效的解決方式呢?答案是採用Confirm模式。編程
編程優化邏輯服務器
對於固定消息體大小和線程數,若是消息持久化,生產者confirm(或者採用事務機制),消費者ack,那麼對性能有很大的影響。消息持久化的優化沒有太好方法,用更好的物理存儲(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程序的優化之上。概括起來,客戶端實現生產者confirm有三種編程方式:異步
第1種
普通confirm模式最簡單,publish一條消息後,等待服務器端confirm,若是服務端返回false或者超時時間內未返回,客戶端進行消息重傳。
關鍵代碼以下:ide
1 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 2 if(!channel.waitForConfirms()){ 3 System.out.println("send message failed."); 4 }
第二種
批量confirm模式稍微複雜一點,客戶端程序須要按期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish消息,而後等待服務器端confirm, 相比普通confirm模式,批量極大提高confirm效率,可是問題在於一旦出現confirm返回false或者超時的狀況時,客戶端須要將這一批次的消息所有重發,這會帶來明顯的重複消息數量,而且,當消息常常丟失時,批量confirm性能應該是不升反降的。
關鍵代碼:性能
1 channel.confirmSelect(); 2 for(int i=0;i<batchCount;i++){ 3 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 4 } 5 if(!channel.waitForConfirms()){ 6 System.out.println("send message failed."); 7 }
第三種
異步confirm模式的編程實現最複雜,Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Chanel發出的消息序號),咱們須要本身爲每個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構。實際上,SDK中的waitForConfirms()方法也是經過SortedSet維護消息序號的。
關鍵代碼:優化
1 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); 2 channel.confirmSelect(); 3 channel.addConfirmListener(new ConfirmListener() { 4 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 5 if (multiple) { 6 confirmSet.headSet(deliveryTag + 1).clear(); 7 } else { 8 confirmSet.remove(deliveryTag); 9 } 10 } 11 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 12 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); 13 if (multiple) { 14 confirmSet.headSet(deliveryTag + 1).clear(); 15 } else { 16 confirmSet.remove(deliveryTag); 17 } 18 } 19 }); 20 21 while (true) { 22 long nextSeqNo = channel.getNextPublishSeqNo(); 23 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 24 confirmSet.add(nextSeqNo); 25 }
消息確認(Consumer端)
爲了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(和磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它。spa
採用消息確認機制後,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題,由於RabbitMQ會一直持有消息直到消費者顯式調用basicAck爲止。命令行
當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分紅了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,可是尚未收到消費者ack信號的消息。若是服務器端一直沒有收到消費者的ack信號,而且消費此消息的消費者已經斷開鏈接,則服務器端會安排該消息從新進入隊列,等待投遞給下一個消費者(也可能仍是原來的那個消費者)。線程
RabbitMQ不會爲未ack的消息設置超時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開。這麼設計的緣由是RabbitMQ容許消費者消費一條消息的時間能夠好久好久。設計
RabbitMQ管理平臺界面上能夠看到當前隊列中Ready狀態和Unacknowledged狀態的消息數,分別對應上文中的等待投遞給消費者的消息數和已經投遞給消費者可是未收到ack信號的消息數。也能夠經過命令行來查看上述信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged
代碼示例:
1 public static void main(String[] args) throws IOException { 2 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 3 Connection connection = connectionFactory.createConnection(); 4 Channel channel = connection.createChannel(false); 5 /** 6 * 建立隊列申明 7 */ 8 boolean durable = true; 9 channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null); 10 11 /** 12 * 綁定隊列到交換機 13 */ 14 channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*"); 15 16 17 /** 18 * 改變分發規則 19 */ 20 channel.basicQos(1); 21 DefaultConsumer consumer = new DefaultConsumer(channel) { 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 super.handleDelivery(consumerTag, envelope, properties, body); 25 System.out.println("[2] 接口數據 : " + new String(body, "utf-8")); 26 try { 27 Thread.sleep(200); 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } finally { 31 System.out.println("[2] done! "); 32 //消息應答:手動回執,手動確認消息 33 channel.basicAck(envelope.getDeliveryTag(), false); 34 } 35 } 36 }; 37 //監聽隊列 38 /** 39 * autoAck 消息應答 40 * 默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。 41 * 使用公平分發須要關閉autoAck:false 須要手動發送回執 42 */ 43 boolean autoAck = false; 44 channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer); 45 }
broker將在下面的狀況中對消息進行confirm:
basicRecover:是路由不成功的消息可使用recovery從新發送到隊列中。 basicReject:是接收端告訴服務器這個消息我拒絕接收,不處理,能夠設置是否放回到隊列中仍是丟掉,並且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,爲解決批量拒絕消息纔有了basicNack。 basicNack:能夠一次拒絕N條消息,客戶端能夠設置basicNack方法的multiple參數爲true,服務器會拒絕指定了delivery_tag的全部未確認的消息(tag是一個64位的long值,最大值是9223372036854775807)。