在第一篇教程中,咱們已經寫了一個從已知隊列中發送和獲取消息的程序。在這篇教程中,咱們將建立一個工做隊列(Work Queue),它會發送一些耗時的任務給多個工做者(Works )。php
工做隊列(又稱:任務隊列——Task Queues)是爲了不等待一些佔用大量資源、時間的操做。當咱們把任務(Task)看成消息發送到隊列中,一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當你運行多個工做者(workers),任務就會在它們之間共享。web
這個概念在網絡應用中是很是有用的,它能夠在短暫的HTTP請求中處理一些複雜的任務。shell
以前的教程中,咱們發送了一個包含「Hello World!」的字符串消息。如今,咱們將發送一些字符串,把這些字符串看成複雜的任務。咱們沒有真是的例子,例如圖片縮放、pdf文件轉換。因此使用 sleep()函數來模擬這種狀況。咱們在字符串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。比 如」Hello…」就會耗時3秒鐘。緩存
咱們對以前教程的send.php作些簡單的調整,以即可以發送隨意的消息。這個程序會按照計劃發送任務到咱們的工做隊列中。咱們把它命名爲new_task.php:網絡
$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $exchange->publish($message, $routeKey); var_dump("[x] Sent $message");
咱們的舊腳本(receive.php)一樣須要作一些改動:它須要爲消息體中每個點號(.)模擬1秒鐘的操做。它會從隊列中獲取消息並執行,咱們把它命名爲worker.php:函數
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。學習
首先,咱們先同時運行兩個worker.php腳本,它們都會從隊列中獲取消息,究竟是不是這樣呢?咱們看看。fetch
你須要打開三個終端,兩個用來運行worker.php腳本,這兩個終端就是咱們的兩個消費者(consumers)—— C1 和 C2。spa
shell1code
$php worker.php [*] Waiting for messages. To exit press CTRL+C
shell2
$ php worker.php [*] Waiting for messages. To exit press CTRL+C
第三個終端,咱們用來發布新任務。你能夠發送一些消息給消費者(consumers):
shell3
$ php new_task.php First message.
shell3
$ php new_task.php Second message..
shell3
$ php new_task.php Third message...
shell3
$ php new_task.php Fourth message....
shell3
$ php new_task.php Fifth message.....
看看到底發送了什麼給咱們的工做者(workers):
shell1
$ php worker.php [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2
$ php worker.php [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。試着添加三個或更多得工做者(workers)。
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給 消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送 到這個工做者的尚未處理的消息都會丟失。
咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。 以前的例子中咱們使用$queue->ack()。當工做者(worker)完成了任務,就發送一個響應。
function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); } $queue->consume('callback');
運行上面的代碼,咱們發現即便使用CTRL+C殺掉了一個工做者(worker)進程,消息也不會丟失。當工做者(worker)掛掉這後,全部沒有響應的消息都會從新發送。
忘了響應
一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:
``` $ sudo rabbitmqctl listqueues name messagesready messages_unacknowledged Listing queues ... hello 0 0 ...done.
```
若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,它將會流失全部的隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。
首先,爲了避免讓隊列丟失,須要把它聲明爲持久化(durable):
$queue->setFlags(AMQP_DURABLE);
儘管這行代碼自己是正確的,可是仍然不會正確運行。由於咱們已經定義過一個叫hello的非持久化隊列。RabbitMq不容許你使用不一樣的參數從新定義一個隊列,它會返回一個錯誤。但咱們如今使用一個快捷的解決方法——用不一樣的名字,例如task_queue。
$queue->setName('task_queue'); $queue->setFlags(AMQP_DURABLE); $queue->declare();
這個$queue->declare();必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
這時候,咱們就能夠確保在RabbitMq重啓以後queue_declare隊列不會丟失。
注意:消息持久化
將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一 個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的 持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。
你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。
咱們可使用$channel->qos();方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。
$channel->qos(0,1);
關於隊列大小
若是全部的工做者都處理繁忙狀態,你的隊列就會被填滿。你須要留意這個問題,要麼添加更多的工做者(workers),要麼使用其餘策略。
new_task.py的完整代碼:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1]; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $exchange->publish($message, $routeKey); var_dump("[x] Sent $message"); $connection->disconnect();
咱們的worker:
<?php /** * PHP amqp(RabbitMQ) Demo-2 * @author yuansir &lt;yuansir@live.cn/yuansir-web.com> */ $exchangeName = 'demo'; $queueName = 'task_queue'; $routeKey = 'task_queue'; $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declare(); $queue->bind($exchangeName, $routeKey); var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); $channel->qos(0,1); } $connection->disconnect(); function callback($envelope, $queue) { $msg = $envelope->getBody(); var_dump(" [x] Received:" . $msg); sleep(substr_count($msg,'.')); $queue->ack($envelope->getDeliveryTag()); }
使用消息響應和prefetch_count你就能夠搭建起一個工做隊列了。這些持久化的選項使得在RabbitMQ重啓以後仍然可以恢復。
如今咱們能夠移步教程3學習如何發送相同的消息給多個消費者(consumers)