RabbitMQ高級特性-生產端Return返回消息機制

爲何須要Return

Return Listener用於處理一些不可路由的消息java

生產者經過指定Exchange和RoutingKey將消息發送達指定隊列,消費者只需監聽這個隊列,進行消費操做。ide

可是在某些狀況下,咱們在發送消息的時候,當前Exchange不存在或者指定的路由key路由不到,這個時候咱們若是須要監聽這種達不到的消息,咱們就須要使用Return Listener。spa

實現

流程圖:

生產端Return

在API中配置項:3d

Mandatory:若是爲true,則會監聽路由不可達消息,而後進行後續處理;若是爲false,那麼broker自動刪除這條消息。code

代碼實現

Producer

package com.wyg.rabbitmq.javaclient.producer_return;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/** * 生產端return機制 * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */

public class Producer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        String exchangeName = "test_return_exchange";
        String routingKey = "return.ok";
        String routingKeyError = "return.error";
        // 申明exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // 添加監聽
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---生產端---");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));

            }
        });

        // 設置 mandatory 爲 true
        boolean mandatory = true;
        String msg = "這是一條return確認消息1,routingKey " + routingKey;
        channel.basicPublish(exchangeName, routingKey, mandatory, null, msg.getBytes("UTF-8"));
        String msg2 = "這是一條return確認消息2,routingKey " + routingKeyError;
        channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg2.getBytes("UTF-8"));

    }

    // 注意,由於要等待broker的return消息,暫時不關閉channel和connection
}

複製代碼

Conusumer

package com.wyg.rabbitmq.javaclient.producer_return;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/** * 生產端return機制 * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "test_return_queue";
        String exchangeName = "test_return_exchange";
        String routingKey = "return.ok";

        // 申明exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // 申明隊列
        channel.queueDeclare(queueName, true, false, false, null);
        // 隊列綁定到exchange
        channel.queueBind(queueName, exchangeName, routingKey, null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    System.out.println("---消費者---:" + new String(message.getBody(), "UTF-8"));
                } finally {
                    // consumer手動 ack 給broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // 消費消息
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

複製代碼

注意cdn

先運行Consumer,綁定正確的exchange和routingKey,再啓動生產者blog

運行結果

Consumerrabbitmq

Consumer運行結果

Producer,其中routingKey 爲 return.error的消息沒法路由,被Return Listener監聽到隊列

Producer運行結果
相關文章
相關標籤/搜索