用 RabbitMQ 的死信隊列來作定時任務

在開發中作定時任務是一個很是常見的業務場景,在代碼層面 Node.js 能夠用 setTimeout、setInerval 這種基礎語法或用 node-schedule 這些相似的庫來達到部分目的,在第三方服務上能夠用 Redis 的 Keyspace Notification 或 Linux 自身的 crontab 來作定時任務。RabbitMQ 做爲一個消息中間件,使用其死信隊列也能夠達到作定時任務的目的。javascript

本文以 Node.js 做爲演示語言,操做 RabbitMQ 使用的是 amqplibjava

死信隊列

RabbitMQ 中有一種交換器叫 DLX,全稱爲 Dead-Letter-Exchange,能夠稱之爲死信交換器。當消息在一個隊列中變成死信(dead message)以後,它會被從新發送到另一個交換器中,這個交換器就是 DLX,綁定在 DLX 上的隊列就稱之爲死信隊列。
消息變成死信通常是如下幾種狀況:node

  • 消息被拒絕,而且設置 requeue 參數爲 false
  • 消息過時
  • 隊列達到最大長度

DLX 也是一個正常的交換器,和通常的交換器沒有區別,它能在任何隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列存在死信時,RabbitMQ 就會自動地將這個消息從新發布到設置的 DLX 上去,進而被路由到另外一個隊列,即死信隊列。要爲某個隊列添加 DLX,須要在建立這個隊列的時候設置其deadLetterExchangedeadLetterRoutingKey 參數,deadLetterRoutingKey 參數可選,表示爲 DLX 指定的路由鍵,若是沒有特殊指定,則使用原隊列的路由鍵。npm

const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let connection, channel;
amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
  })
  .then((ok) => {
    channel.bindQueue(ok.queue, myNormalEx);
    channel.sendToQueue(ok.queue, Buffer.from('hello'));
    setTimeout(function () { connection.close(); process.exit(0) }, 500);
  })
  .catch(console.error);

上面的代碼先聲明瞭一個交換器 myNormalEx, 而後聲明瞭一個隊列 myNormalQueue,在聲明該隊列的時候經過設置其 deadLetterExchange 參數,爲其添加了一個 DLX。因此當隊列 myNormalQueue 中有消息成爲死信後就會被髮布到 myDeadLetterEx 中去。ui

過時時間(TTL)

在 RabbbitMQ 中,能夠對消息和隊列設置過時時間。當經過隊列屬性設置過時時間時,隊列中全部消息都有相同的過時時間。當對消息設置單獨的過時時間時,每條消息的 TTL 能夠不一樣。若是兩種方法一塊兒使用,則消息的 TTL 以二者之間較小的那個數值爲準。消息在隊列中的生存時間一旦超過設置的 TTL 值時,就會變成「死信」(Dead Message),消費者將沒法再接收到該消息。spa

針對每條消息設置 TTL 是在發送消息的時候設置 expiration 參數,單位爲毫秒。code

const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let connection, channel;
amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
})
  .then((ok) => {
    channel.bindQueue(ok.queue, myNormalEx);
    channel.sendToQueue(ok.queue, Buffer.from('hello'), { expiration: '4000'});
    setTimeout(function () { connection.close(); process.exit(0) }, 500);
  })
  .catch(console.error);

上面的代碼在向隊列發送消息的時候,經過傳遞 { expiration: '4000'} 將這條消息的過時時間設爲了4秒,對消息設置4秒鐘過時,這條消息並不必定就會在4秒鐘後被丟棄或進入死信,只有當這條消息到達隊首即將被消費時纔會判斷其是否過時,若未過時就會被消費者消費,若已過時就會被刪除或者成爲死信。orm

定時任務

由於隊列中的消息過時後會成爲死信,而死信又會被髮布到該消息所在的隊列的 DLX 上去,因此經過爲消息設置過時時間,而後再消費該消息所在隊列的 DLX 所綁定的隊列,從而來達到定時處理一個任務的目的。 簡單的講就是當有一個隊列 queue1,其 DLX 爲 deadEx1,deadEx1 綁定了一個隊列 deadQueue1,當隊列 queue1 中有一條消息因過時成爲死信時,就會被髮布到 deadEx1 中去,經過消費隊列 deadQueue1 中的消息,也就至關於消費的是 queue1 中的因過時產生的死信消息。中間件

消費死信隊列的代碼以下:blog

const amqp = require('amqplib');

const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterQueue = 'my_dead_letter_queue';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';
let channel;
amqp.connect('amqp://localhost')
.then((conn) => {
  return conn.createChannel();
})
.then((ch) => {
  channel = ch;
  ch.assertExchange(myDeadLetterEx, 'direct', { durable: false });
  return ch.assertQueue(myDeadLetterQueue, { exclusive: false });
})
.then((ok) => {
  channel.bindQueue(ok.queue, myDeadLetterEx, myDeadLetterRoutingKey);
  channel.consume(ok.queue, (msg) => {
    console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
  }, { noAck: true})
})
.catch(console.error);

這裏須要注意的是,若是聲明的 myDeadLetterEx 是 direct 類型,那麼在爲其綁定隊列的時候必定要指定 BindingKey,即這裏的 myDeadLetterRoutingKey,若是不指定 Bindingkey,則須要將 myDeadLetterEx 聲明爲 fanout 類型。

相關文章
相關標籤/搜索