實現RabbitMQ 延時消息

RabbitMQ 延時消息的實現(上)git

咱們在實際業務中有一些須要延時發送消息的場景,例如:github

  1. 家裏有一臺智能熱水器,須要在30分鐘後啓動
  2. 未付款的訂單,15分鐘後關閉

注意這裏的場景是延時,不是定時。固然,解決了延時,定時就很簡單了(定時=當前時刻+間隔時間)。數據庫

因爲RabbitMQ自己不支持延時隊列(延時消息),因此要經過其餘方式來實現。總的來講有三種:性能優化

  1. 先存儲到數據庫,用定時任務掃描,登記時刻+延時時間,就是須要投遞的時刻
  2. 利用RabbitMQ的死信隊列(Dead Letter Queue)實現
  3. 利用rabbitmq-delayed-message-exchange插件

定時任務實現比較簡單,此處略過。咱們來看一下後兩種方案分別怎麼實現。架構

前提知識:咱們能夠在發送消息時指定單條消息的存活時間(Time To Live,TTL)。也能夠設置一個隊列的消息過時時間。併發

這兩種方式,當隊列中的消息到達過時時間(好比30分鐘)仍未被消費,就會被髮送到隊列的死信交換機(Dead Letter Exchange,DLX),被再次路由,此時再次路由到的隊列就被稱爲死信隊列(Dead Letter Queue)。須要注意,死信交換機和死信交換機都是基於其用途來描述的,它們實際上也是普通的交換機和普通的隊列。若是隊列沒有指定DLX或者沒法被路由到一個DLQ,則隊列中過時的消息會被直接丟棄。分佈式

所以,咱們能夠利用消息TTL的特性,實現消息的延時投遞。ide

一、設置單條消息的過時時間的方法:微服務

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()高併發

   .deliveryMode(2) // 持久化消息

   .contentEncoding("UTF-8")

   .expiration("10000") // TTL,10秒後沒有被消費則被髮送到DLX

   .build();

channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes()); //此處發送到 AMQP Default 這個默認的Direct類型的交換機,並路由到TEST_TTL_QUEUE隊列

二、設置隊列的消息過時時間的方法:

Map<String, Object> argss = new HashMap<String, Object>();

argss.put("x-message-ttl",6000); // TTL,6秒後沒有被消費則被髮送到DLX

channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);

阿里P7架构师教你如何实现RabbitMQ 延时消息

 

注意:若是同時設置了消息的過時時間和隊列的消息過時時間,則會取其中一個較小的值。好比消息設置5秒過時,隊列設置消息10秒過時,則實際過時時間是5秒。

基於消息TTL,咱們來看一下如何利用死信隊列(DLQ)實現延時隊列:

整體步驟:

1)建立一個交換機

2018已通過去過去,2019還想一成不變嗎?擁抱變化,突破瓶頸,想要學習Java架構技術的朋友能夠加個人羣:725219329,羣內每晚都會有阿里技術大牛講解的最新Java架構技術。並會錄製錄播視頻分享在羣公告中,做爲給廣大朋友的加羣的福利——分佈式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高併發、高可用架構)/微服務(Spring Boot、Spring Cloud)/源碼(Spring、Mybatis)/性能優化(JVM、TomCat、MySQL)

2)建立一個隊列,與上述交換機綁定,而且經過屬性指定隊列的死信交換機。

阿里P7架构师教你如何实现RabbitMQ 延时消息

3)建立一個死信交換機

4)建立一個死信隊列

4)將死信交換機綁定到死信隊列

5)消費者監聽死信隊列

代碼以下:

消費者:

由於此處使用默認的AMQP Default的Exchange,因此省略了第1)步,沒有建立交換機。

這裏用指定消息的TTL實現,因此設置隊列TTL屬性的代碼註釋了。

// 指定隊列的死信交換機
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
// arguments.put("x-expires","9000"); // 設置隊列的TTL

// 聲明隊列(默認交換機AMQP default,Direct)
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);

// 聲明死信交換機
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
// 聲明死信隊列
channel.queueDeclare("DLX_QUEUE", false, false, false, null);
// 綁定,此處 Dead letter routing key 設置爲 #,表明路由全部消息
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");

生產者

String msg = "Hello world, Rabbit MQ, DLX MSG";

// 設置屬性,消息10秒鐘過時
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 持久化消息
        .contentEncoding("UTF-8")
        .expiration("10000") // TTL
        .build();

// 發送消息
channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());

消息的流轉流程

生產者——原交換機——原隊列——(超過TTL以後)——死信交換機——死信隊列——最終消費者

如圖:

阿里P7架构师教你如何实现RabbitMQ 延时消息

 

使用死信隊列實現延時消息的缺點:

  1. 若是統一用隊列來設置消息的TTL,當梯度很是多的狀況下,好比1分鐘,2分鐘,5分鐘,10分鐘,20分鐘,30分鐘……須要建立不少交換機和隊列來路由消息。
  2. 2018已通過去過去,2019還想一成不變嗎?擁抱變化,突破瓶頸,想要學習Java架構技術的朋友能夠加個人羣:725219329,羣內每晚都會有阿里技術大牛講解的最新Java架構技術。並會錄製錄播視頻分享在羣公告中,做爲給廣大朋友的加羣的福利——分佈式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高併發、高可用架構)/微服務(Spring Boot、Spring Cloud)/源碼(Spring、Mybatis)/性能優化(JVM、TomCat、MySQL)
  3. 若是單獨設置消息的TTL,則可能會形成隊列中的消息阻塞——前一條消息沒有出隊(沒有被消費),後面的消息沒法投遞。
  4. 可能存在必定的時間偏差。

RabbitMQ 延時消息的實現(下)

在RabbitMQ 3.5.7及之後的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延時隊列功能。同時插件依賴Erlang/OPT 18.0及以上。

插件源碼地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下載地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

一、進入插件目錄

whereis rabbitmq

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

二、下載插件

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

 

若是下載的文件名帶問號則須要更名,例如:

阿里P7架构师教你如何实现RabbitMQ 延时消息

 

mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez

阿里P7架构师教你如何实现RabbitMQ 延时消息

三、啓用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

四、停用插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

五、插件使用

經過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性。x-delayed-message是插件提供的類型,並非rabbitmq自己的(區別於direct、topic、fanout、headers)。

阿里P7架构师教你如何实现RabbitMQ 延时消息

 

代碼:

消費者(先啓動):

// 聲明x-delayed-message類型的exchange
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,
        false, argss);

// 聲明隊列
channel.queueDeclare("DELAY_QUEUE", false,false,false,null);

// 綁定交換機與隊列
channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");

// 建立消費者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        System.out.println("收到消息:[" + msg + "]\n接收時間:" +sf.format(new Date()));
    }
};

// 開始獲取消息
channel.basicConsume("DELAY_QUEUE", true, consumer);

生產者(後啓動):

// 延時投遞,好比延時1分鐘
Date now = new Date();
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, +1);// 1分鐘後投遞
Date delayTime = calendar.getTime();

SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String msg = "發送時間:" + sf.format(now) + ",投遞時間:" + sf.format(delayTime);

// 延遲的間隔時間,目標時刻減去當前時刻
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", delayTime.getTime() - now.getTime());

AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
        .headers(headers);
channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),
        msg.getBytes());

channel.close();
conn.close();

控制檯輸出:

收到消息:[發送時間:2019-01-15 20:44:41.000,投遞時間:2019-01-15 20:45:41.003]

接收時間:2019-01-15 20:45:41.064

阿里P7架构师教你如何实现RabbitMQ 延时消息

相關文章
相關標籤/搜索