RabbitMQ 實踐之在處理異步任務中的流程

1、背景:

我司的系統,用戶能夠建立任務,啓動任務,但任務的運行須要很長的時間,因此採用消息隊列的方式,後臺異步處理。前端

這裏所用到的是 RabbitMQ ,對應的 Node.js 庫爲 amqplib ( 這裏採用的是回調形式:require("amqplib/callback_api") )。數據庫

2、MQ 處理任務的流程


① ② ③ ④ ⑤ :從前端發來 HTTP 請求,被 Producer(express) 處理,通過 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),發送須要處理的任務的 uuid 入 MQ 隊列。這時候,還要修改數據庫,把該任務的狀態從 "new" -> "queue"。express

⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 隊列吐出來的 message,即任務的 uuid。先修改數據庫該任務的狀態爲"runnning", 而後調用"處理"模塊去執行復雜運算,執行完成後,修改數據庫該任務的狀態是 "success" 仍是 」fail」,而後返回 ack 信號給 MQ 。api

注:此次的需求比較簡單,因此沒有用到 MQ 的交換機功能。安全

3、問與答


Q:如何作好 MQ 的錯誤處理?

MQ 的 connection 和 channel 對象都有 "error" 和 "close" 事件,需作好相關的日誌記錄。尤爲是 "error",要加上 reconnect 機制,防止由於某個任務致使的錯誤或者 MQ 自身的緣由,影響到後續任務的處理。服務器

connection.on("error", function(err) {
        // reconnect 
});

channel.on("error", function(err) {
        // reconnect 
});

最後能夠根據實際須要,在全局加上 try……catch。運維

Q:如何保證 MQ 自身消息的數據安全?

爲了防止 MQ server 的崩潰致使的消息損失,須要對數據作持久化。大體分兩塊:異步

隊列持久化 + 消息持久化ui

channel.assertQueue(queue_name, {
        durable: true
        // 隊列持久化
});
channel.sendToQueue(
          queue_name,
          Buffer.from(uuid),
          {
            persistent: true
            // 消息持久化
          },
          function(err, ok) { 
          
          }
);

Q:如何保證 DB 跟 MQ 數據的一致性?

一、發送 message 時

④ 中的 sendToQueue() ,須要在 createConfirmChannel() 的基礎下使用,這樣 sendToQueue() 的第三個參數纔有 MQ 收到 message 成功與否的回調,根據這個,去結合 ② 的 DB 操做, 綁定爲事務,來保證數據的一致性。日誌

二、接受 message 時

channel.consume() 需開啓 ack 模式,等 Consumer 端一切確認完成後,再通知 MQ 。

channel.consume(
        queue_name,
        function(msg) {
          const uuid = msg.content.toString();
                // use uuid todo…… 
        },
        {
          noAck: false
        }
);

Q:如何避免 MQ 多發、少發的問題

從上面的 如何保證 DB 跟 MQ 數據的一致性? 其實就避免了該問題的發生。

可是額外要作的是:

一、重試機制,例如 發送 message 失敗,規定重試的次數。

二、善用 MQ 的 Web 控制檯,地址形如 http://localhost:15672。除了關注基本的服務器負載狀態,還要關注任務隊列是否正常吞吐,是否有卡殼。

三、構建運維一體的後臺管控系統,比上面的 2 自定義程度更高。

四、提供用戶相似"提交工單"/"問題反饋"/"錯誤上傳"的功能,查缺補漏。

相關文章
相關標籤/搜索