rabbitmq延遲消息示例

官方插件僅支持>=3.6.x 版本中支持。

本文描述的消息延遲機制採用官方推薦的插件rabbitmq-delayed-message-exchange,如精通rabbitmq和編程,請自行查看官方文檔,描述更加詳盡:html

安裝

須要在集羣每臺機器中安裝

因爲rabbitmq並未內置該插件,須要手動下載安裝。關於已安裝的插件經過rabbitmq-plugins list可查看.java

加載後解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq服務器目錄:/usr/lib/rabbitmq/plugins中(windows和其餘系統<安裝目錄>\rabbitmq_server-version\plugins).git

啓用插件

須要在集羣每臺機器中執行

經過rabbitmq-plugins list查看已安裝列表,以下:github

...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang啓用插件,輸出以下:spring

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

經過rabbitmq-plugins list查看已安裝列表,以下:shell

...
[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
...

機制

安裝插件後會生成新的Exchange類型x-delayed-message,該類型消息支持延遲投遞機制,接收到消息後並未當即將消息投遞至目標隊列中,而是存儲在mnesia(一個分佈式數據系統)表中,檢測消息延遲時間,如達到可投遞時間時並將其經過x-delayed-type類型標記的交換機類型投遞至目標隊列。編程

Java使用過程

聲明x-delayed-message消息交換機

  • rabbitmq java client實現
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
  • spring rabbitmq template實現
// ... elided code ...
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
Exchange exchange = new CustomExchange("test.exchange", "x-delayed-message", true, false, args);
//admin = RabbitmqAdmin
admin.declareExchange(exchange);

//more code...

消息發送

  • rabbitmq java client實現
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("test.exchange", "test", props.build(), messageBodyBytes);
// ... more code ...
  • spring rabbitmq template實現
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 1000);
//template : RabbitmqTemplate
template.convertAndSend("test.exchange", "test", new Message(body, properties));
相關文章
相關標籤/搜索