[譯] RabbitMQ tutorials (2) ---- 'work queue' (JavaScript)

工做隊列

work queue

在第一篇中,咱們寫了一個程序從已經聲明的隊列中收發消息,在這篇中,咱們會建立一個工做隊列(Work Queue)來分發works裏面的耗時任務。
其主要思想就是避免當即執行耗資源的任務,並等待它完成。相反的,咱們要讓這些任務在稍後的一個時間執行。咱們把任務封裝成一個消息放到隊列中。
一個工做進程會在後臺執行,取出(Pop)任務並最終會完成這項任務,當你運行多個work的時候,這些任務會在它們之間共享。javascript

這個概念在web應用中也是很是有用的,當在一個http請求窗口中不可能完成一個複雜的任務時候。html

準備

在以前的引導中,咱們發送了一個’Hello World‘的消息。如今咱們要發送一個字符串表明一個複雜的任務,咱們沒有像調整
圖片大小或者渲染一個pdf文件這樣的在真實場景中的任務,因此咱們使用setTimeout來模擬咱們正處於忙碌狀態。咱們把‘.’的數量表明這個字符串的複雜度;
每個‘.' 會消耗一秒鐘,例:一個模擬的任務'Hello...' 會消耗三秒鐘。java

從以前的例子,咱們稍稍修改一下send.js 的代碼,容許命令行能夠發送任意的消息。這個程序在工做隊列中安排好任務,因此咱們稱它new_task.jsnode

var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);

咱們的以前的receive.js一樣須要一些改變,須要對消息內容中的每一個'.'模擬成一個會消耗一秒的任務。它要從隊列中取出一條消息並執行這個任務,咱們把它稱做worker.jsgit

ch.consume(q, function(msg) {
    var secs = msg.content.toString().split('.').length - 1;

    console.log(" [x] Received %s", msg.content.toString());
    setTimeout(function() {
    console.log(" [x] Done");
    }, secs * 1000);
}, {noAck: true});

注意咱們模擬的執行時間
執行咱們的程序github

shell1$ ./worker.js
shell2$ ./new_task.js

循環調度

使用任務隊列(Task Queue)的其中的一個優點是有簡化並行工做的能力。若是咱們有不少堆積的未完成的任務,咱們只需添加更多的worker來進行擴展。web

首先,咱們嘗試同時啓動兩個worker.js,他們都會從隊列中受到消息,可是實際上呢?咱們來看看shell

你須要打開第三個命令行,兩個來運行worker.js腳本,咱們稱做C1,C2api

shell1$ ./worker.js
 [*] Waiting for messages. To exit press CTRL+C
shell2$ ./worker.js
 [*] Waiting for messages. To exit press CTRL+C

在第三個命令行工具中,咱們會發布新的任務,一旦你啓動消費者,你能夠發佈一些消息:緩存

shell3$ ./new_task.js First message.
shell3$ ./new_task.js Second message..
shell3$ ./new_task.js Third message...
shell3$ ./new_task.js Fourth message....
shell3$ ./new_task.js Fifth message.....

讓咱們看看什麼被分發到咱們的worker

shell1$ ./worker.js
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ ./worker.js
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'

默認狀況下,RabbitMQ會依次地把消息推送到下一個消費者,平均每一個消費者會獲得相同數量的消息。這樣的消息分發機制稱做輪詢。能夠嘗試3個或更多的worker。

## 消息確認 (Message acknowledgment)

要完成一個任務須要一些事件,你可能會想,當一個消費者開始執行一個長的任務但只執行一部分就die了會發生什麼。就咱們當前的代碼,一旦RabbitMQ 分發了一條消息到消費者那邊,就會當即從存儲中移除這條消息。這樣的話,若是你殺掉了進程,咱們將會丟失這條正在被處理的消息。
咱們也一樣丟失了咱們發送給這個進程的但還沒被處理的消息。

可是咱們不想丟失任何的任務,若是一個進程掛掉,咱們但願這個任務會被分發到其餘的進程。

爲了確保每一條消息毫不會丟失,RabbitMQ支持 消息確認,一個ack標誌會從消費者那邊返回去通知RabbitMQ當前的這個消息已經收到而且已經完成,因而RabbitMQ就能夠取刪掉這個任務了。

若是一個消費者掛了(通道被關閉,鏈接關閉,或者TCP鏈接丟失)而沒有發送ack標誌,RabbitMQ會明白這條任務還沒被執行完,並會從新放回隊列中,若是當時有其餘的消費者在線,這個消息會被快速地發送給其餘的消費者。這樣的話你就能夠保證沒有消息會遺失,即便進程只是偶爾會掛掉。

無論消息處理是否超時,RabbitMQ只會在消費者掛掉的時候從新分發消息。這對於那些要處理好久好久的消息也是好的(add:不會被斷定爲noack,而從新分發)

在以前的例子中,消息確認是被關閉的,是時候打開它了,使用{noAck: false}(你也能夠移除這個操做選項)選項,當咱們完成這個任務的時候發送一個正確的消息確認。

ch.consume(q, function(msg) {
    var secs = msg.content.toString().split('.').length - 1;

    console.log(" [x] Received %s", msg.content.toString());
    setTimeout(function() {
    console.log(" [x] Done");
        ch.ack(msg);
    }, secs * 1000);
}, {noAck: false});

使用這樣的代碼,你能夠肯定即便在它還在處理消息的時候你使用CTRL+C 殺掉進程也不會有數據丟失。在進程掛了以後,全部的未被確認的消息會被從新分發。

## 忘記確認
 這是一個廣泛的錯誤,丟失ack。只是一個簡單的錯誤,但結果確實很嚴重的。當客戶端中止的時候,消息會被從新分發(像是被隨機分發),可是RabbitMQ會佔用愈來愈多的內存當它不能取釋放掉任何未被確認的消息。

 爲了調試這種類型的錯誤,你可使用`rabbitmqctl`來輸出未被確認的消息字段:
    $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello    0       0
    ...done.

消息持久化

咱們學習了確保在進程掛掉仍保證任務不會被丟失,但咱們的任務仍是會在RabbitMQ服務中止的時候丟失。

當RabbitMQ 退出或者崩潰,除非你叫它不要丟失,否則隊列和消息都會丟失。爲了使消息不會丟失,兩件事情須要確保,咱們須要持久化隊列和消息。

首先,咱們要讓RabbitMQ 不會丟失隊列,爲此,咱們要先聲明
ch.assertQueue('hello', {durable: true});

儘管這樣的操做是對的,可是在咱們如今的配置中是不起做用的,這是由於咱們已經定義了一個未持久化的叫作hello的隊列,RabbitMQ不容許你改變一個已經存在的隊列的參數,若是你這樣作,程序將會返回錯誤。
可是有一個快速的辦法 --- 讓咱們定義一個新的隊列,叫作task_queue
ch.assertQueue('task_queue', {durable: true});
這個durable選項,須要消費者和生產者都去使用。

此時咱們能保證task_queue隊列不會在RabbitMQ重啓的時候丟失,如今咱們須要對消息進行持久化 --- 使用presistentChannel.sendToQueue選項,
ch.sendToQueue(q, new Buffer(msg), {persistent: true});

注意:消息持久化
消息持久化,不能徹底地保證消息不會丟失,儘管它告訴RabbitMQ要把消息存到磁盤當中,總存在一個RabbitMQ接收到消息,但還未處理完的狀況。另外,RabbitMQ並非對每一個消息作到幀同步,有可能只是被寫到緩存中,還沒被寫到磁盤。
消息持久化不能徹底保證,但已經遠遠知足咱們的簡單的工做隊列的需求,若是你須要更強的持久化的保證,你可使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。

均衡調度(Fair dispatch)

你可能已經注意到如今的調度並非咱們想要的,例:在有兩個worker的狀況下,當全部的奇數消息都是重的而偶數消息是輕量的,那會有一個worker會一直處於忙碌狀態,而另外一個worker幾乎不工做,
RabbitMQ,並不知道這些狀況,只知道持續地均勻地分發消息。

這樣發生的緣由是RabbitMQ只是在消息進入隊列的時候進行分發的工做,無論消費者的未確認的消息的數量,只是一味地分發第N條消息給第N個消費者。

work queue prefetch

爲了解決這樣的問題,咱們使用方法prefetch,並設置值爲1,表示RabbitMQ不會同時給一個worker超過一條消息,即,不會分發一條新的消息直到worker完成而且發送ack標誌。不然,RabbitMQ會把消息發送給下一個不在忙碌狀態的worker.
ch.prefetch(1);

注意隊列的大小
若是全部的worker都處於忙碌狀態,你的隊列能夠被填滿,你可能須要一個監控,或者添加更多的worker,或者有其餘的解決方案。

整合

最後的new_task.js的代碼:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
    var q = 'task_queue';
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, new Buffer(msg), {persistent: true});
        console.log(" [x] Sent '%s'", msg);
    });
    setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

new_task.js source

worker.js的代碼:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
    conn.createChannel(function(err, ch) {
        var q = 'task_queue';

        ch.assertQueue(q, {durable: true});
        ch.prefetch(1);
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        ch.consume(q, function(msg) {
        var secs = msg.content.toString().split('.').length - 1;

        console.log(" [x] Received %s", msg.content.toString());
        setTimeout(function() {
            console.log(" [x] Done");
            ch.ack(msg);
        }, secs * 1000);
        }, {noAck: false});
    });
});

worker.js source

使用消息確認和預處理,你能夠創建一個工做隊列。持久化選項使得消息能夠在RabbitMQ會重啓的狀況下得以保留。
得到更多的關於Channel的方法和消息的屬性,你能夠瀏覽amqplib docs

如今咱們能夠移至第三章,學習如何分發相同的消息給多個消費者。

翻譯:Joursion

日期 :2016/12/25

歡迎交流,學習。

相關文章
相關標籤/搜索