在第一篇教程中咱們寫程序從一個命名隊列中發送和接收消息。在這篇中,咱們將創建一個在多個工做者之間用於分發耗時任務的工做隊列。php
工做隊列(也稱爲:任務隊列)背後的主要思想是避免當即作一項資源密集型任務而且不得不等候它完成。而是咱們計劃這個任務在稍後被完成。咱們封裝一個任務爲一條消息而且發送它到一個隊列。一個在後臺運行的工做進程將當即獲取這個任務並最終執行它。當你運行多個工做進程時,任務將在它們之間被分配。html
這個概念在web應用中尤爲有用,在一個短HTTP請求窗口中,處理一項複雜的任務是不太可能的。git
在教程的前一篇中,咱們發送一條包含「Hello World!」的消息。如今咱們將發送字符串來表明複雜任務。咱們沒有一個真實的任務,相似於圖片大小被調整或pdf文件被渲染,所以讓咱們來僞裝這個任務,經過假裝咱們正忙——利用sleep()函數。咱們將用字符串中的逗號數量做爲它的複雜性。每一個逗號將佔用一秒的工做。一項被描述爲Hello...的僞裝的任務將佔用三秒鐘。github
咱們稍微修改一下咱們先前例子send.php的代碼,容許從命令行發送任意的消息。這個程序將把任務發送到咱們的工做隊列中,因此咱們命名它爲new_task.php:web
$data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent ", $data, "\n";
咱們舊的receive.php腳本也須要一些改變:它須要假裝在消息體內每個逗號有一秒鐘的工做。它將從隊列裏獲取信息而且執行任務,因此咱們稱它爲worker.php:shell
$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback);
注意咱們的假裝任務模擬執行時間。數組
像在第一篇教程講述的同樣運行它們:緩存
# shell 1
php worker.php
# shell 2 php new_task.php "A very hard task which takes two seconds.."
使用任務隊列的優勢之一就是可以並行工做。若是咱們正在創建一項積壓的工做,咱們只要添加更多的工做者就能夠輕鬆地擴大規模。bash
首先,讓咱們試着同時運行兩個worker.php腳本。這兩個都將獲得來自隊列裏的信息,但具體怎樣?讓咱們看看。函數
你須要打開三個控制檯程序。兩個將運行worker.php程序。這些控制檯程序將使咱們的兩個消費者——C1和C2。
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C
在第三個控制檯裏,咱們將發佈新的任務。一旦你啓動消費者你就能發佈一些信息了:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
讓咱們看看有什麼被傳給了咱們的工做者:
# shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默認狀況下,RabbitMQ將依次發送每一條消息到下一個消費者,平均每一個消費者將獲得相同數量的消息。這種分發消息的方式就被稱爲輪詢。試試用三個或更多的工做者。
作一項任務會花幾秒鐘時間。你可能會想若是其中的一個消費者執行一項長時間的任務而只執行了一部分就死掉了那會怎樣。在咱們目前的代碼中,一旦RabbitMQ傳遞一條消息給消費者後,它當即就會標記這條消息爲刪除。在這種狀況下,若是你結束掉一個工做者,咱們將丟失它正處理的消息。咱們也會丟失被分發到這個工做者而還沒有被處理的全部消息。
可是咱們不想丟失任何任務。若是一個工做者死掉了,咱們會想讓這個任務交給另外一個工做者。
爲了確保一條消息不會丟失,RabbitMQ支持消息確認機制。一個ack(acknowledgement)被消費者發回以告知RabbitMQ一條特定的消息已被接收並處理,RabbitMQ能夠刪除它了。
若是一個消費者死掉(它的通道被關閉,鏈接也被關閉,或者TCP鏈接丟失)沒有發送一個ack,RabbitMQ就會知道,一條消息沒有被徹底處理,則會將這條消息將從新排入隊列。若是有其它的消費者同時在線,那麼它將會迅速的從新傳遞這條消息給另外一個消費者。這樣你就能夠確信沒有消息被丟失,即便工做者偶爾死掉。
若是沒有任何消息超時,當消費者死掉時,RabbitMQ將從新傳送消息。這樣即便處理一條消息要花很長很長時間也沒事。
消息確認機制默認是被關閉的。如今是時間將它們設置爲打開了,經過設置 basic_consume的第四個參數爲false(true是不ack),而後,一旦咱們完成一項任務就從工做者發送一個合適的確認。
$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用這些代碼咱們能肯定即便在一個工做者正在處理一條消息時,你用CTRL+C結束掉這個進程,也沒有什麼丟失。這個工做者死掉後的不久全部未應答的消息將會從新被投遞。
被忘記的確認
忘記確認是一個常見的錯誤。雖然這是一個簡單的錯誤,可是後果是嚴重的。當你的客戶端退出的時候消息將會被從新投遞(這可能看起來像是隨機的從新投遞),可是由於不能釋聽任何未被確認的消息,RabbitMQ將會消耗愈來愈多的內存。
爲了調試這種類型的錯誤,你可使用rabbitmqctl來打印 messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows系統上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱們已經學習了怎樣確保即便消費者死亡任務也不會丟失。可是若是RabbitMQ服務中止了,咱們的任務將仍然會被丟失。
當RabbitMQ退出或崩潰時它將忘記隊列和消息除非你讓它不要這麼作。要確保消息不被丟失,有兩件事被要求去作:咱們須要標記隊列和消息爲持久的。
第一,咱們須要確保RabbitMQ不會丟失咱們的隊列。爲了這樣,咱們須要定義它爲持久的。要這麼作,咱們就須要傳遞第三個參數爲true到queue_declare:
$channel->queue_declare('hello', false, true, false, false);
儘管這條命令自己是正確的,可是在咱們目前的設置中是不起做用的。這是由於咱們已經定義了一個叫作hello的而不是持久的隊列。RabbitMQ不容許你用不一樣的參數從新定義一個已經存在的隊列,這將返回一個錯誤到任何這麼作的程序中。可是有一個快速的解決辦法——咱們來用不一樣的名字定義一個隊列,例如task_queue:
$channel->queue_declare('task_queue', false, true, false, false);
At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting the delivery_mode = 2 message property which AMQPMessage takes as part of the property array.
這個設置爲true的標誌須要應用到生產者和消費者代碼中。
這時咱們就能確信及時RabbitMQ從新啓動,task_queue隊列也不會被丟失。如今咱們須要標記咱們的消息爲持久的——經過設置deliver_mode = 2消息屬性,用做AMQPMessage屬性數組的一部分。
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
關於消息持久的注意事項
標記消息爲持久也不能徹底保證消息不被丟失。儘管這使得RabbitMQ保存消息懂啊磁盤,也仍然會有一個間隙窗口當RabbitMQ已經接收一條消息而還沒有保存它時。再者,RabbitMQ不會爲每一條消息調用fsync(2) ——它可能僅被保存在緩存中而不是真的寫到磁盤上。雖然持久的保證不是很強,可是對於咱們的簡單任務隊列已是足夠了。若是你須要一個更強的保證,那麼你可使用 publisher confirms。
你大概已經注意到了調度仍然不像咱們想的那樣工做。例如,在有兩個工做者的狀況下,當全部的奇數消息是重量級的而偶數消息是輕量級的,一個工做者將一直處於繁忙狀態而另外一個將幾乎沒有任何工做。對於這種狀況RabbitMQ一無所知,仍然均勻地分發消息。
發生這種狀況是由於RabbitMQ只在消息進入隊列時才調度消息。它不看一個消費者未確認的消息數量。它僅僅盲目地分發每個第n調消息到第n個消費者。
爲了不這種狀況,咱們可使用設置prefetch_count = 1 的basic_qos方法 。這會讓RabbitMQ不會一次去分配多餘一條消息給工做者。或者,換句話說,不分發一條新的消息給一個工做者直到這個工做者已經處理完而且確認了前一條消息。轉而分發消息到下一個不忙的消費者。
$channel->basic_qos(null, 1, null);
關於隊列大小的注意事項
若是全部工做者都很忙,你的隊列能夠填滿。你會想要關注這個,可能添加更多的工做者,或者有一些其它策略。
咱們的new_task.php文件的最終代碼:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo " [x] Sent ", $data, "\n"; $channel->close(); $connection->close(); ?>
咱們的worker.php文件:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
使用消息確認和預取你能設置一個工做隊列。持久選項讓任務存活即便RabbitMQ被重啓。
如今咱們可以前往下一篇文章,學習怎樣傳遞相同的消息給多個消費者。