ack
表示告知RabbitMQ
已經成功消費消息
nack
表示告知RabbitMQ
消費端處理消息失敗
RabbitMQ
既收不到ack
也收不到nack
,此時消費端採用手工ack
,等消費端服務重啓好後,RabbitMQ
會重發此未能消費成功的消息,保障消息消費成功。消費端重回隊列是爲了對沒有處理成功的消息,把消息重新投遞給Broker。
通常我們在實際應用中,都會關閉重回隊列。
package com.wyg.rabbitmq.javaclient.consumer_ack;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer side of the manual ack / nack consumer demo.
 *
 * <p>Publishes a fixed number of messages to a topic exchange. Each message
 * carries a {@code num} header holding its zero-based index.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 13:25
 * @since JDK1.8
 * @version V1.0
 */
public class Producer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // Fixed typo: the credentials were spelled "guset"; the broker's stock account is "guest".
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    /** Number of messages published per run. */
    private static final int MESSAGE_COUNT = 6;

    /**
     * Connects to the broker, declares the exchange and publishes the demo messages.
     *
     * @param args unused
     * @throws IOException if the connection, declaration or a publish fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.abc";

        // try-with-resources closes the channel and then the connection even when
        // declaring or publishing throws; previously both leaked on any failure.
        try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare(exchangeName, "topic");

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Map<String, Object> headers = new HashMap<>();
                headers.put("num", i);

                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2) // 2 = persistent delivery mode in AMQP 0-9-1
                        .contentEncoding("UTF-8")
                        .headers(headers)
                        .build();

                String msg = "這是第" + i + "條ack消息";
                channel.basicPublish(exchangeName, routingKey, false, props,
                        msg.getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}
package com.wyg.rabbitmq.javaclient.consumer_ack;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * Consumer side of the manual ack / nack demo.
 *
 * <p>Declares the exchange and queue, binds them, and consumes with
 * {@code autoAck = false}. The {@code num} header of each message decides the
 * outcome: below {@link #ACK_THRESHOLD} the message is acked, otherwise it is
 * nacked with requeue.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */
public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // Fixed typo: the credentials were spelled "guset"; the broker's stock account is "guest".
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    /** Messages whose {@code num} header is below this value are acked; the rest are nacked. */
    private static final int ACK_THRESHOLD = 3;

    /**
     * Connects to the broker, sets up the topology and starts consuming.
     *
     * <p>The connection is intentionally left open so the consumer keeps receiving deliveries.
     *
     * @param args unused
     * @throws IOException if the connection, a declaration or the subscription fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "test_ack_queue";
        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.#";

        // Declare the exchange.
        channel.exchangeDeclare(exchangeName, "topic");
        // Declare the queue (durable, non-exclusive, no auto-delete).
        channel.queueDeclare(queueName, true, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind(queueName, exchangeName, routingKey, null);

        DeliverCallback deliverCallback = (consumerTag, message) -> handleDelivery(channel, message);
        CancelCallback cancelCallback = consumerTag -> System.out.println("---消費者--:cancelCallback");

        // autoAck must be false so that this consumer acknowledges manually.
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }

    /**
     * Acks or nacks a single delivery based on its {@code num} header.
     *
     * @param channel channel the delivery arrived on; used to send the ack/nack
     * @param message the delivery to settle
     * @throws IOException if sending the ack/nack fails
     */
    private static void handleDelivery(Channel channel, Delivery message) throws IOException {
        long deliveryTag = message.getEnvelope().getDeliveryTag();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);

        Map<String, Object> headers = message.getProperties().getHeaders();
        Object rawNum = (headers == null) ? null : headers.get("num");
        if (!(rawNum instanceof Number)) {
            // A missing or non-numeric header used to throw NullPointerException or
            // ClassCastException and leave the delivery unacknowledged. Reject it
            // without requeue so it is not redelivered.
            System.out.println("---消費端reject---DeliveryTag:" + deliveryTag + "," + body);
            channel.basicNack(deliveryTag, false, false);
            return;
        }

        int num = ((Number) rawNum).intValue();
        if (num < ACK_THRESHOLD) {
            // num < 3: acknowledge. The log line previously said "nack" here by mistake.
            System.out.println("---消費端ack---DeliveryTag:" + deliveryTag + "," + body);
            channel.basicAck(deliveryTag, false);
        } else {
            // num >= 3: nack with requeue=true. The broker redelivers the message, so
            // this branch repeats indefinitely -- intentional for the demo.
            System.out.println("---消費端nack---DeliveryTag:" + deliveryTag + "," + body);
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
發現前3條消息成功消費,手工發ack
給Broker
最後3條消息,發nack
給Broker
,並不斷重回隊列尾端,broker再將其推給消費端,一直循環消費失敗。