官方插件僅支持>=3.6.x 版本中支持。
本文描述的消息延遲機制採用官方推薦的插件rabbitmq-delayed-message-exchange
,如精通rabbitmq和編程,請自行查看官方文檔,描述更加詳盡:html
須要在集羣每臺機器中安裝因爲rabbitmq並未內置該插件,須要手動下載安裝。關於已安裝的插件經過
rabbitmq-plugins list
可查看.java
加載後解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq服務器目錄:/usr/lib/rabbitmq/plugins
中(windows和其餘系統<安裝目錄>\rabbitmq_server-version\plugins
).git
須要在集羣每臺機器中執行
經過rabbitmq-plugins list
查看已安裝列表,以下:github
... [ ] rabbitmq_delayed_message_exchange 20171215-3.6.x ...
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang
啓用插件,輸出以下:spring
The following plugins have been enabled: rabbitmq_delayed_message_exchange
經過rabbitmq-plugins list
查看已安裝列表,以下:shell
... [E*] rabbitmq_delayed_message_exchange 20171215-3.6.x ...
安裝插件後會生成新的Exchange類型x-delayed-message
,該類型消息支持延遲投遞機制,接收到消息後並未當即將消息投遞至目標隊列中,而是存儲在mnesia
(一個分佈式數據系統)表中,檢測消息延遲時間,如達到可投遞時間時並將其經過x-delayed-type
類型標記的交換機類型投遞至目標隊列。編程
x-delayed-message
消息交換機// ... elided code ... Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...
// ... elided code ... Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); Exchange exchange = new CustomExchange("test.exchange", "x-delayed-message", true, false, args); //admin = RabbitmqAdmin admin.declareExchange(exchange); //more code...
// ... elided code ... byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("test.exchange", "test", props.build(), messageBodyBytes); // ... more code ...
MessageProperties properties = new MessageProperties(); properties.setHeader("x-delay", 1000); //template : RabbitmqTemplate template.convertAndSend("test.exchange", "test", new Message(body, properties));