RabbitMQ 的 confirm 消息確認機制和 return 消息機制

代碼地址:     https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項目下
複製代碼

1.confirm 消息確認機制

消息的確認是指生產者投遞消息後,若是 Broker 接收到消息,則會給生產者一個應答。生產者進行接收應答,用來確認這條消息是否正常的發送到 Broker,這種方式也是消息可靠性投遞的核心保障。其流程圖以下所示 git

'confirm消息確認機制流程圖'

實現 confirm 確認消息

第一步:在 channel 上開啓確認模式:channel.confirmSelect()
第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!

代碼實現: 生產端github

/**
 * confirm機制生產端
 */
public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();

        //4 指定咱們的消息投遞模式: 消息的確認模式
        channel.confirmSelect();

        //5 發送一條消息
        String msg = "Hello RabbitMQ Send confirm message!";
        log.info("生產已啓動,併發送了:{}", msg);
        channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, null, msg.getBytes());

        //6 添加一個確認監聽
        channel.addConfirmListener(new ConfirmListener() {
            @Override  //deliveryTag表示消息的惟一標籤,
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                log.info("-------ack!-----------");
                log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }
            @Override  //失敗時進入這裏
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                log.info("-------no ack!-----------");
                log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }
        });
    }
}

複製代碼

消費端代碼無需什麼修改api

/**
 * confirm機制消費端
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static final String EXCHANGE_NAME = "test_confirm_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_REG = "confirm.#";
    public static final String ROUTING_KEY = "confirm.abc";
    public static final String QUEUE_NAME = "test_confirm_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        //4 聲明交換機和隊列 而後進行綁定設置, 最後制定路由Key
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);   //true表示持久化
                //是否持久化,獨佔模式,自動刪除
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);

        //5 建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//        是否自動簽收 autoAck
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        log.info("消費端已啓動");
        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            log.info("消費端: {}", msg);
        }
    }
}
複製代碼

啓動消費端 bash

啓動消費端
再啓動生產端,注意看日誌打印,收到了 confirm
自此,confirm 機制的用法接受完畢。

2. return 消息機制

return Listener 用於處理一些不可路由的消息!併發

生產者經過指定一個 exchange 和 routingkey 把消息送達到某個隊列中去,而後消費者監聽隊列,進行消費處理。可是在某些狀況下,若是咱們在發送消息時,當前的 exchange 不存在或者指定的 routingkey 路由不到,這個時候若是要監聽這種不可達的消息,就要使用 return Listener。流程圖以下所示ide

實現 return 消息機制

在基礎 API 中有一個關鍵的配置項 Mandatory:若是爲 true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲 false,則 broke r端自動刪除該消息。ui

代碼實現 生產端代碼:spa

/**
 *  Return返回消息 生產端
 */
public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();

        String msg = "Hello RabbitMQ Return Message";

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {
                log.info("---------handle return----------");
                log.info("響應碼replyCode: {}", replyCode);
                log.info("文本信息replyText: {}", replyText);
                log.info("exchange: {}", exchange);
                log.info("routingKey: {}", routingKey);
                log.info("properties: {}", properties);
                log.info("body: {}" ,new String(body));
            }
        });

        /**
         * 若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,
         * 若是爲false,則broker端自動刪除該消息。
         */
        log.info("生產端{}發送:{}", Consumer.ROUTING_KEY, msg + Consumer.ROUTING_KEY);
        channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + Consumer.ROUTING_KEY).getBytes());
        log.info("生產端{}發送:{}", Consumer.ROUTINGKEY_ERROR, msg + Consumer.ROUTINGKEY_ERROR);
        channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR, true, null, (msg + Consumer.ROUTINGKEY_ERROR).getBytes());
        log.info("生產端{}發送:{}", Consumer.ROUTINGKEY_ERROR2, msg + Consumer.ROUTINGKEY_ERROR2);
        channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR2, false, null, (msg + Consumer.ROUTINGKEY_ERROR2).getBytes());

    }
}
複製代碼

消費端代碼和之前的同樣,無需修改什麼3d

/**
 *  Return返回消息 消費端
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static final String EXCHANGE_NAME = "test_return_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_REG = "return.#";
    public static final String ROUTING_KEY = "return.save";
    public static final String ROUTINGKEY_ERROR = "abc.true";
    public static final  String ROUTINGKEY_ERROR2 = "abc.false";
    public static final String QUEUE_NAME = "test_return_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        log.info("消費端啓動成功");
        while(true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            log.info("消費者: {}", msg);
        }
    }
}

複製代碼

先啓動消費端,而後上 rabbitMQ 的管控臺查看對應的 exchange 和 queue 是否建立,綁定成功。 日誌

再啓動生產端,注意看打印的日誌

監聽器會接收到路由不可達的消息,而後進行後續處理,前提是消費端的 Mandatory 爲 true,能夠修改成 false,再試試看能不能接收到,我這裏就不演示了。
相關文章
相關標籤/搜索