在開發中作定時任務是一個很是常見的業務場景,在代碼層面 Node.js 能夠用 setTimeout、setInerval 這種基礎語法或用 node-schedule 這些相似的庫來達到部分目的,在第三方服務上能夠用 Redis 的 Keyspace Notification 或 Linux 自身的 crontab 來作定時任務。RabbitMQ 做爲一個消息中間件,使用其死信隊列也能夠達到作定時任務的目的。javascript
本文以 Node.js 做爲演示語言,操做 RabbitMQ 使用的是 amqplib。java
RabbitMQ 中有一種交換器叫 DLX,全稱爲 Dead-Letter-Exchange,能夠稱之爲死信交換器。當消息在一個隊列中變成死信(dead message)以後,它會被從新發送到另一個交換器中,這個交換器就是 DLX,綁定在 DLX 上的隊列就稱之爲死信隊列。
消息變成死信通常是如下幾種狀況:node
DLX 也是一個正常的交換器,和通常的交換器沒有區別,它能在任何隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列存在死信時,RabbitMQ 就會自動地將這個消息從新發布到設置的 DLX 上去,進而被路由到另外一個隊列,即死信隊列。要爲某個隊列添加 DLX,須要在建立這個隊列的時候設置其deadLetterExchange
和 deadLetterRoutingKey
參數,deadLetterRoutingKey
參數可選,表示爲 DLX 指定的路由鍵,若是沒有特殊指定,則使用原隊列的路由鍵。npm
const amqp = require('amqplib'); const myNormalEx = 'my_normal_exchange'; const myNormalQueue = 'my_normal_queue'; const myDeadLetterEx = 'my_dead_letter_exchange'; const myDeadLetterRoutingKey = 'my_dead_letter_routing_key'; let connection, channel; amqp.connect('amqp://localhost') .then((conn) => { connection = conn; return conn.createChannel(); }) .then((ch) => { channel = ch; ch.assertExchange(myNormalEx, 'direct', { durable: false }); return ch.assertQueue(myNormalQueue, { exclusive: false, deadLetterExchange: myDeadLetterEx, deadLetterRoutingKey: myDeadLetterRoutingKey, }); }) .then((ok) => { channel.bindQueue(ok.queue, myNormalEx); channel.sendToQueue(ok.queue, Buffer.from('hello')); setTimeout(function () { connection.close(); process.exit(0) }, 500); }) .catch(console.error);
上面的代碼先聲明瞭一個交換器 myNormalEx
, 而後聲明瞭一個隊列 myNormalQueue
,在聲明該隊列的時候經過設置其 deadLetterExchange
參數,爲其添加了一個 DLX。因此當隊列 myNormalQueue
中有消息成爲死信後就會被髮布到 myDeadLetterEx
中去。ui
在 RabbbitMQ 中,能夠對消息和隊列設置過時時間。當經過隊列屬性設置過時時間時,隊列中全部消息都有相同的過時時間。當對消息設置單獨的過時時間時,每條消息的 TTL 能夠不一樣。若是兩種方法一塊兒使用,則消息的 TTL 以二者之間較小的那個數值爲準。消息在隊列中的生存時間一旦超過設置的 TTL 值時,就會變成「死信」(Dead Message),消費者將沒法再接收到該消息。spa
針對每條消息設置 TTL 是在發送消息的時候設置 expiration
參數,單位爲毫秒。code
const amqp = require('amqplib'); const myNormalEx = 'my_normal_exchange'; const myNormalQueue = 'my_normal_queue'; const myDeadLetterEx = 'my_dead_letter_exchange'; const myDeadLetterRoutingKey = 'my_dead_letter_routing_key'; let connection, channel; amqp.connect('amqp://localhost') .then((conn) => { connection = conn; return conn.createChannel(); }) .then((ch) => { channel = ch; ch.assertExchange(myNormalEx, 'direct', { durable: false }); return ch.assertQueue(myNormalQueue, { exclusive: false, deadLetterExchange: myDeadLetterEx, deadLetterRoutingKey: myDeadLetterRoutingKey, }); }) .then((ok) => { channel.bindQueue(ok.queue, myNormalEx); channel.sendToQueue(ok.queue, Buffer.from('hello'), { expiration: '4000'}); setTimeout(function () { connection.close(); process.exit(0) }, 500); }) .catch(console.error);
上面的代碼在向隊列發送消息的時候,經過傳遞 { expiration: '4000'}
將這條消息的過時時間設爲了4秒,對消息設置4秒鐘過時,這條消息並不必定就會在4秒鐘後被丟棄或進入死信,只有當這條消息到達隊首即將被消費時纔會判斷其是否過時,若未過時就會被消費者消費,若已過時就會被刪除或者成爲死信。orm
由於隊列中的消息過時後會成爲死信,而死信又會被髮布到該消息所在的隊列的 DLX 上去,因此經過爲消息設置過時時間,而後再消費該消息所在隊列的 DLX 所綁定的隊列,從而來達到定時處理一個任務的目的。 簡單的講就是當有一個隊列 queue1,其 DLX 爲 deadEx1,deadEx1 綁定了一個隊列 deadQueue1,當隊列 queue1 中有一條消息因過時成爲死信時,就會被髮布到 deadEx1 中去,經過消費隊列 deadQueue1 中的消息,也就至關於消費的是 queue1 中的因過時產生的死信消息。中間件
消費死信隊列的代碼以下:blog
const amqp = require('amqplib'); const myDeadLetterEx = 'my_dead_letter_exchange'; const myDeadLetterQueue = 'my_dead_letter_queue'; const myDeadLetterRoutingKey = 'my_dead_letter_routing_key'; let channel; amqp.connect('amqp://localhost') .then((conn) => { return conn.createChannel(); }) .then((ch) => { channel = ch; ch.assertExchange(myDeadLetterEx, 'direct', { durable: false }); return ch.assertQueue(myDeadLetterQueue, { exclusive: false }); }) .then((ok) => { channel.bindQueue(ok.queue, myDeadLetterEx, myDeadLetterRoutingKey); channel.consume(ok.queue, (msg) => { console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString()); }, { noAck: true}) }) .catch(console.error);
這裏須要注意的是,若是聲明的 myDeadLetterEx
是 direct 類型,那麼在爲其綁定隊列的時候必定要指定 BindingKey,即這裏的 myDeadLetterRoutingKey
,若是不指定 Bindingkey,則須要將 myDeadLetterEx
聲明爲 fanout 類型。