摘要: 使用RabbitMQ的消息隊列,能夠有效提升系統的峯值處理能力。javascript
RabbitMQ是消息代理(Message Broker),它支持多種異步消息處理方式,最多見的有:html
RabbitMQ還支持Routing、Topics、以及Remote procedure calls (RPC)等方式。java
對於不一樣的消息處理方式,有一點是相同的,RabbitMQ是介於消息的生產者和消費者的中間節點,負責緩存和分發消息。RabbitMQ接收來自生產者的消息,緩存到內存中,按照不一樣的方式分發給消費者。RabbitMQ還能夠將消息寫入磁盤,保證持久化,這樣即便RabbitMQ意外崩潰了,消息數據不至於徹底丟失。node
最簡單的一點在於,它支持Work Queue等不一樣的消息處理方式,能夠用於不一樣的業務場景。對於咱們Fundebug來講,目前只用過RabbitMQ的Work Queue,即消息隊列。git
使用消息隊列,能夠將不算緊急、可是很是消耗資源的計算任務,以消息的方式插入到RabbitMQ的隊列中,而後使用多個處理模塊處理這些消息。github
這樣作最大的好處在於:提升了系統峯值處理能力。由於,來不及處理的消息緩存在RabbitMQ中,避免了同時進行大量計算致使系統因超負荷運行而崩潰。而那些來不及處理的消息,會在峯值過去以後慢慢處理掉。docker
另外一個好處在於解耦。消息的生產者只須要將消息發送給RabbitMQ,這些消息何時處理完,不會影響生產者的響應性能。api
廣告:歡迎免費試用Fundebug,爲您監控線上代碼的BUG,提升用戶體驗~promise
使用Docker運行RabbitMQ很是簡單,只須要執行一條簡單的命令:緩存
sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7
對於不熟悉Docker的朋友,我解釋一下docker的命令選項:
Docker爲官方鏡像提供了加速服務,所以命令中Rabbit的Docker鏡像名爲registry.docker-cn.com/library/rabbitmq:3.7。
若是你不會Docker,建議你學習一下。若是你不想學,Ubuntu 14.04下安裝RabbitMQ的命令是這樣的:
sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get update sudo apt-get install rabbitmq-server
啓動RabbitMQ:
sudo service rabbitmq-server start
下面,咱們使用Node.js實現一個簡單消息隊列。
消息的生產者:sender.js
const amqp = require("amqplib"); const queue = "demo"; async function sendMessage(message) { const connection = await amqp.connect("amqp://localhost"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.sendToQueue(queue, new Buffer(message), { // RabbitMQ關閉時,消息會被保存到磁盤 persistent: true }); } setInterval(function() { sendMessage("Hello, Fundebug!"); }, 1000)
消息的消費者:receiver.js
const amqp = require("amqplib"); const queue = "demo"; async function receiveMessage() { const connection = await amqp.connect("amqp://localhost"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.consume(queue, function(message) { console.log(message.content.toString()); channel.ack(message); }); } receiveMessage();
咱們用到了amqplib模塊,用於與RabbitMQ進行通訊,對於具體接口的細節,能夠查看文檔。
在調用sendToQueue時,將persistent屬性設爲true,這樣RabbitMQ關閉時,消息會被保存到磁盤。測試這一點很簡單:
因爲RabbitMQ容器將保存數據的目錄(/var/lib/rabbitmq)以數據卷的形式保存在本地主機,所以即便將RabbitMQ容器刪除(sudo docker rm -f rabbitmq)後從新運行,效果也是同樣的。
另外,這段代碼採用了Node.js最新的異步代碼編寫方式:Async/Await,所以很是簡潔,感興趣的同窗能夠了解一下。
這個Demo的運行方式很是簡單:
sudo ./start_rabbitmq.sh
node ./sender.js
node ./receiver.js
在receiver端,能夠看到不停地打印"Hello, Fundebug!"。
代碼倉庫地址爲:Fundebug/rabbitmq-demo
在生產環境中,RabbitMQ不免會出現重啓的狀況,好比更換磁盤或者服務器、負載太高致使崩潰。由於RabbitMQ能夠將消息寫入磁盤,因此數據是"安全"的。可是,代碼中必須實現自動重連機制,不然RabbitMQ中止時會致使Node.js應用崩潰。這裏提供一個自動重連的代碼示例,給你們參考:
消息生產者:sender_reconnect.js
const amqp = require("amqplib"); const queue = "demo"; var connection; // 鏈接RabbitMQ async function connectRabbitMQ() { try { connection = await amqp.connect("amqp://localhost"); console.info("connect to RabbitMQ success"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"), { // RabbitMQ重啓時,消息會被保存到磁盤 persistent: true }); connection.on("error", function(err) { console.log(err); setTimeout(connectRabbitMQ, 10000); }); connection.on("close", function() { console.error("connection to RabbitQM closed!"); setTimeout(connectRabbitMQ, 10000); }); } catch (err) { console.error(err); setTimeout(connectRabbitMQ, 10000); } } connectRabbitMQ();
消息消費者:receiver_reconnect.js
const amqp = require("amqplib"); const queue = "demo"; var connection; // 鏈接RabbitMQ async function connectRabbitMQ() { try { connection = await amqp.connect("amqp://localhost"); console.info("connect to RabbitMQ success"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.consume(queue, async function(message) { console.log(message.content.toString()); channel.ack(message); }); connection.on("error", function(err) { console.log(err); setTimeout(connectRabbitMQ, 10000); }); connection.on("close", function() { console.error("connection to RabbitQM closed!"); setTimeout(connectRabbitMQ, 10000); }); } catch (err) { console.error(err); setTimeout(connectRabbitMQ, 10000); } } connectRabbitMQ();
這樣的話,即便RabbitMQ重啓,sender和receiver也能夠自動從新鏈接RabbitMQ。若是你但願監控RabbitMQ是否出錯,不妨使用咱們Fundebug的Node.js錯誤監控服務,在鏈接觸發"error"或者"close"事件時,第一時間發送報警,這樣開發者能夠及時定位和處理BUG。