RabbitMQ 學習筆記

環境:html

MacOS 10.14node

Node.js 8.9.1shell

零、背景


目前有個上線應用會接受多個請求,且每一個請求的處理時間可能好久,可能到數小時,因此就想採用異步機制,至於複雜的運算就用消息隊列(MQ)去慢慢消化。npm

網上調研了一圈,遂採用RabbitMQ。api

1、安裝


一、安裝

brew install rabbitmq

二、配置環境變量

export PATH=$PATH:/usr/local/opt/rabbitmq/sbinbash

三、使用

(1) 服務器端

rabbitmq-server服務器

啓動須要(默認)200 MB的磁盤空間,但能夠經過配置文件裏的 disk_free_limit 修改。異步

(2) 客戶端

以 Node.js 爲例:fetch

npm i amqplibui

https://www.npmjs.com/package/amqplib

var amqp = require('amqplib/callback_api');

2、使用


一、connection —— 鏈接

amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    } 
    // …… 
    // connection.close(); 
});

二、channel —— 通道

通道分爲:

生產者(發送者)

消費者(接收者)

connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
    }
        // channel …… 
});
(1) queue —— 隊列

隊列裏面塞入的是消息

生產者

var queue = 'queue_name';


# 建立or連上隊列
channel.assertQueue(queue, {
        durable: true               # 隊列持久化
}); 
# 臨時隊列(當前 connection 斷掉後就會被刪除)—— 隊列名隨機
channel.assertQueue('',{ exclusive:true });


# 將消息塞入隊列
channel.sendToQueue(queue, Buffer.from(msg), {
    persistent: true        # 消息持久化
});
關於持久化:

一個是防止服務器端的隊列丟失,一個是防止服務器端的隊列裏的消息丟失。

可是這並不能避免:若是服務器端在RabbitMQ接受消息的過程當中掛了致使的消息丟失。若是須要更強的保證,可使用 發佈者確認

消費者

var queue = 'queue_name';

# 從隊列取出消息
channel.consume(queue, function(msg) {
        channel.ack(msg);   # 發送確認信號
}, {
    noAck: false    
});
關於 ACK ( Acknowledgement )

noAck: true 則服務器端不會指望收到 ACK,也就是說,消息在被髮送後會當即出列。

noAck: false 則須要消費者發送 ACK,即channel.ack(msg);,但若是超時未回覆 ACK,消息會從新排隊(但若是同時有其餘可用消費者,則會迅速安排過去)

查看當前有多少隊列及各中有多少消息: sudo rabbitmqctl list_queues

(2) prefetch —— 預取
channel.prefetch(1);
# 表示這個通道若是有{1}個未完成的消息,則不會接受新的消息
(3) exchange —— 交換

若是有多個隊列,生產者的消息應該如何分配呢?這個時候就須要一箇中間件——交換

其中交換類型有四種:「」(默認交換), topic, headers, fanout

A、 「」(默認交換)

RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。因此不建議使用channel.sendToQueue(),此爲 「」(默認交換)。

若是沒有隊列綁定到交換,消息將會丟失。

B、fanout(廣播)

生產者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
}); 

### 推消息給交換
channel.publish(exchange, '', Buffer.from(msg));

消費者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
});

# ------------------------

# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, ''); 
channel.bindQueue('queue_2', exchange, ''); 
channel.bindQueue('queue_3', exchange, '');

queue_一、queue_二、queue_3 都會收到相同的一條消息。

C、direct (直接)

生產者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
}); 

### 推消息給交換
channel.publish(exchange, 'black', Buffer.from(msg));

消費者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
});

# ------------------------
 
# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, 'white');
channel.bindQueue('queue_2', exchange, 'black');
channel.bindQueue('queue_3', exchange, 'red');

只有 queue_2 纔會收到消息。

D、topic

生產者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
}); 

### 推消息給交換
channel.publish(exchange, 'kern.critical', Buffer.from(msg));

消費者

var exchange = 'logs';

# 建立or連上交換
channel.assertExchange(exchange, 'fanout', {
    durable: false  # 持久化
});

# ------------------------
 
# 綁定 交換+隊列
channel.bindQueue('queue_1', exchange, '#');
channel.bindQueue('queue_2', exchange, "kern.*");
channel.bindQueue('queue_3', exchange, "*.critical");
  • *(星號)能夠替代一個單詞。
  • #(hash)能夠替換零個或多個單詞。

查看全部的 交換 及 交換綁定隊列
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
代碼職責風格:

生產者只管發送消息就好 (好比發送消息給隊列或者交換)

消費者要負責接受消息之外的更多事 (好比負責隊列的 prefetch 設置,或者交換的綁定)

三、遠程過程調用(RPC)

3、應用

例如能夠用到日誌系統中:對全部等級的日誌都打印到控制檯(即下面的隊列),而 error 日誌單獨持久化到 disk(即上面的隊列)。


參考資料

一、官方RabbitMQ教程

https://www.rabbitmq.com/getstarted.html

二、amqp.node 參考API

https://www.squaremobius.net/amqp.node/channel_api.html#channel_ack

相關文章
相關標籤/搜索