利用 dlx,當消息在一個隊列中變成死信 (dead message) 以後,它能被從新 publish 到另外一個 exchange,這個 exchange 就是 dlxgit
1.消息被拒絕 (basic.reject / basic.nack) 而且 reQueue=false
2.消息 TTL 過時
3.隊列達到最大長度了
複製代碼
dlx 也是一個正常的 exchange,和通常的 exchange 沒什麼區別,它能在任何隊列上被指定,實際上就是設置一個屬性。github
當這個隊列中有死信時,rabbitMQ 就會自動的將這個消息從新發布到設置的 exchange 上去,進而被路由到另外一個隊列。api
能夠監聽這個隊列中消息作相應的處理,這個特性能夠彌補 rabbitMQ3.0 之前支持的 immediate 參數功能。bash
首先要設置死信隊列的 exchange 和 queue,而後進行綁定:ide
exchange: dlx.exchange
queue: dlx.queue
routingkey: #
複製代碼
而後進行正常聲明交換機、隊列、綁定,只不過須要在隊列加上一個參數便可: argument.put("x-dead-letter-exchange", "dlx.exchange");
ui
這樣消息在過時、reQueue、隊列在達到最大長度時,消息就能夠直接路由到死信隊列spa
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//注意,這只是普通的交換機和routingKey
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
for (int i = 0; i < 1; i++) {
String msg = "Hello RabbitMQ DLX Message" + i;
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
log.info("生產端發送:{}", msg);
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
複製代碼
/**
* 死信隊列
*
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
// 這就是一個普通的交換機 和 隊列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//這個agruments屬性,要設置到聲明隊列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要進行死信隊列的聲明: dlx.exchange/queue都是由你本身命名的,只不過爲了這裏只是爲了簡潔明瞭而已
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
log.info("消費端啓動成功");
}
}
複製代碼
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, // 消費者標籤
Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("----dlx--MyConsumer-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
}
}
複製代碼
先把消費端啓動,去管控臺查看 test_dlx_exchange 以及 test_dlx_queue 這兩個普通的交換機、隊列, 死信隊列 dlx.queue 以及綁定的 dlx.exchange 是否建立成功 code
確認建立成功就關閉消費端,而後再啓動生產端,這時候消息沒被消費,一直在 test_dlx_queue 中, 直到設置的超時時間後,消息就被轉發到死信隊列中 自此,死信隊列的簡單用法介紹完畢。