RabbitMQ入門教程

摘要: 使用RabbitMQ的消息隊列,能夠有效提升系統的峯值處理能力。javascript

clipboard.png

RabbitMQ簡介

RabbitMQ消息代理(Message Broker),它支持多種異步消息處理方式,最多見的有:html

  • Work Queue:將消息緩存到一個隊列,默認狀況下,多個worker按照Round Robin的方式處理隊列中的消息。每一個消息只會分配給單個worker。
  • Publish/Subscribe:每一個訂閱消息的消費者都會收到消息,所以每一個消息一般會分配給多個worker,每一個worker對消息進行不一樣的處理。

RabbitMQ還支持RoutingTopics、以及Remote procedure calls (RPC)等方式。java

對於不一樣的消息處理方式,有一點是相同的,RabbitMQ是介於消息的生產者和消費者的中間節點,負責緩存和分發消息。RabbitMQ接收來自生產者的消息,緩存到內存中,按照不一樣的方式分發給消費者。RabbitMQ還能夠將消息寫入磁盤,保證持久化,這樣即便RabbitMQ意外崩潰了,消息數據不至於徹底丟失。node

爲何使用RabbitMQ?

最簡單的一點在於,它支持Work Queue等不一樣的消息處理方式,能夠用於不一樣的業務場景。對於咱們Fundebug來講,目前只用過RabbitMQ的Work Queue,即消息隊列。git

使用消息隊列,能夠將不算緊急、可是很是消耗資源的計算任務,以消息的方式插入到RabbitMQ的隊列中,而後使用多個處理模塊處理這些消息。github

這樣作最大的好處在於:提升了系統峯值處理能力。由於,來不及處理的消息緩存在RabbitMQ中,避免了同時進行大量計算致使系統因超負荷運行而崩潰。而那些來不及處理的消息,會在峯值過去以後慢慢處理掉。docker

另外一個好處在於解耦。消息的生產者只須要將消息發送給RabbitMQ,這些消息何時處理完,不會影響生產者的響應性能。api

廣告:歡迎免費試用Fundebug,爲您監控線上代碼的BUG,提升用戶體驗~promise

安裝並運行RabbitMQ

使用Docker運行RabbitMQ很是簡單,只須要執行一條簡單的命令:緩存

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

對於不熟悉Docker的朋友,我解釋一下docker的命令選項:

  • -d : 後臺運行容器
  • --name rabbitmq : 將容器的名字設爲rabbitmq
  • -h rabbitmq : 將容器的主機名設爲rabbitmq,但願RabbitMQ消息數據持久化保存到本地磁盤是須要設置主機名,由於RabbitMQ保存數據的目錄爲主機名
  • -p 5672:5672 : 將容器的5672端口映射爲本地主機的5672端口,這樣能夠經過本地的5672端口訪問rabbitmq
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq:將容器的/var/lib/rabbitmq目錄映射爲本地主機的/var/lib/rabbitmq目錄,這樣能夠將RabbitMQ消息數據持久化保存到本地磁盤,即便RabbitMQ容器被刪除,數據依然還在。

Docker爲官方鏡像提供了加速服務,所以命令中Rabbit的Docker鏡像名爲registry.docker-cn.com/library/rabbitmq:3.7

若是你不會Docker,建議你學習一下。若是你不想學,Ubuntu 14.04下安裝RabbitMQ的命令是這樣的:

sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

啓動RabbitMQ:

sudo service rabbitmq-server start

消息隊列代碼示例

下面,咱們使用Node.js實現一個簡單消息隊列。

clipboard.png

消息的生產者:sender.js

const amqp = require("amqplib");

const queue = "demo";

async function sendMessage(message)
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.sendToQueue(queue, new Buffer(message),
    {
        // RabbitMQ關閉時,消息會被保存到磁盤
        persistent: true
    });
}


setInterval(function()
{
    sendMessage("Hello, Fundebug!");
}, 1000)
  • 在sender中,不斷地往消息隊列中發送"Hello, Fundebug!"。

消息的消費者:receiver.js

const amqp = require("amqplib");

const queue = "demo";

async function receiveMessage()
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.consume(queue, function(message)
    {
        console.log(message.content.toString());
        channel.ack(message);
    });
}

receiveMessage();
  • 在receiver中,從消息隊列中讀出message並打印。

咱們用到了amqplib模塊,用於與RabbitMQ進行通訊,對於具體接口的細節,能夠查看文檔

在調用sendToQueue時,將persistent屬性設爲true,這樣RabbitMQ關閉時,消息會被保存到磁盤。測試這一點很簡單:

  • 關閉receiver
  • 啓動sender,發送消息給RabbitMQ
  • 重啓RabbitMQ(sudo docker restart rabbitmq)
  • 啓動receiver,會發現它能夠接收sender在RabbitMQ重啓以前發送的消息

因爲RabbitMQ容器將保存數據的目錄(/var/lib/rabbitmq)以數據卷的形式保存在本地主機,所以即便將RabbitMQ容器刪除(sudo docker rm -f rabbitmq)後從新運行,效果也是同樣的。

另外,這段代碼採用了Node.js最新的異步代碼編寫方式:Async/Await,所以很是簡潔,感興趣的同窗能夠了解一下。

這個Demo的運行方式很是簡單:

  • 運行RabbitMQ容器
sudo ./start_rabbitmq.sh
  • 發送消息
node ./sender.js
  • 接收消息
node ./receiver.js

在receiver端,能夠看到不停地打印"Hello, Fundebug!"。

代碼倉庫地址爲:Fundebug/rabbitmq-demo

自動重連代碼示例

在生產環境中,RabbitMQ不免會出現重啓的狀況,好比更換磁盤或者服務器、負載太高致使崩潰。由於RabbitMQ能夠將消息寫入磁盤,因此數據是"安全"的。可是,代碼中必須實現自動重連機制,不然RabbitMQ中止時會致使Node.js應用崩潰。這裏提供一個自動重連的代碼示例,給你們參考:

消息生產者:sender_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 鏈接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"),
        {
            // RabbitMQ重啓時,消息會被保存到磁盤
            persistent: true
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

消息消費者:receiver_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 鏈接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

這樣的話,即便RabbitMQ重啓,sender和receiver也能夠自動從新鏈接RabbitMQ。若是你但願監控RabbitMQ是否出錯,不妨使用咱們Fundebug的Node.js錯誤監控服務,在鏈接觸發"error"或者"close"事件時,第一時間發送報警,這樣開發者能夠及時定位和處理BUG。

參考

相關文章
相關標籤/搜索