我司的系統,用戶能夠建立任務,啓動任務,但任務的運行須要很長的時間,因此採用消息隊列的方式,後臺異步處理。前端
這裏所用到的是 RabbitMQ
,對應的 Node.js 庫爲 amqplib
( 這裏採用的是回調形式:require("amqplib/callback_api") )。數據庫
① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,通過 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),發送須要處理的任務的 uuid 入 MQ 隊列。這時候,還要修改數據庫,把該任務的狀態從 "new" -> "queue"。express
⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 隊列吐出來的 message,即任務的 uuid。先修改數據庫該任務的狀態爲"runnning", 而後調用"處理"模塊去執行復雜運算,執行完成後,修改數據庫該任務的狀態是 "success" 仍是 」fail」,而後返回 ack 信號給 MQ 。api
注:此次的需求比較簡單,因此沒有用到 MQ 的交換機功能。安全
MQ 的 connection 和 channel 對象都有 "error" 和 "close" 事件,需作好相關的日誌記錄。尤爲是 "error",要加上 reconnect 機制,防止由於某個任務致使的錯誤或者 MQ 自身的緣由,影響到後續任務的處理。服務器
connection.on("error", function(err) { // reconnect }); channel.on("error", function(err) { // reconnect });
最後能夠根據實際須要,在全局加上 try……catch。運維
爲了防止 MQ server 的崩潰致使的消息損失,須要對數據作持久化。大體分兩塊:異步
隊列持久化 + 消息持久化
ui
channel.assertQueue(queue_name, { durable: true // 隊列持久化 });
channel.sendToQueue( queue_name, Buffer.from(uuid), { persistent: true // 消息持久化 }, function(err, ok) { } );
④ 中的 sendToQueue() ,須要在 createConfirmChannel() 的基礎下使用,這樣 sendToQueue() 的第三個參數纔有 MQ 收到 message 成功與否的回調,根據這個,去結合 ② 的 DB 操做, 綁定爲事務,來保證數據的一致性。日誌
channel.consume() 需開啓 ack 模式,等 Consumer 端一切確認完成後,再通知 MQ 。
channel.consume( queue_name, function(msg) { const uuid = msg.content.toString(); // use uuid todo…… }, { noAck: false } );
從上面的 如何保證 DB 跟 MQ 數據的一致性? 其實就避免了該問題的發生。
可是額外要作的是:
一、重試機制,例如 發送 message 失敗,規定重試的次數。
二、善用 MQ 的 Web 控制檯,地址形如 http://localhost:15672。除了關注基本的服務器負載狀態,還要關注任務隊列是否正常吞吐,是否有卡殼。
三、構建運維一體的後臺管控系統,比上面的 2 自定義程度更高。
四、提供用戶相似"提交工單"/"問題反饋"/"錯誤上傳"的功能,查缺補漏。