RabbitMQ 消費端 ACK 與重回隊列機制

消費端的手工 ACK 和 NACK

消費端進行消費的時候,若是因爲業務異常致使失敗了,返回 NACK 達到最大重試次數,此時咱們能夠進行日誌的記錄,而後手動 ACK 回去,最後對這個記錄進行補償。git

或者因爲服務器宕機等嚴重問題,致使 ACK 和 NACK 都沒有,那咱們就須要手工進行 ACK 保障消費端消費成功,再經過補償機制補償。github

消費端的重回隊列 消費端的重回隊列是爲了對沒有處理成功的消息,把消息從新遞給 broker。可是在咱們的實際生產,通常都會關閉重回隊列,api

代碼地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項目下
複製代碼

生產端的代碼基本沒什麼變化bash

@Slf4j
public class Procuder {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        for(int i =0; i < 5; i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            log.info("生產端發送:{}", msg);
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }

    }
}
複製代碼

接着是消費端的代碼服務器

注意看消費端的代碼, autoack 必定要設置爲 false,要否則不會生效的
複製代碼
@Slf4j
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        //使用自定義消費者
        //1 手工簽收 必需要關閉 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
        log.info("消費端啓動成功");

    }
}
複製代碼

消費端的具體消費代碼:ide

/**
 * 自定義消費者
 */
@Slf4j
public class MyConsumer extends DefaultConsumer {

    private Channel channel;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,  //消費者標籤
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        log.info("------MyConsumer-----consume message----------");
        log.info("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
                                                //是否爲批量的,是否重回隊列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}
複製代碼

先啓動消費端,再啓動生產端 ui

注意看消費端的日誌,發現按 0-4 消費完後,0 的重回隊列了,符合咱們的目標
自此,重回隊列演示完畢。
相關文章
相關標籤/搜索