代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
消息的確認是指生產者投遞消息後,若是 Broker 接收到消息,則會給生產者一個應答。生產者進行接收應答,用來確認這條消息是否正常的發送到 Broker,這種方式也是消息可靠性投遞的核心保障。其流程圖以下所示 git
代碼實現: 生產端github
/**
* confirm機制生產端
*/
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
//4 指定咱們的消息投遞模式: 消息的確認模式
channel.confirmSelect();
//5 發送一條消息
String msg = "Hello RabbitMQ Send confirm message!";
log.info("生產已啓動,併發送了:{}", msg);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, null, msg.getBytes());
//6 添加一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
@Override //deliveryTag表示消息的惟一標籤,
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("-------ack!-----------");
log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
@Override //失敗時進入這裏
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("-------no ack!-----------");
log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
});
}
}
複製代碼
消費端代碼無需什麼修改api
/**
* confirm機制消費端
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_REG = "confirm.#";
public static final String ROUTING_KEY = "confirm.abc";
public static final String QUEUE_NAME = "test_confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
//4 聲明交換機和隊列 而後進行綁定設置, 最後制定路由Key
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); //true表示持久化
//是否持久化,獨佔模式,自動刪除
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);
//5 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 是否自動簽收 autoAck
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
log.info("消費端已啓動");
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消費端: {}", msg);
}
}
}
複製代碼
啓動消費端 bash
return Listener 用於處理一些不可路由的消息!併發
生產者經過指定一個 exchange 和 routingkey 把消息送達到某個隊列中去,而後消費者監聽隊列,進行消費處理。可是在某些狀況下,若是咱們在發送消息時,當前的 exchange 不存在或者指定的 routingkey 路由不到,這個時候若是要監聽這種不可達的消息,就要使用 return Listener。流程圖以下所示ide
在基礎 API 中有一個關鍵的配置項 Mandatory:若是爲 true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲 false,則 broke r端自動刪除該消息。ui
代碼實現 生產端代碼:spa
/**
* Return返回消息 生產端
*/
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("---------handle return----------");
log.info("響應碼replyCode: {}", replyCode);
log.info("文本信息replyText: {}", replyText);
log.info("exchange: {}", exchange);
log.info("routingKey: {}", routingKey);
log.info("properties: {}", properties);
log.info("body: {}" ,new String(body));
}
});
/**
* 若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,
* 若是爲false,則broker端自動刪除該消息。
*/
log.info("生產端{}發送:{}", Consumer.ROUTING_KEY, msg + Consumer.ROUTING_KEY);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + Consumer.ROUTING_KEY).getBytes());
log.info("生產端{}發送:{}", Consumer.ROUTINGKEY_ERROR, msg + Consumer.ROUTINGKEY_ERROR);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR, true, null, (msg + Consumer.ROUTINGKEY_ERROR).getBytes());
log.info("生產端{}發送:{}", Consumer.ROUTINGKEY_ERROR2, msg + Consumer.ROUTINGKEY_ERROR2);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR2, false, null, (msg + Consumer.ROUTINGKEY_ERROR2).getBytes());
}
}
複製代碼
消費端代碼和之前的同樣,無需修改什麼3d
/**
* Return返回消息 消費端
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_return_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_REG = "return.#";
public static final String ROUTING_KEY = "return.save";
public static final String ROUTINGKEY_ERROR = "abc.true";
public static final String ROUTINGKEY_ERROR2 = "abc.false";
public static final String QUEUE_NAME = "test_return_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
log.info("消費端啓動成功");
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消費者: {}", msg);
}
}
}
複製代碼
先啓動消費端,而後上 rabbitMQ 的管控臺查看對應的 exchange 和 queue 是否建立,綁定成功。 日誌