RabbitMQ+PHP 教程二(Work Queues)

clipboard.png

介紹

在上一個 Hello World 教程中,咱們編寫了從指定隊列發送和接收消息的程序。在這篇文章中,咱們將建立一個工做隊列,用於在多個工人(消費者)之間分配耗時的任務。php

工做隊列(又名任務隊列)背後的主要思想是避免當即執行資源密集型任務,必須等待它完成。相反,咱們計劃稍後完成任務。咱們將任務封裝爲消息並將其發送到隊列中。後臺運行的一個工做進程將彈出任務並最終執行該任務。當你運行許多工人(消費者)時,任務將在他們之間分擔。html

這個概念在Web應用程序中尤爲有用,由於在短HTTP請求中不可能處理複雜的任務。segmentfault

先決條件

在本教程的前一部分,咱們發送了一條包含「Hello World」的消息。如今,咱們將發送支持複雜任務的字符串。咱們沒有一個真實環境的任務,如圖像進行調整或PDF文件的渲染,讓咱們利用sleep()模擬真實環境的業務功能。咱們將字符串中的點數做爲其複雜度;每一個點都將佔「工做」的一秒鐘。例如,由Hello...描述的一個僞任務…須要三秒。數組

new_task.php

咱們會稍微修改send.php代碼從咱們先前的例子,容許任意的消息是從命令行發送。這一計劃將任務分配給咱們的工做隊列,因此咱們命名它 new_task.php緩存

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent ", $data, "\n";

咱們的上一個版本的receive.php腳本也須要一些改變:它須要假第二工做在消息體中每一點。它會從隊列彈出消息和執行任務,因此讓咱們把命名worker.php:服務器

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n"; //根據"."數量個數獲取延遲時間,單位秒
  sleep(substr_count($msg->body, '.'));  //模擬業務執行時間延遲
  echo " [x] Done", "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);

單worker簡單運行測試

消費者

php worker.php

消息生產者

php new_task.php "A very hard task which takes two seconds.."

循環調度

一個使用任務隊列的優勢是容易並行工做的能力。若是咱們積壓了大量的工做,咱們能夠增長更多的工人,這樣就能夠輕鬆地規模化。學習

首先,讓咱們嘗試同時運行兩worker.php腳本。他們都會從隊列中得到消息,看看效果如何?讓咱們看看。測試

你須要打開三個console命令。兩將運行worker.php腳本。這些控制檯將是咱們的兩個消費者C1和C2。fetch

消費者1

php worker.php

消費者2

php worker.php

消息生產者

php new_task.php msg1...

默認狀況下,RabbitMQ將會發送的每一條消息給下一個消費者,在序列。平均每一個消費者將獲得相同數量的消息。這種分發消息的方式稱爲循環輪詢。試着用三個或更多的工人。ui

消息確認

完成任務可能須要幾秒鐘。你可能遇到若是一個消費者開始一個長期的任務,而且只完成了部分任務,那麼會發生什麼?。咱們目前的代碼,一旦RabbitMQ發送一個消息給客戶當即標記爲刪除。在這種狀況下,若是您停止一個消費者,咱們將丟失它正在處理的消息。咱們還將丟失發送給該消費者全部的還沒有處理的消息。

若是咱們不想失去任何任務。若是一個消費者意外停止了,咱們但願把任務交給另外一個消費者。

爲了確保消息不會丟失,RabbitMQ支持消息確認。ACK(nowledgement)消費者返回的結果告訴RabbitMQ有一條消息收到,你能夠自由可控的刪除他

若是一個消費者停止了(其通道關閉,鏈接被關閉,或TCP鏈接丟失)不發送ACK,RabbitMQ將會理解這個消息並無徹底處理,將它從新加入隊列。若是有其餘用戶同時在線,它就會快速地傳遞到另外一個消費者。這樣,即便意外停止了,也能夠確保沒有丟失信息。

沒有任何消息超時;當這個消費者停止了,RabbitMQ將會從新分配消息時。即便處理消息花費很長很長時間也很好。

消息確認是默認關閉。可經過設置的第四個參數basic_consume設置爲false(true意味着沒有ACK)和從消費者發送合適的確認,一旦咱們完成一個任務。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用此代碼,咱們能夠確信,即便在處理消息時使用Ctrl + C殺死一名消費者,也不會丟失任何東西。消費者停止都未確認的消息後很快會被從新分配。

忘了確認(Forgotten acknowledgment)

丟失ACK確認是一個常見的錯誤。這是一個容易犯的錯誤,但後果很嚴重。當你的客戶退出,消息會被從新分配(這可能看起來像是隨機的分配),RabbitMQ將會消耗更多的內存,它不會釋聽任何延遲確認消息。

爲了調試這種錯誤,你可使用rabbitmqctl打印messages_unacknowledged字段:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化(Message durability)

咱們已經學會了如何確保即便消費者死了,任務也不會丟失。可是若是RabbitMQ服務器中止了,咱們的任務仍然有可能會丟失。

當RabbitMQ退出或崩潰了,會丟失隊列和消息除非你不要。要確保消息不會丟失,須要兩件事:咱們須要將隊列和消息都標記爲持久的。

首先,咱們須要確保RabbitMQ永遠不會丟失隊列。爲了作到這一點,咱們須要聲明它是持久的。爲此咱們經過queue_declare做爲第三參數爲true:

$channel->queue_declare('hello', false, true, false, false);

雖然這個命令自己是正確的,但它在咱們當前的設置中不起做用。這是由於咱們已經定義了一個名爲hello的隊列,該隊列不持久。RabbitMQ不容許你從新定義現有隊列用不一樣的參數,將返回一個錯誤的任何程序,試圖這麼作。但有一個快速的解決方法-讓咱們聲明一個名稱不一樣的隊列,例如task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

須要應用到生產者和消費者代碼中設置爲true。

在這一點上,咱們能夠確保即便RabbitMQ重啓了,task_queue隊列不會丟失。如今咱們要標記咱們的消息持續經過設置delivery_mode = 2消息屬性,amqpmessage做爲屬性數組的一部分。

$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );

關於消息持久性的說明(Note on message persistence)

將消息標記爲持久性不能徹底保證消息不會丟失。雖然它告訴RabbitMQ保存信息到磁盤上,還有一個短的時間窗口時,RabbitMQ 已經接受信息並無保存它。另外,RabbitMQ不作fsync(2)每個消息--它可能只是保存到緩存並無真正寫入到磁盤。持久性保證不強,但對於咱們的簡單任務隊列來講已經足夠了。若是你須要更強的保證,那麼你可使用消費者確認。

公平調度

您可能已經注意到,調度仍然不徹底按照咱們的要求工做。例如,在一個有兩個消費者的狀況下,當全部的奇數信息都很重,甚至很輕的消息,一個消費者會一直忙,而另外一個消費者幾乎不作任何工做。嗯,RabbitMQ不知道發生了什麼事,仍將均勻消息發送。

這是由於RabbitMQ只是調度消息時,消息進入隊列。當存在未確認的消息時。它只是盲目的分發n-th條消息給n-th個消費者。

clipboard.png

爲了改變這個分配方式,咱們能夠調用basic_qos方法,設置參數prefetch_count = 1。這告訴RabbitMQ不要在一個時間給一個消費者多個消息。或者,換句話說,在處理和確認之前的消息以前,不要向消費者發送新消息。相反,它將發送給下一個仍然不忙的消費者。

$channel->basic_qos(null, 1, null);
關於隊列大小的註釋(Note about queue size)

若是全部的消費者都很忙,你的隊列填滿了。你會想留意到這一點,也許增長更多的工人,或者有其餘的策略。

源碼

new_task.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>

worker.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

使用消息的確認和預取,你能夠設置一個工做隊列。耐久性的配置選項讓任務存在,即便RabbitMQ重啓。

學習如何向許多消費者傳遞一樣的信息, 你能夠閱讀下一章節:RabbitMQ+PHP 教程三(Publish/Subscribe)

翻譯來自 RabbitMQ - RabbitMQ tutorial - Work Queues

相關文章
相關標籤/搜索