RabbitMQ高級特性-死信隊列

[TOC]

死信

什麼是死信

消息沒有任何消費者去消費,就會變爲死信。

消息變爲死信有如下幾種情況:

  1. 消息被拒絕(basic.reject/basic.nack),並且requeue=false
  2. 消息TTL過期
  3. 隊列達到最大長度

死信隊列 Dead Letter Exchange(DLX)

DLX

利用DLX,當消息在一個隊列中變成死信之後,它能被重新publish到另一個exchange,這個exchange就是DLX。

DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何隊列上被指定,實際就是設置某個隊列的屬性。當隊列中有死信時,RabbitMQ會自動將死信消息發送到設置的DLX,進而被路由到另一個隊列,可以監聽這個隊列,做後續處理。

死信隊列設置

  1. 申明死信隊列的Exchange和queue,然後進行綁定
  2. 申明正常隊列的Exchange和queue並綁定,只不過要在隊列上加參數 arguments.put("x-dead-letter-exchange", "your dlx");

代碼實現

producer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Demo producer for the dead-letter-exchange example.
 *
 * <p>Publishes four messages to the {@code test_dlx_exchange} topic exchange
 * with routing key {@code dlx.abc}: the first without any properties, the
 * remaining three with delivery mode 2 and a per-message expiration of
 * {@code "2000"}, so that they can expire in the queue and become dead letters.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 13:25
 * @since JDK1.8
 * @version V1.0
 */
public class Producer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // NOTE(review): "guset" looks like a typo of RabbitMQ's default "guest"
    // account — confirm against the broker's user configuration.
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    /**
     * Connects to the broker, declares the exchange and publishes the messages.
     *
     * @param args unused
     * @throws IOException if declaring, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.abc";
            // Declare the exchange the messages are published to.
            channel.exchangeDeclare(exchangeName, "topic");

            // Regular message: published without properties, so it carries no expiration.
            String msg = "正常消息1,routingKey:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8"));

            // Expiring messages: these are the ones intended to go unconsumed
            // and end up as dead letters.
            AMQP.BasicProperties props =
                new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();
            for (int i = 2; i <= 4; i++) {
                String expiring = "過時死信消息" + i + ",routingKey:" + routingKey;
                channel.basicPublish(exchangeName, routingKey, false, props, expiring.getBytes("UTF-8"));
            }
        }
    }

}

producer可以採用消息過期的方式產生死信

正常consumer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * Demo consumer for the working queue of the dead-letter-exchange example.
 *
 * <p>Declares the dead-letter exchange {@code dlx.exchange} and queue
 * {@code dlx.queue}, then declares the working queue {@code test_dlx_queue}
 * with the {@code x-dead-letter-exchange} argument pointing at that exchange,
 * and finally consumes from the working queue with manual acknowledgements.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */
public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // NOTE(review): "guset" looks like a typo of RabbitMQ's default "guest"
    // account — confirm against the broker's user configuration.
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    /**
     * Sets up the exchanges, queues and bindings and starts consuming.
     *
     * @param args unused
     * @throws IOException if a declaration, binding or the consume call fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        // Exchange that receives dead letters.
        String dlxExchange = "dlx.exchange";
        channel.exchangeDeclare(dlxExchange, "topic");
        // Queue that stores dead letters.
        String dlxQueue = "dlx.queue";
        channel.queueDeclare(dlxQueue, true, false, false, null);
        // "#" lets every routing key be routed to the dead-letter queue.
        String dlxRoutingKey = "#";
        // Bind the dead-letter queue to its exchange.
        channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null);

        // Working exchange/queue that this consumer listens on.
        String queueName = "test_dlx_queue";
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        channel.exchangeDeclare(exchangeName, "topic");

        // The dead-letter exchange must be configured as an argument of the
        // working queue at declaration time.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", dlxExchange);
        channel.queueDeclare(queueName, true, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.basicQos(1);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    // Simulated slow processing; with a prefetch of 1 the
                    // remaining messages wait in the queue meanwhile.
                    pause(3);
                    System.out.println("---消費者-- " + new String(message.getBody(), "UTF-8"));
                } finally {
                    // autoAck is false, so the delivery has to be acknowledged
                    // explicitly; otherwise it stays unacked on the broker.
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者--:cancelCallback ");
            }
        };

        // autoAck must be false here: acknowledgements are sent manually in the callback.
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }

    /** Sleeps for the given number of seconds, restoring the interrupt flag if interrupted. */
    private static void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

運行結果:只消費一條正常消息,其餘過期的消息未被消費

DLXConusmer,監聽消費死信隊列中的消息

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * Listens on the dead-letter queue {@code dlx.queue} and prints every message
 * that arrives there, acknowledging each delivery manually.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-22 21:52
 * @since JDK1.8
 * @version V1.0
 */
public class DLXConusmer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // NOTE(review): "guset" looks like a typo of RabbitMQ's default "guest"
    // account — confirm against the broker's user configuration.
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    /**
     * Declares the dead-letter exchange, queue and binding, then starts consuming.
     *
     * @param args unused
     * @throws IOException if a declaration, binding or the consume call fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String deadLetterQueue = "dlx.queue";
        String deadLetterExchange = "dlx.exchange";
        String bindingKey = "#";

        // Declare the exchange and the queue, then bind them together.
        channel.exchangeDeclare(deadLetterExchange, "topic");
        channel.queueDeclare(deadLetterQueue, true, false, false, null);
        channel.queueBind(deadLetterQueue, deadLetterExchange, bindingKey, null);

        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                System.out.println("---死信隊列消費者---");
                String body = new String(delivery.getBody(), "UTF-8");
                System.out.println(body);
            } finally {
                // Manual ack back to the broker, sent even if printing fails.
                channel.basicAck(deliveryTag, false);
            }
        };
        CancelCallback onCancel = consumerTag -> System.out.println("---消費者:cancelCallback");

        // autoAck is false: deliveries are acknowledged in the callback above.
        channel.basicConsume(deadLetterQueue, false, onDelivery, onCancel);
    }
}

運行結果:3條過期的消息進入死信隊列,並被消費

相關文章
相關標籤/搜索