在Node.js中使用RabbitMQ系列二 任務隊列

在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一個任務隊列,不過當時的場景是將消息發送給一個消費者,本篇文章我將討論有多個消費者的場景。javascript

其實,任務隊列最核心解決的問題是避免當即處理那些耗時的任務,也就是避免請求-響應的這種同步模式。取而代之的是咱們經過調度算法,讓這些耗時的任務以後再執行,也就是採用異步的模式。咱們須要將一條消息封裝成一個任務,而且將它添加到任務隊列裏面。後臺會運行多個工做進程(worker process),經過調度算法,將隊列裏的任務依次彈出來,並交給其中的一個工做進程進行處理執行。這個概念尤爲適合那些HTTP短鏈接的web應用,它們沒法在短期內處理這種複雜的任務。html

準備工做

咱們將字符串類型的消息看做是耗時的任務,而且每一個字符串消息最後帶上一些點。每一個點表明該任務須要消耗的秒數。在worker進程處理的時候能夠採用setTimeout函數來進行模擬。舉個例子:一個僞造的耗時任務是Hello.,則這個任務會消耗1秒,一個僞造的耗時任務是Hello..,則這個任務會消耗2秒,一個僞造的耗時任務是Hello... ,而且這個任務的處理時間會耗費3秒。java

這裏稍微對上篇文章的send.js文件修改,讓它能夠發送用戶自定義的任意的消息,這個程序會將任務交給任務隊列,咱們用new_task.js來進行命名:web

var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);

咱們的receive.js文件也要進行修改,它須要僞造任務的處理時間,讓字符串類型的任務看起來須要幾秒鐘時間(具體取決於 . 的個數)。算法

ch.consume(q, function(msg) {
  var secs = msg.content.toString().split('.').length - 1;

  console.log(" [x] Received %s", msg.content.toString());
  setTimeout(function() {
    console.log(" [x] Done");
  }, secs * 1000);
}, {noAck: true});

這裏以一個worker工做進程爲例子,咱們先在左側的shell窗口開啓一個worker進程,以後在右側shell窗口發送自定義的消息,執行看看效果:
shell

輪詢調度算法(Round-robin)

使用任務隊列的一個優點是可以將任務並行執行,若是須要處理大量的積壓任務,咱們只須要像上面運行worker進程的方式,增長更多的worker,這個讓可伸縮變得更加的容易。
首先,咱們使用item2開啓2個shell窗口,並在裏面運行兩個worker進程,可是究竟是哪一個worker會對消息進行處理呢?這裏咱們能夠作個簡單的實驗來看看,以下圖所示,左側是兩個worker進程,右側是消息發送端:
api

默認狀況下,RabbitMQ會採用Round-robin算法來分發任務隊列中的任務,每次分發的時候都會將任務派發給下一個消費者,這樣每一個消費者(worker進程)處理的任務數量實際上是同樣多的。服務器

消息確認

處理一個複雜的任務須要耗費很長時間,這個時間段裏面,可能咱們的worker進程因爲某種緣由掛掉了,這種異常狀況是須要考慮的。可是咱們現有的代碼裏面並無作這種異常的處理,當RabbitMQ將任務派發給worker進程以後,咱們當即將這個任務從內存中剔除掉了,設想下,假設worker收到消息以後,咱們立刻將進程殺死掉,這個時候任務並無被成功執行的。同時,咱們也會丟失全部派發到這個worker進程可是尚未被處理的任務信息。
可是,咱們並不想丟掉任何一個任務,若是一個worker進程掛掉,咱們更但願可以將這個任務派發給其它的worker來處理。
爲了不任務信息丟失的狀況,RabbitMQ支持消息確認。在一個任務發送到了worker進程而且被成功處理完畢以後,一個ack (消息確認)的標識會從消費者發回來告訴RabbitMQ這個任務已經被處理完了,能夠將它刪除了。
若是一個消費者掛掉了(常見的緣由如消息通道關閉了,鏈接丟失,TCP鏈接丟失),沒有向RabbitMQ發送消息確認這個ack的標識,這個時候RabbitMQ會將它重新加入到隊列中,若是有其它消費者存在,那麼RabbitMQ會立刻將這個任務從新派發下去。以前的例子裏面咱們並無開啓消息確認這個選項,如今咱們能夠經過{noAck: false}來開啓:異步

ch.consume(q, function(msg) {
  var secs = msg.content.toString().split('.').length - 1;

  console.log(" [x] Received %s", msg.content.toString());
  setTimeout(function() {
    console.log(" [x] Done");
    ch.ack(msg);
  }, secs * 1000);
}, {noAck: false});     // 開啓消息確認標識

能夠用CTRL + C來作個實驗看看效果。函數

消息持久化

剛剛談到,若是一個worker進程掛掉了,不讓消息丟失的作法。可是,若是整個RabbitMQ的服務器掛掉了呢?當一個RabbitMQ服務退出或者中斷的狀況下,它會忘記任務隊列裏面的消息除非你告訴它不要丟掉,即咱們通知RabbitMQ任務隊列和這些任務都是須要持久化的。

首先,咱們須要確保RabbitMQ永遠不會丟失掉咱們的任務隊列。

ch.assertQueue('hello', {durable: true});

可是,你會發現這樣並無效果,那是由於hello這個隊列咱們已經定義過,而且指定了它不須要持久化。RabbitMQ不容許咱們經過改變參數配置的方式對已經存在的任務隊列進行從新定義,所以咱們須要定義一個新的任務隊列。

ch.assertQueue('task_queue', {durable: true});

這行代碼須要同時在生產者和消費者裏面的相關代碼的地方進行修改。

接下來,咱們須要經過配置persistent 選項讓咱們發送的消息也是持久化的。

ch.sendToQueue(q, new Buffer(msg), {persistent: true});

公平調度

前面的例子,咱們討論了RabbitMQ的調度方式,即採用Round-robin輪詢調度算法,所以它會將消息均勻的分配給每一個worker進程。RabbitMQ並不會關注每一個worker進程有多少個消息沒有確認,它只會不斷的給你派發任務,無論你能不能處理的過來。這個時候,問題就出現了,設想下,假設有2個worker,其中1個worker恰好很不幸被分配了一個很是複雜的任務,可能須要耗費好幾個小時的時間,另一個worker被分配的任務都比較簡單,只須要幾分鐘就能處理完,因爲RabbitMQ的任務分配問題,有不少新的任務依然會分配到那個正在處理很耗時任務的worker上面,這個worker後面的任務都會處於等待狀態。幸虧,RabbitMQ能夠經過prefetch(1)來指定某個worker同時最多隻會派發到1個任務,一旦任務處理完成發送了確認通知,纔會有新的任務派發過來。

ch.prefetch(1);

最終的代碼

new_task.js 代碼:

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, new Buffer(msg), {persistent: true});
    console.log(" [x] Sent '%s'", msg);
  });
  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

worker.js:

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';

    ch.assertQueue(q, {durable: true});
    ch.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        ch.ack(msg);
      }, secs * 1000);
    }, {noAck: false});
  });
});
相關文章
相關標籤/搜索