環境:html
MacOS 10.14node
Node.js 8.9.1shell
目前有個上線應用會接受多個請求,且每一個請求的處理時間可能好久,可能到數小時,因此就想採用異步機制,至於複雜的運算就用消息隊列(MQ)去慢慢消化。npm
網上調研了一圈,遂採用RabbitMQ。api
brew install rabbitmq
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
bash
rabbitmq-server
服務器
啓動須要(默認)200 MB的磁盤空間,但能夠經過配置文件裏的
disk_free_limit
修改。異步
以 Node.js 爲例:fetch
npm i amqplib
ui
https://www.npmjs.com/package/amqplib
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } // …… // connection.close(); });
通道分爲:
生產者(發送者)
消費者(接收者)
connection.createChannel(function(error1, channel) { if (error1) { throw error1; } // channel …… });
隊列裏面塞入的是消息
。
生產者
var queue = 'queue_name'; # 建立or連上隊列 channel.assertQueue(queue, { durable: true # 隊列持久化 }); # 臨時隊列(當前 connection 斷掉後就會被刪除)—— 隊列名隨機 channel.assertQueue('',{ exclusive:true }); # 將消息塞入隊列 channel.sendToQueue(queue, Buffer.from(msg), { persistent: true # 消息持久化 });
一個是防止服務器端的隊列丟失,一個是防止服務器端的隊列裏的消息丟失。
可是這並不能避免:若是服務器端在RabbitMQ接受消息的過程當中掛了致使的消息丟失。若是須要更強的保證,可使用 發佈者確認。
消費者
var queue = 'queue_name'; # 從隊列取出消息 channel.consume(queue, function(msg) { channel.ack(msg); # 發送確認信號 }, { noAck: false });
noAck: true
則服務器端不會指望收到 ACK,也就是說,消息在被髮送後會當即出列。
而 noAck: false
則須要消費者發送 ACK,即channel.ack(msg);
,但若是超時未回覆 ACK,消息會從新排隊(但若是同時有其餘可用消費者,則會迅速安排過去)
查看當前有多少隊列及各中有多少消息:
sudo rabbitmqctl list_queues
channel.prefetch(1); # 表示這個通道若是有{1}個未完成的消息,則不會接受新的消息
若是有多個隊列,生產者的消息應該如何分配呢?這個時候就須要一箇中間件——交換
其中交換類型
有四種:「」(默認交換), topic, headers, fanout
RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。因此不建議使用channel.sendToQueue(),此爲 「」(默認交換)。
若是沒有隊列綁定到交換,消息將會丟失。
生產者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); ### 推消息給交換 channel.publish(exchange, '', Buffer.from(msg));
消費者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); # ------------------------ # 綁定 交換+隊列 channel.bindQueue('queue_1', exchange, ''); channel.bindQueue('queue_2', exchange, ''); channel.bindQueue('queue_3', exchange, '');
queue_一、queue_二、queue_3 都會收到相同的一條消息。
生產者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); ### 推消息給交換 channel.publish(exchange, 'black', Buffer.from(msg));
消費者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); # ------------------------ # 綁定 交換+隊列 channel.bindQueue('queue_1', exchange, 'white'); channel.bindQueue('queue_2', exchange, 'black'); channel.bindQueue('queue_3', exchange, 'red');
只有 queue_2 纔會收到消息。
生產者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); ### 推消息給交換 channel.publish(exchange, 'kern.critical', Buffer.from(msg));
消費者
var exchange = 'logs'; # 建立or連上交換 channel.assertExchange(exchange, 'fanout', { durable: false # 持久化 }); # ------------------------ # 綁定 交換+隊列 channel.bindQueue('queue_1', exchange, '#'); channel.bindQueue('queue_2', exchange, "kern.*"); channel.bindQueue('queue_3', exchange, "*.critical");
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
生產者只管發送消息就好 (好比發送消息給隊列或者交換)
消費者要負責接受消息之外的更多事 (好比負責隊列的 prefetch 設置,或者交換的綁定)
略
例如能夠用到日誌系統中:對全部等級的日誌都打印到控制檯(即下面的隊列),而 error 日誌單獨持久化到 disk(即上面的隊列)。
一、官方RabbitMQ教程
https://www.rabbitmq.com/getstarted.html
二、amqp.node 參考API
https://www.squaremobius.net/amqp.node/channel_api.html#channel_ack