Return Listener用於處理一些不可路由的消息。
生產者通過指定Exchange和RoutingKey將消息發送到指定隊列,消費者只需監聽這個隊列,進行消費操作。
但是在某些情況下,我們在發送消息的時候,當前Exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息,就需要使用Return Listener。
在API中的配置項:
Mandatory:如果爲true,則會監聽路由不可達的消息,然後進行後續處理;如果爲false,那麼broker會自動刪除這條消息。
package com.wyg.rabbitmq.javaclient.producer_return;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
 * Producer-side demonstration of the RabbitMQ return mechanism.
 *
 * <p>Declares a topic exchange, registers a {@link ReturnListener} on the channel and then
 * publishes two messages with the {@code mandatory} flag set: one with routing key
 * {@code return.ok} and one with routing key {@code return.error}. Any message the broker
 * hands back through the listener is printed to standard output.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 13:25
 * @since JDK1.8
 * @version V1.0
 */
public class Producer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // RabbitMQ's built-in default account is "guest"/"guest"; the previous value "guset" was a typo.
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    /**
     * Connects to the broker, installs the return listener and publishes the two messages.
     *
     * @param args unused
     * @throws IOException if declaring the exchange, encoding a payload or publishing fails
     * @throws TimeoutException if the connection to the broker cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.ok";
        String routingKeyError = "return.error";

        // Declare the exchange.
        channel.exchangeDeclare(exchangeName, "topic");

        // Register the listener that receives messages returned by the broker.
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---生產端---");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                // Payloads are published as UTF-8 below, so decode with the same charset
                // instead of relying on the platform default.
                System.out.println("body:" + new String(body, "UTF-8"));
            }
        });

        // Publish with mandatory set to true.
        boolean mandatory = true;

        String msg = "這是一條return確認消息1,routingKey " + routingKey;
        channel.basicPublish(exchangeName, routingKey, mandatory, null, msg.getBytes("UTF-8"));

        String msg2 = "這是一條return確認消息2,routingKey " + routingKeyError;
        channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg2.getBytes("UTF-8"));

        // Note: the channel and connection are deliberately left open here, because the
        // producer still has to wait for the broker's return message.
    }
}
複製代碼
package com.wyg.rabbitmq.javaclient.producer_return;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
 * Consumer side of the return-mechanism demonstration.
 *
 * <p>Declares the topic exchange {@code test_return_exchange} and the queue
 * {@code test_return_queue}, binds them with routing key {@code return.ok}, and then consumes
 * from the queue with manual acknowledgements, printing each message body.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */
public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // RabbitMQ's built-in default account is "guest"/"guest"; the previous value "guset" was a typo.
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    /**
     * Connects to the broker, sets up the exchange, queue and binding, and starts consuming.
     *
     * @param args unused
     * @throws IOException if a declare, bind or consume operation fails
     * @throws TimeoutException if the connection to the broker cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "test_return_queue";
        String exchangeName = "test_return_exchange";
        String routingKey = "return.ok";

        // Declare the exchange.
        channel.exchangeDeclare(exchangeName, "topic");
        // Declare the queue.
        channel.queueDeclare(queueName, true, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind(queueName, exchangeName, routingKey, null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    System.out.println("---消費者---:" + new String(message.getBody(), "UTF-8"));
                } finally {
                    // Manually ack the delivery to the broker; being in finally, this runs
                    // even if printing the body throws.
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };

        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // Start consuming; the second argument (false) passes autoAck=false, and deliveries
        // are acked manually in the callback above.
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}
複製代碼
注意:
先運行Consumer,綁定正確的exchange和routingKey,再啓動生產者。
Consumer
Producer,其中routingKey 爲 return.error的消息無法路由,被Return Listener監聽到。