RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基於回調的。javascript
下面將給出基於Promise式的寫法。而且實現動態的隊列綁定html
const amqp = require('amqplib') // rabbitMQ地址 const {amqpAddrHost} = require('../config/index.js') // 交換機名稱 const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 讀取HOSTNAME, 在跑多實例時,例如在k8s中,HOSTNAME能夠獲取當前pod的名稱 // 多實例時,寫日誌,或者創建鏈接時,最好帶上pod名稱,若是出現問題,也比較好定位哪一個pod出現的問題。 const hostName = process.env.HOSTNAME // 隊列的屬性設置 // 通常來講,最好設置隊列自動刪除autoDelete,當連接斷開時,隊列也會刪除,這樣不會產生很是多的無用隊列 // durable是用來的持久化的,最好也能夠設置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定義channel的引用,當連接創建時,全部方法均可以經過引用CH來獲取channel方法 let CH = null
// 向隊列發送消息的函數 function publishMessage (msg) { if (!CH) { return '' } msg = JSON.stringify(msg) // 指定交換機ex, routing key, 以及消息的內容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) }
function reconnectRabbitMq () { log.info('reconnect_rabbit_mq') connectRabbitMq() }
function connectRabbitMq () { amqp.connect(amqpAddr, { // 設置connection_name的屬性,能夠在rabbitMQ的控制檯的UI上,看到鏈接是來自哪一個實例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info('rabbitmq_connect_successd') // 必定要加上連接的報錯事件處理,不然一旦報error錯,若是不處理這個錯誤,程序就會崩潰 // error是個特別的事件,務必要處理的 // 報錯就直接去重連 conn.on('error', (err) => { log.error('connect_error ' + err.message, err) reconnectRabbitMq() }) // 建立channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交換機 ch.assertExchange(ex, 'topic', {durable: true}) // 初始化一個隊列,隊列名就用hostName, 比較容易從對列名上知道是哪一個實例建立的隊列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 能夠在隊列初始化完畢就當即綁定routing key, 也能夠暫時不綁定,後續動態的綁定 // CH.bindQueue(q.queue, ex, 'some.topic.aaa') // 消費者,獲取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) }
function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error('channel not established') reject(new Error('channel not established')) return '' } // 初始化隊列,若是隊列已經存在,就會直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 若是bind是true,就綁定。不然就解綁 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }
加入你的服務端用的是Express, 那麼在app.js中能夠java
... const {connectRabbitMq} = require('./connect-mq.js') connectRabbitMq() ...
// onnect-mq.js const amqp = require('amqplib') // rabbitMQ地址 const {amqpAddrHost} = require('../config/index.js') // 交換機名稱 const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 讀取HOSTNAME, 在跑多實例時,例如在k8s中,HOSTNAME能夠獲取當前pod的名稱 // 多實例時,寫日誌,或者創建鏈接時,最好帶上pod名稱,若是出現問題,也比較好定位哪一個pod出現的問題。 const hostName = process.env.HOSTNAME // 隊列的屬性設置 // 通常來講,最好設置隊列自動刪除autoDelete,當連接斷開時,隊列也會刪除,這樣不會產生很是多的無用隊列 // durable是用來的持久化的,最好也能夠設置成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定義channel的引用,當連接創建時,全部方法均可以經過引用CH來獲取channel方法 let CH = null // 向隊列發送消息的函數 function publishMessage (msg) { if (!CH) { return '' } msg = JSON.stringify(msg) // 指定交換機ex, routing key, 以及消息的內容 CH.publish(ex, eventBusTopic, Buffer.from(msg)) } // 當連接rabbitMQ斷開時,要主動去重連 function reconnectRabbitMq () { log.info('reconnect_rabbit_mq') connectRabbitMq() } // 連接rabbitMQ的主要函數 function connectRabbitMq () { amqp.connect(amqpAddr, { // 設置connection_name的屬性,能夠在rabbitMQ的控制檯的UI上,看到連接是來自哪一個實例 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info('rabbitmq_connect_successd') // 必定要加上連接的報錯事件處理,不然一旦報error錯,若是不處理這個錯誤,程序就會崩潰 // error是個特別的事件,務必要處理的 // 報錯就直接去重連 conn.on('error', (err) => { log.error('connect_error ' + err.message, err) reconnectRabbitMq() }) // 建立channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交換機 ch.assertExchange(ex, 'topic', {durable: true}) // 初始化一個隊列,隊列名就用hostName, 比較容易從對列名上知道是哪一個實例建立的隊列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 能夠在隊列初始化完畢就當即綁定routing key, 也能夠暫時不綁定,後續動態的綁定 // CH.bindQueue(q.queue, ex, 'some.topic.aaa') // 消費者,獲取消息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) } // 動態給隊列綁定或者解綁routing key function toggleBindQueue (routingKey, bind) { return new Promise((resolve, reject) => { if (!CH) { log.error('channel not established') reject(new Error('channel not established')) return '' } // 初始化隊列,若是隊列已經存在,就會直接使用 CH.assertQueue(`${hostName}`, queueAttr) .then((q) => { // 若是bind是true,就綁定。不然就解綁 if (bind) { log.info(`bindQueue ${hostName} ${topic}`) return CH.bindQueue(q.queue, ex, topic) } else { return CH.unbindQueue(q.queue, ex, topic) } }) .then((res) => { resolve() }) .catch((err) => { reject(err) log.error(err) }) }) } module.exports = { connectRabbitMq, toggleBindQueue, publishMessage }