在第一篇中,咱們寫了一個程序從已經聲明的隊列中收發消息,在這篇中,咱們會建立一個工做隊列(Work Queue)來分發works裏面的耗時任務。
其主要思想就是避免當即執行耗資源的任務,並等待它完成。相反的,咱們要讓這些任務在稍後的一個時間執行。咱們把任務封裝成一個消息放到隊列中。
一個工做進程會在後臺執行,取出(Pop)任務並最終會完成這項任務,當你運行多個work的時候,這些任務會在它們之間共享。javascript
這個概念在web應用中也是很是有用的,當在一個http請求窗口中不可能完成一個複雜的任務時候。html
在以前的引導中,咱們發送了一個’Hello World‘的消息。如今咱們要發送一個字符串表明一個複雜的任務,咱們沒有像調整
圖片大小或者渲染一個pdf文件這樣的在真實場景中的任務,因此咱們使用setTimeout來模擬咱們正處於忙碌狀態。咱們把‘.’的數量表明這個字符串的複雜度;
每個‘.' 會消耗一秒鐘,例:一個模擬的任務'Hello...' 會消耗三秒鐘。java
從以前的例子,咱們稍稍修改一下send.js
的代碼,容許命令行能夠發送任意的消息。這個程序在工做隊列中安排好任務,因此咱們稱它new_task.js
node
var q = 'task_queue'; var msg = process.argv.slice(2).join(' ') || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent '%s'", msg);
咱們的以前的receive.js
一樣須要一些改變,須要對消息內容中的每一個'.'模擬成一個會消耗一秒的任務。它要從隊列中取出一條消息並執行這個任務,咱們把它稱做worker.js
git
ch.consume(q, function(msg) { var secs = msg.content.toString().split('.').length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); }, secs * 1000); }, {noAck: true});
注意咱們模擬的執行時間
執行咱們的程序github
shell1$ ./worker.js shell2$ ./new_task.js
使用任務隊列(Task Queue)的其中的一個優點是有簡化並行工做的能力。若是咱們有不少堆積的未完成的任務,咱們只需添加更多的worker來進行擴展。web
首先,咱們嘗試同時啓動兩個worker.js
,他們都會從隊列中受到消息,可是實際上呢?咱們來看看shell
你須要打開第三個命令行,兩個來運行worker.js
腳本,咱們稱做C1,C2api
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
在第三個命令行工具中,咱們會發布新的任務,一旦你啓動消費者,你能夠發佈一些消息:緩存
shell3$ ./new_task.js First message. shell3$ ./new_task.js Second message.. shell3$ ./new_task.js Third message... shell3$ ./new_task.js Fourth message.... shell3$ ./new_task.js Fifth message.....
讓咱們看看什麼被分發到咱們的worker
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認狀況下,RabbitMQ會依次地把消息推送到下一個消費者,平均每一個消費者會獲得相同數量的消息。這樣的消息分發機制稱做輪詢。能夠嘗試3個或更多的worker。
## 消息確認 (Message acknowledgment)
要完成一個任務須要一些事件,你可能會想,當一個消費者開始執行一個長的任務但只執行一部分就die了會發生什麼。就咱們當前的代碼,一旦RabbitMQ 分發了一條消息到消費者那邊,就會當即從存儲中移除這條消息。這樣的話,若是你殺掉了進程,咱們將會丟失這條正在被處理的消息。
咱們也一樣丟失了咱們發送給這個進程的但還沒被處理的消息。
可是咱們不想丟失任何的任務,若是一個進程掛掉,咱們但願這個任務會被分發到其餘的進程。
爲了確保每一條消息毫不會丟失,RabbitMQ支持 消息確認
,一個ack標誌會從消費者那邊返回去通知RabbitMQ當前的這個消息已經收到而且已經完成,因而RabbitMQ就能夠取刪掉這個任務了。
若是一個消費者掛了(通道被關閉,鏈接關閉,或者TCP鏈接丟失)而沒有發送ack標誌,RabbitMQ會明白這條任務還沒被執行完,並會從新放回隊列中,若是當時有其餘的消費者在線,這個消息會被快速地發送給其餘的消費者。這樣的話你就能夠保證沒有消息會遺失,即便進程只是偶爾會掛掉。
無論消息處理是否超時,RabbitMQ只會在消費者掛掉的時候從新分發消息。這對於那些要處理好久好久的消息也是好的(add:不會被斷定爲noack,而從新分發)
在以前的例子中,消息確認
是被關閉的,是時候打開它了,使用{noAck: false}
(你也能夠移除這個操做選項)選項,當咱們完成這個任務的時候發送一個正確的消息確認。
ch.consume(q, function(msg) { var secs = msg.content.toString().split('.').length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false});
使用這樣的代碼,你能夠肯定即便在它還在處理消息的時候你使用CTRL+C 殺掉進程也不會有數據丟失。在進程掛了以後,全部的未被確認的消息會被從新分發。
## 忘記確認 這是一個廣泛的錯誤,丟失ack。只是一個簡單的錯誤,但結果確實很嚴重的。當客戶端中止的時候,消息會被從新分發(像是被隨機分發),可是RabbitMQ會佔用愈來愈多的內存當它不能取釋放掉任何未被確認的消息。 爲了調試這種類型的錯誤,你可使用`rabbitmqctl`來輸出未被確認的消息字段: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
咱們學習了確保在進程掛掉仍保證任務不會被丟失,但咱們的任務仍是會在RabbitMQ服務中止的時候丟失。
當RabbitMQ 退出或者崩潰,除非你叫它不要丟失,否則隊列和消息都會丟失。爲了使消息不會丟失,兩件事情須要確保,咱們須要持久化隊列和消息。
首先,咱們要讓RabbitMQ 不會丟失隊列,爲此,咱們要先聲明ch.assertQueue('hello', {durable: true});
儘管這樣的操做是對的,可是在咱們如今的配置中是不起做用的,這是由於咱們已經定義了一個未持久化的叫作hello
的隊列,RabbitMQ不容許你改變一個已經存在的隊列的參數,若是你這樣作,程序將會返回錯誤。
可是有一個快速的辦法 --- 讓咱們定義一個新的隊列,叫作task_queue
ch.assertQueue('task_queue', {durable: true});
這個durable
選項,須要消費者和生產者都去使用。
此時咱們能保證task_queue
隊列不會在RabbitMQ重啓的時候丟失,如今咱們須要對消息進行持久化 --- 使用presistent
的Channel.sendToQueue
選項,ch.sendToQueue(q, new Buffer(msg), {persistent: true});
注意:消息持久化 消息持久化,不能徹底地保證消息不會丟失,儘管它告訴RabbitMQ要把消息存到磁盤當中,總存在一個RabbitMQ接收到消息,但還未處理完的狀況。另外,RabbitMQ並非對每一個消息作到幀同步,有可能只是被寫到緩存中,還沒被寫到磁盤。 消息持久化不能徹底保證,但已經遠遠知足咱們的簡單的工做隊列的需求,若是你須要更強的持久化的保證,你可使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。
你可能已經注意到如今的調度並非咱們想要的,例:在有兩個worker的狀況下,當全部的奇數消息都是重的而偶數消息是輕量的,那會有一個worker會一直處於忙碌狀態,而另外一個worker幾乎不工做,
RabbitMQ,並不知道這些狀況,只知道持續地均勻地分發消息。
這樣發生的緣由是RabbitMQ只是在消息進入隊列的時候進行分發的工做,無論消費者的未確認的消息的數量,只是一味地分發第N條消息給第N個消費者。
爲了解決這樣的問題,咱們使用方法prefetch
,並設置值爲1,表示RabbitMQ不會同時給一個worker超過一條消息,即,不會分發一條新的消息直到worker完成而且發送ack標誌。不然,RabbitMQ會把消息發送給下一個不在忙碌狀態的worker.ch.prefetch(1);
注意隊列的大小 若是全部的worker都處於忙碌狀態,你的隊列能夠被填滿,你可能須要一個監控,或者添加更多的worker,或者有其餘的解決方案。
最後的new_task.js
的代碼:
#!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'task_queue'; var msg = process.argv.slice(2).join(' ') || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent '%s'", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
worker.js
的代碼:
#!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'task_queue'; ch.assertQueue(q, {durable: true}); ch.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { var secs = msg.content.toString().split('.').length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false}); }); });
使用消息確認和預處理,你能夠創建一個工做隊列。持久化選項使得消息能夠在RabbitMQ會重啓的狀況下得以保留。
得到更多的關於Channel
的方法和消息的屬性,你能夠瀏覽amqplib docs
翻譯:Joursion
日期 :2016/12/25
歡迎交流,學習。