RabbitMQ 延時消息的實現(上)git
咱們在實際業務中有一些須要延時發送消息的場景,例如:github
注意這裏的場景是延時,不是定時。固然,解決了延時,定時就很簡單了(定時=當前時刻+間隔時間)。數據庫
因爲RabbitMQ自己不支持延時隊列(延時消息),因此要經過其餘方式來實現。總的來講有三種:性能優化
定時任務實現比較簡單,此處略過。咱們來看一下後兩種方案分別怎麼實現。架構
前提知識:咱們能夠在發送消息時指定單條消息的存活時間(Time To Live,TTL)。也能夠設置一個隊列的消息過時時間。併發
這兩種方式,當隊列中的消息到達過時時間(好比30分鐘)仍未被消費,就會被髮送到隊列的死信交換機(Dead Letter Exchange,DLX),被再次路由,此時再次路由到的隊列就被稱爲死信隊列(Dead Letter Queue)。須要注意,死信交換機和死信交換機都是基於其用途來描述的,它們實際上也是普通的交換機和普通的隊列。若是隊列沒有指定DLX或者沒法被路由到一個DLQ,則隊列中過時的消息會被直接丟棄。分佈式
所以,咱們能夠利用消息TTL的特性,實現消息的延時投遞。ide
一、設置單條消息的過時時間的方法:微服務
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()高併發
.deliveryMode(2) // 持久化消息
.contentEncoding("UTF-8")
.expiration("10000") // TTL,10秒後沒有被消費則被髮送到DLX
.build();
channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes()); //此處發送到 AMQP Default 這個默認的Direct類型的交換機,並路由到TEST_TTL_QUEUE隊列
二、設置隊列的消息過時時間的方法:
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000); // TTL,6秒後沒有被消費則被髮送到DLX
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
注意:若是同時設置了消息的過時時間和隊列的消息過時時間,則會取其中一個較小的值。好比消息設置5秒過時,隊列設置消息10秒過時,則實際過時時間是5秒。
基於消息TTL,咱們來看一下如何利用死信隊列(DLQ)實現延時隊列:
整體步驟:
1)建立一個交換機
2018已通過去過去,2019還想一成不變嗎?擁抱變化,突破瓶頸,想要學習Java架構技術的朋友能夠加個人羣:725219329,羣內每晚都會有阿里技術大牛講解的最新Java架構技術。並會錄製錄播視頻分享在羣公告中,做爲給廣大朋友的加羣的福利——分佈式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高併發、高可用架構)/微服務(Spring Boot、Spring Cloud)/源碼(Spring、Mybatis)/性能優化(JVM、TomCat、MySQL)
2)建立一個隊列,與上述交換機綁定,而且經過屬性指定隊列的死信交換機。
3)建立一個死信交換機
4)建立一個死信隊列
4)將死信交換機綁定到死信隊列
5)消費者監聽死信隊列
代碼以下:
消費者:
由於此處使用默認的AMQP Default的Exchange,因此省略了第1)步,沒有建立交換機。
這裏用指定消息的TTL實現,因此設置隊列TTL屬性的代碼註釋了。
// 指定隊列的死信交換機
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
// arguments.put("x-expires","9000"); // 設置隊列的TTL
// 聲明隊列(默認交換機AMQP default,Direct)
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);
// 聲明死信交換機
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
// 聲明死信隊列
channel.queueDeclare("DLX_QUEUE", false, false, false, null);
// 綁定,此處 Dead letter routing key 設置爲 #,表明路由全部消息
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");
生產者:
String msg = "Hello world, Rabbit MQ, DLX MSG";
// 設置屬性,消息10秒鐘過時
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.contentEncoding("UTF-8")
.expiration("10000") // TTL
.build();
// 發送消息
channel.basicPublish("", "TEST_DLX_QUEUE", properties, msg.getBytes());
消息的流轉流程
生產者——原交換機——原隊列——(超過TTL以後)——死信交換機——死信隊列——最終消費者
如圖:
使用死信隊列實現延時消息的缺點:
RabbitMQ 延時消息的實現(下)
在RabbitMQ 3.5.7及之後的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延時隊列功能。同時插件依賴Erlang/OPT 18.0及以上。
插件源碼地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下載地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
一、進入插件目錄
whereis rabbitmq
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
二、下載插件
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
若是下載的文件名帶問號則須要更名,例如:
mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez
三、啓用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
四、停用插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
五、插件使用
經過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性。x-delayed-message是插件提供的類型,並非rabbitmq自己的(區別於direct、topic、fanout、headers)。
代碼:
消費者(先啓動):
// 聲明x-delayed-message類型的exchange
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,
false, argss);
// 聲明隊列
channel.queueDeclare("DELAY_QUEUE", false,false,false,null);
// 綁定交換機與隊列
channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");
// 建立消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("收到消息:[" + msg + "]\n接收時間:" +sf.format(new Date()));
}
};
// 開始獲取消息
channel.basicConsume("DELAY_QUEUE", true, consumer);
生產者(後啓動):
// 延時投遞,好比延時1分鐘
Date now = new Date();
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, +1);// 1分鐘後投遞
Date delayTime = calendar.getTime();
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String msg = "發送時間:" + sf.format(now) + ",投遞時間:" + sf.format(delayTime);
// 延遲的間隔時間,目標時刻減去當前時刻
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", delayTime.getTime() - now.getTime());
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
.headers(headers);
channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),
msg.getBytes());
channel.close();
conn.close();
控制檯輸出:
收到消息:[發送時間:2019-01-15 20:44:41.000,投遞時間:2019-01-15 20:45:41.003]
接收時間:2019-01-15 20:45:41.064