在前一篇文章中可伸縮架構簡短系列中提到過關於異步的問題。當時推薦使用RabbitMQ來作任務隊列的實現方案。本篇文章以Node.js爲例子,來實際操做如何和RabbitMQ進行交互。javascript
RabbitMQ是一個消息代理。它最初的思想特別簡單:接受而且轉發消息。你能夠將它想象爲郵局:當你將郵件放到信箱中,你能夠很是確定快件員最終會將郵件交到接受人手中。你能夠把RabbitMQ比喻爲信箱、郵局和快遞員。RabbitMQ和郵局之間主要的區別是它不處理紙張,而是接受、存儲和轉發二進制數據‒消息。html
在RabbitMQ中,有一些基本術語:前端
打開RabbitMQ的下載頁面(https://www.rabbitmq.com/download.html),下載安裝,這裏以Mac OSX平臺安裝爲例:
RabbitMQ依賴Erlang,因爲Mac OSX自己已經安裝了Erlang,因此能夠直接經過Homebrew來進行安裝。java
$ brew update $ brew install rabbitmq
安裝完後,須要將/usr/local/sbin添加到$PATH,添加到./.bash_profile文件,而後node
$ source ./.bash_profile $ echo $PATH // 檢查環境變量是否已經成功加入
安裝完成後就能夠啓動RabbitMQ server了。
git
至此,安裝就完成了。運行rabbitmq-server命令時可能會報錯誤:ERROR: epmd error for host localhost: timeout (timed out),若是遇到這種狀況,能夠打開/etc/hosts文件,在文件末尾加上 127.0.0.1 localhost便可解決問題。github
在這個部分,我會使用Javascript編寫兩個小程序。一個發送單個消息的生產者和接收消息並將其打印出來的消費者。咱們將跳過在amqp.node API的一些細節,集中在這個很是簡單的事情。npm
在下圖中,P表明生產者,C表明消費者,中間紅色表明的是任務隊列-消息緩衝區
小程序
首先,使用npm安裝amqp.node前端工程化
$ npm install amqplib
這裏我將消息的發送者稱做send.js,消息接受者稱做receive.js,消息發送者會鏈接到RabbitMQ,發送一個消息,最後退出。
首先引入amqplib這個模塊:
var amqp = require('amqplib/callback_api');
鏈接到 RabbitMQ server
amqp.connect('amqp://localhost', function(err, conn) {});
接下來建立一個通道,
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) {}); });
爲了發送消息,咱們須要定義一個隊列,咱們能夠將消息發送到這個隊列中:
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; ch.assertQueue(q, {durable: false}); // Note: on Node 6 Buffer.from(msg) should be used ch.sendToQueue(q, new Buffer('Hello World!')); console.log(" [x] Sent 'Hello World!'"); }); });
最後,咱們關閉鏈接,而且退出:
setTimeout(function() { conn.close(); process.exit(0) }, 500);
最終代碼參考:send.js
創建一個接受者的方式和發送者是相同的。打開一個鏈接和通道,而且申明一個須要處理的隊列。注意:這裏的隊列和發送者裏面定義的隊列須要匹配。
amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; ch.assertQueue(q, {durable: false}); }); });
這裏也定義隊列的緣由:接受者可能比發送者先開始執行。咱們須要確保當接受者處理queue的時候,queue是存在的。
因爲消息的發送是異步的,咱們須要提供一個回調,這樣,當RabbitMQ發送消息給咱們的消費者時,回調會執行。這個也是Channel.consume作的事情。
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { console.log(" [x] Received %s", msg.content.toString()); }, {noAck: true});
最終代碼參考:receive.js
// 先執行send.js $ ./send.js // 後執行receive.js $ ./receive.js
最終結果如圖: