Spring Boot與RabbitMQ結合實現延遲隊列的示例

背景git

何爲延遲隊列?github

顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而通常的隊列,消息一旦入隊了以後就會被消費者立刻消費。spring

場景一:在訂單系統中,一個用戶下單以後一般有30分鐘的時間進行支付,若是30分鐘以內沒有支付成功,那麼這個訂單將進行一場處理。這是就可使用延時隊列將訂單信息發送到延時隊列。springboot

場景二:用戶但願經過手機遠程遙控家裏的智能設備在指定的時間進行工做。這時候就能夠將用戶指令發送到延時隊列,當指令設定的時間到了再將指令推送到只能設備。網絡

延遲隊列能作什麼?ide

延遲隊列多用於須要延遲工做的場景。最多見的是如下兩種場景:spring-boot

一、延遲消費。好比:post

  1. 用戶生成訂單以後,須要過一段時間校驗訂單的支付狀態,若是訂單仍未支付則須要及時地關閉訂單。
  2. 用戶註冊成功以後,須要過一段時間好比一週後校驗用戶的使用狀況,若是發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。

二、延遲重試。好比消費者從隊列裏消費消息時失敗了,可是想要延遲一段時間後自動重試。學習

若是不使用延遲隊列,那麼咱們只能經過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便作成統一的服務便於開發人員使用。可是使用延遲隊列的話,咱們就能夠垂手可得地完成。測試

如何實現?

別急,在下文中,咱們將詳細介紹如何利用Spring Boot加RabbitMQ來實現延遲隊列。

本文出現的示例代碼都已push到Github倉庫中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

實現思路

在介紹具體的實現思路以前,咱們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另外一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ容許咱們爲消息或者隊列設置TTL(time to live),也就是過時時間。TTL代表了一條消息可在隊列中存活的最大時間,單位爲毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在通過TTL秒後「死亡」,成爲Dead Letter。若是既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。更多資料請查閱官方文檔。

Dead Letter Exchange

剛纔提到了,被設置了TTL的消息在過時後會成爲Dead Letter。其實在RabbitMQ中,一共有三種消息的「死亡」形式:

  1. 消息被拒絕。經過調用basic.reject或者basic.nack而且設置的requeue參數爲false。
  2. 消息由於設置了TTL而過時。
  3. 消息進入了一條已經達到最大長度的隊列。

若是隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被從新publish到Dead Letter Exchange,經過Dead Letter Exchange路由到其餘隊列。更多資料請查閱官方文檔。

流程圖

聰明的你確定已經想到了,如何將RabbitMQ的TTL和DLX特性結合在一塊兒,實現一個延遲隊列。

針對於上述的延遲隊列的兩個場景,咱們分別有如下兩種流程圖:

延遲消費

延遲消費是延遲隊列最爲經常使用的使用模式。以下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。經過RabbitMQ提供的TTL擴展,這些消息會被設置過時時間,也就是延遲消費的時間。等消息過時以後,這些消息會經過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

 

延遲重試

延遲重試本質上也是延遲消費的一種,可是這種模式的結構與普通的延遲消費的流程圖較爲不一樣,因此單獨拎出來介紹。

以下圖所示,消費者發現該消息處理出現了異常,好比是由於網絡波動引發的異常。那麼若是不等待一段時間,直接就重試的話,極可能會致使在這期間內一直沒法成功,形成必定的資源浪費。那麼咱們能夠將其先放在緩衝隊列中(圖中紅色隊列),等消息通過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時因爲已通過了「較長」的時間了,異常的一些波動一般已經恢復,這些消息能夠被正常地消費。

  

代碼實現

接下來咱們將介紹如何在Spring Boot中實現基於RabbitMQ的延遲隊列。咱們假設讀者已經擁有了Spring Boot與RabbitMQ的基本知識。

初始化工程

首先咱們在Intellij中建立一個Spring Boot工程,而且添加spring-boot-starter-amqp擴展。

配置隊列

從上述的流程圖中咱們能夠看到,一個延遲隊列的實現,須要一個緩衝隊列以及一個實際的消費隊列。又因爲在RabbitMQ中,咱們擁有兩種消息過時的配置方式,因此在代碼中,咱們一共配置了三條隊列:

  1. delay_queue_per_message_ttl:TTL配置在消息上的緩衝隊列。
  2. delay_queue_per_queue_ttl:TTL配置在隊列上的緩衝隊列。
  3. delay_process_queue:實際消費隊列。

咱們經過Java Config的方式將上述的隊列配置爲Bean。因爲咱們添加了spring-boot-starter-amqp擴展,Spring Boot在啓動時會根據咱們的配置自動建立這些隊列。爲了方便接下來的測試,咱們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置爲同一個,且過時的消息都會經過DLX轉發到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

?

1

2

3

4

5

6

7

@Bean

Queue delayQueuePerMessageTTL() {

  return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)

            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter發送到的exchange

            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key

            .build();

}

其中,x-dead-letter-exchange聲明瞭隊列裏的死信轉發到的DLX名稱,x-dead-letter-routing-key聲明瞭這些死信在轉發時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

相似地,delay_queue_per_queue_ttl的配置代碼:

?

1

2

3

4

5

6

7

8

@Bean

Queue delayQueuePerQueueTTL() {

  return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)

            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX

            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key

            .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設置隊列的過時時間

            .build();

}

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個x-message-ttl,該配置用來設置隊列的過時時間。

delay_process_queue

delay_process_queue的配置最爲簡單:

?

1

2

3

4

5

@Bean

Queue delayProcessQueue() {

  return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)

            .build();

}

配置Exchange

配置DLX

首先,咱們須要配置DLX,代碼以下:

?

1

2

3

4

@Bean

DirectExchange delayExchange() {

  return new DirectExchange(DELAY_EXCHANGE_NAME);

}

而後再將該DLX綁定到實際消費隊列即delay_process_queue上。這樣全部的死信都會經過DLX被轉發到delay_process_queue:

?

1

2

3

4

5

6

@Bean

Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {

  return BindingBuilder.bind(delayProcessQueue)

             .to(delayExchange)

             .with(DELAY_PROCESS_QUEUE_NAME);

}

配置延遲重試所需的Exchange

從延遲重試的流程圖中咱們能夠看到,消息處理失敗以後,咱們須要將消息轉發到緩衝隊列,因此緩衝隊列也須要綁定一個Exchange。在本例中,咱們將delay_process_per_queue_ttl做爲延遲重試裏的緩衝隊列。具體代碼是如何配置的,這裏就不贅述了,你們能夠查閱我Github中的代碼。

定義消費者

咱們建立一個最簡單的消費者ProcessReceiver,這個消費者監聽delay_process_queue隊列,對於接受到的消息,他會:

  1. 若是消息裏的消息體不等於FAIL_MESSAGE,那麼他會輸出消息體。
  2. 若是消息裏的消息體剛好是FAIL_MESSAGE,那麼他會模擬拋出異常,而後將該消息重定向到緩衝隊列(對應延遲重試場景)。

另外,咱們還須要新建一個監聽容器用於存放消費者,代碼以下:

?

1

2

3

4

5

6

7

8

@Bean

SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {

  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

  container.setConnectionFactory(connectionFactory);

  container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監聽delay_process_queue

  container.setMessageListener(new MessageListenerAdapter(processReceiver));

  return container;

}

至此,咱們前置的配置代碼已經所有編寫完成,接下來咱們須要編寫測試用例來測試咱們的延遲隊列。

編寫測試用例

延遲消費場景

首先咱們編寫用於測試TTL設置在消息上的測試代碼。

咱們藉助spring-rabbit包下提供的RabbitTemplate類來發送消息。因爲咱們添加了spring-boot-starter-amqp擴展,Spring Boot會在初始化時自動地將RabbitTemplate當成bean加載到容器中。

解決了消息的發送問題,那麼又該如何爲每一個消息設置TTL呢?這裏咱們須要藉助MessagePostProcessor。

MessagePostProcessor一般用來設置消息的Header以及消息的屬性。咱們新建一個ExpirationMessagePostProcessor類來負責設置消息的TTL屬性: 

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

/**

 * 設置消息的失效時間

 */

public class ExpirationMessagePostProcessor implements MessagePostProcessor {

  private final Long ttl; // 毫秒

  public ExpirationMessagePostProcessor(Long ttl) {

    this.ttl = ttl;

  }

  @Override

  public Message postProcessMessage(Message message) throws AmqpException {

    message.getMessageProperties()

        .setExpiration(ttl.toString()); // 設置per-message的失效時間

    return message;

  }

}

而後在調用RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor便可。咱們向緩衝隊列中發送3條消息,過時時間依次爲1秒,2秒和3秒。具體的代碼以下所示:

?

1

2

3

4

5

6

7

8

9

10

@Test

public void testDelayQueuePerMessageTTL() throws InterruptedException {

  ProcessReceiver.latch = new CountDownLatch(3);

  for (int i = 1; i <= 3; i++) {

    long expiration = i * 1000;

    rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,

        (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));

  }

  ProcessReceiver.latch.await();

}

細心的朋友必定會問,爲何要在代碼中加一個CountDownLatch呢?這是由於若是沒有latch阻塞住測試方法的話,測試用例會直接結束,程序退出,咱們就看不到消息被延遲消費的表現了。

那麼相似地,測試TTL設置在隊列上的代碼以下:

?

1

2

3

4

5

6

7

8

9

@Test

public void testDelayQueuePerQueueTTL() throws InterruptedException {

  ProcessReceiver.latch = new CountDownLatch(3);

  for (int i = 1; i <= 3; i++) {

    rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,

        "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);

  }

  ProcessReceiver.latch.await();

}

咱們向緩衝隊列中發送3條消息。理論上這3條消息會在4秒後同時過時。

延遲重試場景

咱們一樣還需測試延遲重試場景。

?

1

2

3

4

5

6

7

8

@Test

public void testFailMessage() throws InterruptedException {

  ProcessReceiver.latch = new CountDownLatch(6);

  for (int i = 1; i <= 3; i++) {

    rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);

  }

  ProcessReceiver.latch.await();

}

咱們向delay_process_queue發送3條會觸發FAIL的消息,理論上這3條消息會在4秒後自動重試。

查看測試結果

延遲消費場景

延遲消費的場景測試咱們分爲了TTL設置在消息上和TTL設置在隊列上兩種。首先,咱們先看一下TTL設置在消息上的測試結果:

從上圖中咱們能夠看到,ProcessReceiver分別通過1秒、2秒、3秒收到消息。測試結果代表消息不只被延遲消費了,並且每條消息的延遲時間是能夠被個性化設置的。TTL設置在消息上的延遲消費場景測試成功。

而後,TTL設置在隊列上的測試結果以下圖:

 

從上圖中咱們能夠看到,ProcessReceiver通過了4秒的延遲以後,同時收到了3條消息。測試結果代表消息不只被延遲消費了,同時也證實了當TTL設置在隊列上的時候,消息的過時時間是固定的。TTL設置在隊列上的延遲消費場景測試成功。

延遲重試場景

接下來,咱們再來看一下延遲重試的測試結果:

 

ProcessReceiver首先收到了3條會觸發FAIL的消息,而後將其移動到緩衝隊列以後,過了4秒,又收到了剛纔的那3條消息。延遲重試場景測試成功。

總結

本文首先介紹了延遲隊列的概念以及用途,而且經過代碼詳細講解了如何經過Spring Boot和RabbitMQ實現一個延遲隊列。但願本文可以對你們平時的學習和工做能有所啓發和幫助。也但願你們多多支持腳本之家。

您可能感興趣的文章:

原文連接:http://www.kissyu.org/

相關文章
相關標籤/搜索