RabbitMQ(2)- 死信隊列、延遲隊列、優先隊列

1. 前言

《RabbitMQ(1)-基礎開發應用》中,咱們已經介紹了RabbitMQ的基礎開發應用。本文基於這些基礎再作一些擴展,延伸出一些高級的用法,如:死信隊列、延遲隊列和優先隊列。不過仍是以死信隊列爲主,由於延遲隊列是死信隊列的衍生概念,並且優先隊列也比較簡單,因此先仍是在代碼層面上,把死信隊列搞透。java

1.1. 建立隊列、交換機

咱們在使用RabbitMQ以前,須要先建立好相關的隊列和交換機,而且設置一些綁定關係。由於幾篇文章都是結合springboot來開發,下面就結合springboot介紹幾種建立方式:spring

  1. 直接訪問 RabbitMQ Management 管理頁面,在頁面上建立;或者使用 RabbitMQ其餘的客戶端來建立管理。
  2. 在springboot上基於消費端開發時,@RabbitListener 註解的 bindings屬性,能夠簡單實現相似功能。
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "true"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
  1. 在配置類下定義@Bean,即向Ioc容器中註冊Queue、Exchange、Binding的實例。
package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @date: 2020/7/12 11:26 下午
 * @author: kerry
 */
@Configuration
public class RabbitConfig {
   
    public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";
    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";
    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";


    /**
     * NORMAL 交換機
     * @return
     */
    @Bean
    public Exchange ExchangeA(){
        return ExchangeBuilder
                .directExchange(NORMAL_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * NORMAL 隊列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                .build();
    }

    /**
     * 綁定 NORMAL隊列 和 NORMAL交換機
     * @return
     */
    @Bean
    public Binding normalBinding(){
        return new Binding(NORMAL_QUEUE_A,
                Binding.DestinationType.QUEUE,
                NORMAL_EXCHANGE_A,
                NORMAL_ROUTING_KEY_A,
                null);
    }
}

我我的推薦第三種,並且建議是在生產者端定義,消費者應該更關注消費的邏輯。可是若是用代碼來建立,有一個很大的缺點,就是不能刪除和修改,至少我目前還沒找到辦法。json

所以要結合第一種和第三種來使用,固然都用第一種也是能夠的。只是開發人員,更但願隊列、交換機的建立、綁定的邏輯,都體如今代碼裏面,經過代碼能夠更好的閱讀架構設計。segmentfault

2. 死信隊列

2.1. 簡介

死信隊列這個名字聽起來很特別,但它解決的是平常開發中最多見的問題:不能正常消費的消息,該如何處理。咱們在第一篇文章中有使用到手動Ack,對於須要nack而且無需重回隊列的消息,指望有統一的異常處理;包括有些消息是有時效性的,若是支付訂單通常都最大支付時常,超時後就應該取消訂單;等等。springboot

死信隊列就是應對這些狀況的,它出現的條件以下:架構

  1. 消息被否認確認,使用basicNackbasicReject ,而且此時requeue 屬性被設置爲false。
  2. 消息在隊列的存活時間超過設置的TTL時間。
  3. 消息隊列的消息數量已經超過最大隊列長度。

死信隊列的架構以下:併發

生產者 --> 消息 --> 業務交換機 --> 業務隊列 --> 消息變成死信 --> 死信交換機 -->死信隊列 --> 消費者app

2.2. 配置

如何配置死信隊列呢?其實很簡單,大概能夠分爲如下步驟:ui

  1. 配置業務隊列,綁定到業務交換機上
  2. 爲業務隊列配置死信交換機和路由key
  3. 爲死信交換機配置死信隊列

注意,並非直接聲明一個公共的死信隊列,而後因此死信消息就本身跑到死信隊列裏去了。而是爲每一個須要使用死信的業務隊列配置一個死信交換機,這裏同一個項目的死信交換機能夠共用一個,而後爲每一個業務隊列分配一個單獨的路由key。架構設計

有了死信交換機和路由key後,接下來,就像配置業務隊列同樣,配置死信隊列,而後綁定在死信交換機上。也就是說,死信隊列並非什麼特殊的隊列,只不過是綁定在死信交換機上的隊列。死信交換機也不是什麼特殊的交換機,只不過是用來接受死信的交換機,因此能夠爲任何類型【Direct、Fanout、Topic】。通常來講,會爲每一個業務隊列分配一個獨有的路由key,並對應的配置一個死信隊列進行監聽,也就是說,通常會爲每一個重要的業務隊列配置一個死信隊列。

2.3. 代碼(生產者)

按照簡介中死信隊列的架構,咱們在配置文件中定義了NORMAL的業務隊列和業務交換機,以及DLX的死信隊列和死信交換機,並在業務隊列中設置了死信交換機。

RabbitConfig.java

package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @date: 2020/7/12 11:26 下午
 * @author: kerry
 */
@Configuration
public class RabbitConfig {
    /**
     * DLX,定義參數
     */
    public static final String X_DEAD_LETTER_EXCHANGE="x-dead-letter-exchange";
    public static final String X_DEAD_LETTER_ROUTING_KEY="x-dead-letter-routing-key";
    public static final String X_MESSAGE_TTL="x-message-ttl";
    public static final String X_MAX_LENGTH="x-max-length";
    /**
     * DLX,命名
     */
    public static final String DEAD_LETTER_EXCHANGE_A="demo.direct.dlx.exchange.a";
    public static final String DEAD_LETTER_ROUTING_KEY_A="demo.direct.dlx.routingKey.a";
    public static final String DEAD_LETTER_QUEUE_A="demo.direct.dlx.queue.a";
    /*
     * NORMAL,命名
     */
    public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";
    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";
    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";


    @Bean("jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("defaultRabbitTemplate")
    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    /**
     * DLX 交換機
     * @return
     */
    @Bean
    public Exchange dlxExchangeA(){
        return ExchangeBuilder
                .directExchange(DEAD_LETTER_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * DLX 隊列
     * @return
     */
    @Bean
    public Queue dlxQueueA(){
        return QueueBuilder
                .durable(DEAD_LETTER_QUEUE_A)
                .build();
    }

    /**
     * NORMAL 交換機
     * @return
     */
    @Bean
    public Exchange ExchangeA(){
        return ExchangeBuilder
                .directExchange(NORMAL_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * NORMAL 隊列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                //設置 死信交換機
                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)
                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)
                //設置 隊列全部消息 存活時間8秒
                .withArgument(X_MESSAGE_TTL,8000)
                //設置 隊列最大長度 10條
                .withArgument(X_MAX_LENGTH,10)
                .build();
    }

    /**
     * 綁定 DLX隊列 和 DLX交換機
     * @return
     */
    @Bean
    public Binding dlxBinding(){
        return new Binding(DEAD_LETTER_QUEUE_A,
                Binding.DestinationType.QUEUE,DEAD_LETTER_EXCHANGE_A,
                DEAD_LETTER_ROUTING_KEY_A,
                null);
    }

    /**
     * 綁定 NORMAL隊列 和 NORMAL交換機
     * @return
     */
    @Bean
    public Binding normalBinding(){
        return new Binding(NORMAL_QUEUE_A,
                Binding.DestinationType.QUEUE,
                NORMAL_EXCHANGE_A,
                NORMAL_ROUTING_KEY_A,
                null);
    }

}

ProducerService.java

@Slf4j
@Service
public class ProducerService {

    public void sendText(String data, MessageProperties messageProperties) {
        /**
         * 對單個消息 設置TTL
         */
        //messageProperties.setExpiration(String.valueOf(3000));
        Message message = defaultRabbitTemplate
                .getMessageConverter()
                .toMessage(data, messageProperties);

       defaultRabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_A,
                RabbitConfig.NORMAL_ROUTING_KEY_A,
                message);
    }
}

2.4. 代碼(消費者)

消費者的邏輯比較簡單,主要是分別監聽業務隊列和死信隊列,這裏將兩個隊列的消息輸出日誌。這裏模擬的是消費者nack消息,而且不退回隊列的狀況。

MessageListener.java

/**
 * @description:
 * @date: 2020/7/12 11:07 下午
 * @author: kerry
 */
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * @param message
     * @param channel
     * 監聽 業務隊列
     *
     * @throws Exception
     */
    @RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_A)
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json對象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("業務隊列-拒絕消息: " + bodyText);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
         /**
         * 延遲隊列,被消費的消息須要重回隊列
         */
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    }


    /**
     * @param message
     * @param channel
     * 監聽 死信隊列
     *
     * @throws Exception
     */
    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_A)
    @RabbitHandler
    public void onMessageDLX(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json對象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("死信隊列-接收消息: " + bodyText);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 
    }

}

2.5. 分析

開頭說過,致使消息轉爲死信隊列的方式有3種,下面就從代碼中分析這3種狀況。

咱們回過頭來看看消費者這邊,定義業務隊列的方法:

/**
     * NORMAL 隊列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                //設置 死信交換機
                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)
                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)
                //設置 隊列全部消息 存活時間8秒
                .withArgument(X_MESSAGE_TTL,8000)
                //設置 隊列最大長度 10條
                .withArgument(X_MAX_LENGTH,10)
                .build();
    }
1. nack拒絕消息,requeue=false

在死信隊列的架構中,只要在業務隊列中設置了死信交換機x-dead-letter-exchange。消費者代碼中,咱們在業務隊列的消費過程當中nack消息,而且requeue=false便可。x-dead-letter-routing-key能夠不設置,若是不設置,默認取消息原來的路由鍵。
代碼以下:

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

channel.basicNack方法的參數以下:

  1. deliveryTag:該消息的index。
  2. multiple:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。
  3. requeue:被拒絕的是否從新入隊列。
2. 消息超時TTL

TTL(Time-To-Live)指消息的存活時間,咱們有兩種方式設置消息的TTL:

  1. 設置隊列的TTL,即參數x-message-ttl,設置後,進入該隊列的全部消息的TTL都爲對應的值。
  2. 設置單個消息的TTL,在生產者代碼中,給消息的屬性中設置過時時間。如:messageProperties.setExpiration(String.valueOf(3000));,就是設置消息的TTL爲3秒。

不過要注意的是,若是已經設置了nack的死信邏輯,TTL的死信就不生效了。道理也很簡單,由於nack消息和requeue=false一塊兒用,表明消息被消費了,而且消息不會重回隊列,直接被丟棄或進入死信隊列,又怎麼會在隊列中超時了呢。

3. 超過隊列長度

能夠設置隊列長度,例如最大接收消息的數量。當消息在隊列中已經達到最大數量,那麼後面再來的消息,就會被直接丟進死信隊列。
咱們也是中定義業務隊列的代碼中,有經過x-max-length參數,設置業務隊列的長度。

3. 延遲隊列

在我還不知道延遲隊列以前,我就以爲消息中間件應該具有這樣的功能。在消息發佈到隊列後,我指望每一個消息延遲指定時間後,再被消費者獲取到。例如:在支付模塊中,當用戶生成訂單,再到支付完成訂單,是有一段時間的。而咱們通常會給這個訂單設置超時時間,若是超過了這段時間,訂單就應該被取消,沒法再支付。那麼將訂單做爲消息,就能夠利用延遲隊列來實現取消訂單的邏輯。

RabbitMQ並不直接支持延遲隊列的功能,而是做爲一個概念,你能夠利用死信隊列來實現「延遲隊列」。利用TTL超時時間的死信方式,來實現延遲隊列。

回顧一下上段中TTL的方式,咱們在業務隊列中除了定義死信交換機x-dead-letter-exchange,還能夠定義隊列的生存時間x-message-ttl,或者設置消息的過時時間。而若是咱們不消費這個業務隊列中的消息,那麼消息在到達TTL後,就會自動轉到死信隊列中。若是咱們只消費死信隊列中的消息,忽略掉業務隊列這個「中轉站」,就至關於消息在被髮布後,通過指定時間延遲,在死信隊列中被消費,這就造成了一個「延遲隊列」。

由於延遲隊列就是死信隊列的一種實現,因此代碼層面上能夠直接參考上段中TTL的部分。

4. 優先隊列

優先隊列,顧名思義,擁有高優先級的隊列具備高的優先權,優先級高的消息具有優先被消費的權力。所以在優先隊列中有兩個邏輯點:隊列是優先隊列消息有優先級。可參考死信隊列章節中,TTL部分的代碼,下面對代碼改動地方作一下說明:

  1. 設置隊列最大優先級,即消息可以使用的最大優先級數字,可經過x-max-priority參數來設置。
  2. 設置消息的優先級,在生產者代碼中,在消息的屬性中設置優先級,優先級越大,越先被消費。如:messageProperties.setPriority(3),即設置該消息的優先級爲3。

優先隊列的使用場景是在:消息有分優先級的需求,而且併發量較大。要求併發量大,是由於若是全部消息在發佈以後,立刻就被消費了,那麼分優先級的必要性就不大了。

相關文章
相關標籤/搜索