rabbitmq 實現延遲隊列的兩種方式

ps: 文章裏面延遲隊列=延時隊列html

什麼是延遲隊列

延遲隊列存儲的對象確定是對應的延時消息,所謂」延時消息」是指當消息被髮送之後,並不想讓消費者當即拿到消息,而是等待指定時間後,消費者纔拿到這個消息進行消費。java

場景一:在訂單系統中,一個用戶下單以後一般有30分鐘的時間進行支付,若是30分鐘以內沒有支付成功,那麼這個訂單將進行一場處理。這是就可使用延時隊列將訂單信息發送到延時隊列。node

場景二:用戶但願經過手機遠程遙控家裏的智能設備在指定的時間進行工做。這時候就能夠將用戶指令發送到延時隊列,當指令設定的時間到了再將指令推送到只能設備。git

RabbitMQ如何實現遲隊列

方法一

AMQP協議和RabbitMQ隊列自己沒有直接支持延遲隊列功能,可是能夠經過如下特性模擬出延遲隊列的功能。 
可是咱們能夠經過RabbitMQ的兩個特性來曲線實現延遲隊列:github

RabbitMQ能夠針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,若是超時(二者同時設置以最早到期的時間爲準),則消息變爲dead letter(死信)web

RabbitMQ針對隊列中的消息過時時間有兩種方法能夠設置。app

  • A: 經過隊列屬性設置,隊列中全部消息都有相同的過時時間。
  • B: 對消息進行單獨設置,每條消息TTL能夠不一樣。

若是同時使用,則消息的過時時間以二者之間TTL較小的那個數值爲準。消息在隊列的生存時間一旦超過設置的TTL值,就成爲dead letteride

RabbitMQ的Queue能夠配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,若是隊列內出現了dead letter,則按照這兩個參數從新路由轉發到指定的隊列。ui

  • x-dead-letter-exchange:出現dead letter以後將dead letter從新發送到指定exchange
  • x-dead-letter-routing-key:出現dead letter以後將dead letter從新按照指定的routing-key發送

隊列出現dead letter的狀況有:this

  • 消息或者隊列的TTL過時

  • 隊列達到最大長度

  • 消息被消費端拒絕(basic.reject or basic.nack)而且requeue=false

綜合上述兩個特性,設置了TTL規則以後當消息在一個隊列中變成死信時,利用DLX特性它能被從新轉發到另外一個Exchange或者Routing Key,這時候消息就能夠從新被消費了。

設置方法:

第一步:設置TTL產生死信,有兩種方式Per-Message TTL和 Queue TTL,第一種能夠針對每一條消息設置一個過時時間使用於大多數場景,第二種針對隊列設置過時時間、適用於一次性延時任務的場景

還有其餘產生死信的方式好比消費者拒絕消費 basic.reject 或者 basic.nack ( 前提要設置消費者的屬性requeue=false) 
- Per-Message TTL (對每一條消息設置一個過時時間)(官方文檔

java client發送一條只能駐留60秒的消息到隊列:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");//設置消息的過時時間爲60秒
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
//這條消息發送到相應的隊列以後,若是60秒內沒有被消費,則變爲死信
  • 1
  • 2
  • 3
  • 4
  • 5
  • Queue TTL (對整個隊列設置一個過時時間)

建立一個隊列,隊列的消息過時時間爲30分鐘(這個隊列30分鐘內沒有消費者消費消息則刪除,刪除後隊列內的消息變爲死信)

java client方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

rabbitmqctl命令方式(.* 爲全部隊列, 能夠替換爲指定隊列):
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues

rabbitmqctl (Windows):
rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

第二步:設置死信的轉發規則(若是沒有任何規則,則直接丟棄死信) 
- Dead Letter Exchanges設置方法(官方文檔

Java Client方式:
//聲明一個直連模式的exchange
channel.exchangeDeclare("some.exchange.name", "direct");
//聲明一個隊列,當myqueue隊列中有死信產生時,會轉發到交換器some.exchange.name
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");

//若是設置死信會以路由鍵some-routing-key轉發到some.exchange.name,若是沒設默認爲消息發送到本隊列時用的routing key
//args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);

命令行方式(.* 爲全部隊列, 能夠替換爲指定隊列):
設置 "dead-letter-exchange"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues
設置 "dead-letter-routing-key"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

方法二

在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上。

插件源碼地址: 
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下載地址: 
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安裝:

進入插件安裝目錄 
{rabbitmq-server}/plugins/(能夠查看一下當前已存在的插件) 
下載插件 
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
  • 1

(若是下載的文件名稱不規則就手動重命名一下如: 
rabbitmq_delayed_message_exchange-0.0.1.ez)

啓用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(關閉插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
  • 1
  • 2
  • 3
  • 4

插件使用

經過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性 
x-delayed-message是插件提供的類型,並非rabbitmq自己的

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5

發送消息的時候經過在header添加」x-delay」參數來控制消息的延時時間

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用示例:

消息發送端:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // 隊列名稱
    private final static String EXCHANGE_NAME="delay_exchange";
    private final static String ROUTING_KEY="key_delay";

    @SuppressWarnings("deprecation")
    public static void main(String[] argv) throws Exception {
        /**
         * 建立鏈接鏈接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 聲明x-delayed-type類型的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
                false, args);


        Map<String, Object> headers = new HashMap<String, Object>();
        //設置在2016/11/04,16:45:12向消費端推送本條消息
        Date now = new Date();
        Date timeToPublish = new Date("2016/11/04,16:45:12");

        String readyToPushContent = "publish at " + sf.format(now)
                + " \t deliver at " + sf.format(timeToPublish);

        headers.put("x-delay", timeToPublish.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
                readyToPushContent.getBytes());

        // 關閉頻道和鏈接
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

消息接收端:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    // 隊列名稱
    private final static String QUEUE_NAME = "delay_queue";
    private final static String EXCHANGE_NAME="delay_exchange";

    public static void main(String[] argv) throws Exception,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.queueDeclare(QUEUE_NAME, true,false,false,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            System.out.println("****************WAIT***************");
            while(true){
                QueueingConsumer.Delivery delivery = queueingConsumer
                        .nextDelivery(); //

                String message = (new String(delivery.getBody()));
                System.out.println("message:"+message);
                System.out.println("now:\t"+sf.format(new Date()));
            }

        } catch (Exception exception) {
            exception.printStackTrace();

        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

啓動接收端,啓動發送端 
運行結果:

****************WAIT***************
message:publish at 2016-11-04 16:44:16.887   deliver at 2016-11-04 16:45:12.000
now:    2016-11-04 16:45:12.023
  • 1
  • 2
  • 3

結果顯示在咱們2016-11-04 16:45:12.023接收到了消息,距離咱們設定的時間2016-11-04 16:45:12.023有23毫秒的延遲

Note:使用rabbitmq-delayed-message-exchange插件時發送到隊列的消息數量在web管理界面可能不可見,不影響正常功能使用

Note :使用過程當中發現,當一臺啓用了rabbitmq-delayed-message-exchange插件的RAM節點在重啓的時候會沒法啓動,查看日誌發現了一個Timeout異常,開發者解釋說這是節點在啓動過程會同步集羣相關數據形成啓動超時,並建議不要使用Ram節點

插件開發者: 
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.

相關文章
相關標籤/搜索