[TOC]
消息沒有任何消費者去消費就變爲死信
利用DLX,當消息在一個隊列中變成死信以後,它能被重新publish到另一個exchange,這個exchange就是DLX。
DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何隊列上被指定,實際就是設置某個隊列的屬性。當隊列中有死信時,RabbitMQ會自動將死信消息發送到設置的DLX,進而被路由到另一個隊列,可以監聽這個隊列,做後續處理。
arguments.put("x-dead-letter-exchange", "your dlx");
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消費者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.abc"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); String msg = "正常消息1,routingKey:" + routingKey; AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build(); channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8")); // 該消息無消費者消費 String msg2 = "過時死信消息2,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg2.getBytes("UTF-8")); String msg3 = "過時死信消息3,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg3.getBytes("UTF-8")); String msg4 = "過時死信消息4,routingKey:" + routingKey; channel.basicPublish(exchangeName, routingKey, false, props, msg4.getBytes("UTF-8")); channel.close(); connection.close(); } }
producer可以採用消息過期產生死信
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 消費者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */ public class Consumer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 定義死信隊的Exchange String dlxExchange = "dlx.exchange"; channel.exchangeDeclare(dlxExchange, "topic"); // 死信隊列名 String dlxQueue = "dlx.queue"; channel.queueDeclare(dlxQueue, true, false, false, null); // # 表示全部的key均可以路由到s死信隊列 String dlxRoutingKey = "#"; // 綁定死信隊列和exchange channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null); // 定義正常的消費者j監聽隊列 String queueName = "test_dlx_queue"; String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明隊列 Map<String, Object> arguments = new HashMap<>(); // 設置死信隊列,arguments要設置到申明的隊列上 arguments.put("x-dead-letter-exchange", dlxExchange); channel.queueDeclare(queueName, true, false, false, arguments); // 隊列綁定到exchange channel.queueBind(queueName, exchangeName, routingKey); channel.basicQos(1); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("---消費者-- " 
+ new String(message.getBody(), "UTF-8")); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消費者--:cancelCallback "); } }; // 消費消息,autoAck必定要設爲false,手工ack channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
運行結果:只消費一條正常消息,其餘過期的未消費
package com.wyg.rabbitmq.javaclient.dlx; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 監聽私信隊列 * * @author wyg0405@gmail.com * @date 2019-11-22 21:52 * @since JDK1.8 * @version V1.0 */ public class DLXConusmer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "dlx.queue"; String exchangeName = "dlx.exchange"; String routingKey = "#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明隊列 channel.queueDeclare(queueName, true, false, false, null); // 隊列綁定到exchange channel.queueBind(queueName, exchangeName, routingKey, null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("---死信隊列消費者---"); System.out.println(new String(message.getBody(), "UTF-8")); } finally { // consumer手動 ack 給broker channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消費者:cancelCallback"); } }; // 消費消息,autoAck必定要設置爲false channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
運行結果:3條過期的消息進入死信隊列,並被消費