當把basic_consume的參數no_ack設置爲true時,消息達到消費者時就馬上被標記爲刪除狀態,若是這時一個worker的消息來不及執行完成就被停止掉,那麼這條消息就會丟失,因此須要一個消息確認機制,當worker掛掉後,把消息從新分發給另外一個woker執行
方法:將no_ack設置爲false,而後在回調函數中確認消息php
$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
須要特別注意的是,如何忘記確認消息,將耗盡內存。查看未確認消息命令:html
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
雖然消息確認機制可以保證消費者掛掉時消息不丟失,可是當rabbitmq掛掉時,那就無法保證了,這時就須要持久化
了。
方法:隊列和消息必須設置爲持久化
(1) 隊列持久化:生成者和消費者聲明隊列參數durable設置爲true,已存在的隊列不能從新設置參數值。命令以下:bash
$channel->queue_declare('task_queue', false, true, false, false);
(2) 消息持久化:消息delivery_mode設置爲2,以下:函數
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
問題:當多個worker處理隊列消息,rabbitmq循環均勻分配消息到分一個worker,若是此時其中一個worker分配到比較耗時的任務,那麼會出現這個worker會比較忙碌,而其餘worker比較悠閒的狀況。
方法:在worker處理和確認消息以前,不要再向worker發送新消息,而是向下一個悠閒的worker發送,把prefetch參數設置爲1.
固然,若是全部的worker都很忙,這時候應該增長worker數量fetch
$channel->basic_qos(null, 1, null);