消息的確認,是指生產者投遞消息後,若是 Broker 收到消息,則會給咱們生產者一個應答。生產者進行接收應答,用來肯定這條消息是否正常的發送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!api
channel.confirmSelect()
channel.addConfirmListener(ConfirmListener listener);
, 監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class ConfirmProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "item.update"; //指定消息的投遞模式:confirm 確認模式 channel.confirmSelect(); //發送 final long start = System.currentTimeMillis(); for (int i = 0; i < 5 ; i++) { String msg = "this is confirm msg "; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); System.out.println("Send message : " + msg); } //添加一個確認監聽, 這裏就不關閉鏈接了,爲了能保證能收到監聽消息 channel.addConfirmListener(new ConfirmListener() { /** * 返回成功的回調函數 */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("succuss ack"); System.out.println(multiple); System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms"); } /** * 返回失敗的回調函數 */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.printf("defeat ack"); System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms"); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; public class ConfirmConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String queueName = "test_confirm_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //通常不用代碼綁定,在管理界面手動綁定 channel.queueBind(queueName, exchangeName, routingKey); //建立消費者並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, true, consumer); } }
咱們此處只關注生產端輸出消息服務器
Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg Send message : this is confirm msg succuss ack true 耗時:3ms succuss ack true 耗時:4ms
咱們採用的是異步 confirm 模式:提供一個回調方法,服務端 confirm 了一條或者多條消息後 Client 端會回調這個方法。除此以外還有單條同步 confirm 模式、批量同步 confirm 模式,因爲現實場景中不多使用咱們在此不作介紹,若有興趣直接參考官方文檔。app
咱們運行生產端會發現每次運行結果都不同,會有多種狀況出現,由於 Broker 會進行優化,有時會批量一次性 confirm ,有時會分開幾條 confirm。異步
succuss ack true 耗時:3ms succuss ack false 耗時:4ms 或者 succuss ack true 耗時:3ms
Exchange
和 Routingkey
,把消息送達到某一個隊列中去,而後咱們的消費者監聽隊列,進行消費處理操做!可是在某些狀況下,若是咱們在發送消息的時候,當前的 exchange 不存在或者指定的路由 key 路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用 Return Listener !
ide
在基礎API中有一個關鍵的配置項:Mandatory
:若是爲 true
,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲 false
,那麼 broker 端自動刪除該消息!函數
首先咱們須要發送三條消息,而且故意將第 0 條消息的 routing Key
設置爲錯誤的,讓他沒法正常路由到消費端。優化
mandatory
設置爲 true
路由不可達的消息會被監聽到,不會被自動刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
最後添加監聽便可監聽到不可路由到消費端的消息channel.addReturnListener(ReturnListener r))
ui
import com.rabbitmq.client.*; import java.io.IOException; public class ReturnListeningProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "item.update"; String errRoutingKey = "error.update"; //指定消息的投遞模式:confirm 確認模式 channel.confirmSelect(); //發送 for (int i = 0; i < 3 ; i++) { String msg = "this is return——listening msg "; //@param mandatory 設置爲 true 路由不可達的消息會被監聽到,不會被自動刪除 if (i == 0) { channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes()); } else { channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); } System.out.println("Send message : " + msg); } //添加一個確認監聽, 這裏就不關閉鏈接了,爲了能保證能收到監聽消息 channel.addConfirmListener(new ConfirmListener() { /** * 返回成功的回調函數 */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("succuss ack"); } /** * 返回失敗的回調函數 */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.printf("defeat ack"); } }); //添加一個 return 監聽 channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("return relyCode: " + replyCode); System.out.println("return replyText: " + replyText); System.out.println("return exchange: " + exchange); System.out.println("return routingKey: " + routingKey); System.out.println("return properties: " + properties); System.out.println("return body: " + new String(body)); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; public class ReturnListeningConsumer { public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 經過鏈接工廠來建立鏈接 Connection connection = factory.newConnection(); //3. 經過 Connection 來建立 Channel Channel channel = connection.createChannel(); //4. 聲明 String exchangeName = "test_return_exchange"; String queueName = "test_return_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //通常不用代碼綁定,在管理界面手動綁定 channel.queueBind(queueName, exchangeName, routingKey); //5. 建立消費者並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, true, consumer); } }
咱們只關注生產端結果,消費端只收到兩條消息。this
Send message : this is return——listening msg Send message : this is return——listening msg Send message : this is return——listening msg return relyCode: 312 return replyText: NO_ROUTE return exchange: test_return_exchange return routingKey: error.update return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null) return body: this is return——listening msg succuss ack succuss ack succuss ack
消費端進行消費的時候,若是因爲業務異常咱們能夠進行日誌的記錄,而後進行補償!若是因爲服務器宕機等嚴重問題,那咱們就須要手工進行ACK保障消費端消費成功!消費端重回隊列是爲了對沒有處理成功的消息,把消息從新會遞給Broker!通常咱們在實際應用中,都會關閉重回隊列,也就是設置爲False。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
首先咱們發送五條消息,將每條消息對應的循環下標 i 放入消息的 properties
中做爲標記,以便於咱們在後面的回調方法中識別。
其次, 咱們將消費端的 ·channel.basicConsume(queueName, false, consumer);
中的 autoAck
屬性設置爲 false
,若是設置爲true
的話 將會正常輸出五條消息。
咱們經過 Thread.sleep(2000)
來延時一秒,用以看清結果。咱們獲取到properties
中的num
以後,經過channel.basicNack(envelope.getDeliveryTag(), false, true);
將 num
爲0的消息設置爲 nack,即消費失敗,而且將 requeue
屬性設置爲true
,即消費失敗的消息重回隊列末端。
import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; public class AckAndNackProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String routingKey = "item.update"; String msg = "this is ack msg"; for (int i = 0; i < 5; i++) { Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num" ,i); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .headers(headers) .build(); String tem = msg + ":" + i; channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes()); System.out.println("Send message : " + msg); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class AckAndNackConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "item.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, false, false, false, null); //通常不用代碼綁定,在管理界面手動綁定 channel.queueBind(queueName, exchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if ((Integer) properties.getHeaders().get("num") == 0) { channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, false, consumer); } }
咱們此處只關心消費端輸出,能夠看到第 0 條消費失敗從新回到隊列尾部消費。
[x] Received 'this is ack msg:1' [x] Received 'this is ack msg:2' [x] Received 'this is ack msg:3' [x] Received 'this is ack msg:4' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0' [x] Received 'this is ack msg:0'