rabbitmq 死信隊列

死信隊列: DLX,dead-letter-exchange

利用 dlx,當消息在一個隊列中變成死信 (dead message) 以後,它能被從新 publish 到另外一個 exchange,這個 exchange 就是 dlxgit

消息變成死信的緣由有:

1.消息被拒絕 (basic.reject / basic.nack) 而且 reQueue=false

2.消息 TTL 過時

3.隊列達到最大長度了
複製代碼

dlx 也是一個正常的 exchange,和通常的 exchange 沒什麼區別,它能在任何隊列上被指定,實際上就是設置一個屬性。github

當這個隊列中有死信時,rabbitMQ 就會自動的將這個消息從新發布到設置的 exchange 上去,進而被路由到另外一個隊列。api

能夠監聽這個隊列中消息作相應的處理,這個特性能夠彌補 rabbitMQ3.0 之前支持的 immediate 參數功能。bash

死信隊列的設置:

首先要設置死信隊列的 exchange 和 queue,而後進行綁定:ide

exchange: dlx.exchange

  queue: dlx.queue

  routingkey:  #
複製代碼

而後進行正常聲明交換機、隊列、綁定,只不過須要在隊列加上一個參數便可: argument.put("x-dead-letter-exchange", "dlx.exchange");ui

這樣消息在過時、reQueue、隊列在達到最大長度時,消息就能夠直接路由到死信隊列spa

代碼地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項目下
複製代碼
public class Producer {

	private static final Logger log = LoggerFactory.getLogger(Producer.class);
	
	public static void main(String[] args) throws IOException, TimeoutException {
		 ConnectionFactory connectionFactory = new ConnectionFactory();
	        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
	        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
	        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

	        Connection connection = connectionFactory.newConnection();
	        Channel channel = connection.createChannel();

	        //注意,這只是普通的交換機和routingKey
	        String exchange = "test_dlx_exchange";
	        String routingKey = "dlx.save";

	        for (int i = 0; i < 1; i++) {
	            String msg = "Hello RabbitMQ DLX Message" + i;
	            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
	                    .deliveryMode(2)
	                    .contentEncoding("UTF-8")
	                    .expiration("10000")
	                    .build();
	            log.info("生產端發送:{}", msg);
	            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
	        }
	}
}
複製代碼
/**
 *	 死信隊列
 *
 */
public class Consumer {

	private static final Logger log = LoggerFactory.getLogger(Consumer.class);
	
	public static void main(String[] args) throws IOException, TimeoutException {
		//1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();

        // 這就是一個普通的交換機 和 隊列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);

        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //這個agruments屬性,要設置到聲明隊列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        //要進行死信隊列的聲明: dlx.exchange/queue都是由你本身命名的,只不過爲了這裏只是爲了簡潔明瞭而已
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
        log.info("消費端啓動成功");
        
	}
}
複製代碼
public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);

	public MyConsumer(Channel channel) {
		super(channel);
	}

	@Override
	public void handleDelivery(String consumerTag, // 消費者標籤
			Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		log.info("----dlx--MyConsumer-----consume message----------");
		log.info("consumerTag: " + consumerTag);
		log.info("envelope: " + envelope);
		log.info("properties: " + properties);
		log.info("body: " + new String(body));
	}

}
複製代碼

先把消費端啓動,去管控臺查看 test_dlx_exchange 以及 test_dlx_queue 這兩個普通的交換機、隊列, 死信隊列 dlx.queue 以及綁定的 dlx.exchange 是否建立成功 code

確認建立成功就關閉消費端,而後再啓動生產端,這時候消息沒被消費,一直在 test_dlx_queue 中,
直到設置的超時時間後,消息就被轉發到死信隊列中
自此,死信隊列的簡單用法介紹完畢。
相關文章
相關標籤/搜索