消費端進行消費的時候,若是因爲業務異常致使失敗了,返回 NACK 達到最大重試次數,此時咱們能夠進行日誌的記錄,而後手動 ACK 回去,最後對這個記錄進行補償。git
或者因爲服務器宕機等嚴重問題,致使 ACK 和 NACK 都沒有,那咱們就須要手工進行 ACK 保障消費端消費成功,再經過補償機制補償。github
消費端的重回隊列 消費端的重回隊列是爲了對沒有處理成功的消息,把消息從新遞給 broker。可是在咱們的實際生產,通常都會關閉重回隊列,api
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
生產端的代碼基本沒什麼變化bash
@Slf4j
public class Procuder {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i < 5; i++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
log.info("生產端發送:{}", msg);
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
複製代碼
接着是消費端的代碼服務器
注意看消費端的代碼, autoack 必定要設置爲 false,要否則不會生效的
複製代碼
@Slf4j
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//使用自定義消費者
//1 手工簽收 必需要關閉 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
log.info("消費端啓動成功");
}
}
複製代碼
消費端的具體消費代碼:ide
/**
* 自定義消費者
*/
@Slf4j
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消費者標籤
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//是否爲批量的,是否重回隊列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
複製代碼
先啓動消費端,再啓動生產端 ui