Node.js鏈接RabbitMQ,斷線重連,動態綁定routing key

RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基於回調的。javascript

下面將給出基於Promise式的寫法。而且實現動態的隊列綁定html

初始化配置

const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js')

// 交換機名稱
const ex = 'amq.topic'

const amqpAddr = `amqp://${amqpAddrHost}`

// 讀取HOSTNAME, 在跑多實例時,例如在k8s中,HOSTNAME能夠獲取當前pod的名稱
// 多實例時,寫日誌,或者創建鏈接時,最好帶上pod名稱,若是出現問題,也比較好定位哪一個pod出現的問題。
const hostName = process.env.HOSTNAME

// 隊列的屬性設置
// 通常來講,最好設置隊列自動刪除autoDelete,當連接斷開時,隊列也會刪除,這樣不會產生很是多的無用隊列
// durable是用來的持久化的,最好也能夠設置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定義channel的引用,當連接創建時,全部方法均可以經過引用CH來獲取channel方法
let CH = null

向隊列發送消息的函數

// 向隊列發送消息的函數
function publishMessage (msg) {
  if (!CH) {
    return ''
  }

  msg = JSON.stringify(msg)
  // 指定交換機ex, routing key, 以及消息的內容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}

當連接rabbitMQ斷開時,要主動去重連

function reconnectRabbitMq () {
  log.info('reconnect_rabbit_mq')
  connectRabbitMq()
}

鏈接rabbitMQ的主要函數

function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 設置connection_name的屬性,能夠在rabbitMQ的控制檯的UI上,看到鏈接是來自哪一個實例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info('rabbitmq_connect_successd')
    // 必定要加上連接的報錯事件處理,不然一旦報error錯,若是不處理這個錯誤,程序就會崩潰
    // error是個特別的事件,務必要處理的
    // 報錯就直接去重連
    conn.on('error', (err) => {
      log.error('connect_error ' + err.message, err)
      reconnectRabbitMq()
    })
    // 建立channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交換機
    ch.assertExchange(ex, 'topic', {durable: true})
    // 初始化一個隊列,隊列名就用hostName, 比較容易從對列名上知道是哪一個實例建立的隊列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 能夠在隊列初始化完畢就當即綁定routing key, 也能夠暫時不綁定,後續動態的綁定
    // CH.bindQueue(q.queue, ex, 'some.topic.aaa')
    // 消費者,獲取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}

動態給隊列綁定或者解綁routing key

function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error('channel not established')
      reject(new Error('channel not established'))
      return ''
    }
    // 初始化隊列,若是隊列已經存在,就會直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 若是bind是true,就綁定。不然就解綁
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}

使用方法

加入你的服務端用的是Express, 那麼在app.js中能夠java

...
const {connectRabbitMq} = require('./connect-mq.js')
connectRabbitMq()
...

完整代碼

// onnect-mq.js
const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js')

// 交換機名稱
const ex = 'amq.topic'

const amqpAddr = `amqp://${amqpAddrHost}`

// 讀取HOSTNAME, 在跑多實例時,例如在k8s中,HOSTNAME能夠獲取當前pod的名稱
// 多實例時,寫日誌,或者創建鏈接時,最好帶上pod名稱,若是出現問題,也比較好定位哪一個pod出現的問題。
const hostName = process.env.HOSTNAME

// 隊列的屬性設置
// 通常來講,最好設置隊列自動刪除autoDelete,當連接斷開時,隊列也會刪除,這樣不會產生很是多的無用隊列
// durable是用來的持久化的,最好也能夠設置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定義channel的引用,當連接創建時,全部方法均可以經過引用CH來獲取channel方法
let CH = null


// 向隊列發送消息的函數
function publishMessage (msg) {
  if (!CH) {
    return ''
  }

  msg = JSON.stringify(msg)
  // 指定交換機ex, routing key, 以及消息的內容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}

// 當連接rabbitMQ斷開時,要主動去重連
function reconnectRabbitMq () {
  log.info('reconnect_rabbit_mq')
  connectRabbitMq()
}

// 連接rabbitMQ的主要函數
function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 設置connection_name的屬性,能夠在rabbitMQ的控制檯的UI上,看到連接是來自哪一個實例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info('rabbitmq_connect_successd')
    // 必定要加上連接的報錯事件處理,不然一旦報error錯,若是不處理這個錯誤,程序就會崩潰
    // error是個特別的事件,務必要處理的
    // 報錯就直接去重連
    conn.on('error', (err) => {
      log.error('connect_error ' + err.message, err)
      reconnectRabbitMq()
    })
    // 建立channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交換機
    ch.assertExchange(ex, 'topic', {durable: true})
    // 初始化一個隊列,隊列名就用hostName, 比較容易從對列名上知道是哪一個實例建立的隊列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 能夠在隊列初始化完畢就當即綁定routing key, 也能夠暫時不綁定,後續動態的綁定
    // CH.bindQueue(q.queue, ex, 'some.topic.aaa')
    // 消費者,獲取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}


// 動態給隊列綁定或者解綁routing key
function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error('channel not established')
      reject(new Error('channel not established'))
      return ''
    }
    // 初始化隊列,若是隊列已經存在,就會直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 若是bind是true,就綁定。不然就解綁
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}
相關文章
相關標籤/搜索