(交換器)
(隊列)
(消息)
php
普通發送git
$ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);
事務模式:單個發送github
$ch->startTransaction(); try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); }
事務模式:批量發送網絡
$loop_times = 10; $ch->startTransaction(); for($i=0;$i<$loop_times;$i++) { try { $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params); $ch->commitTransaction(); } catch(AMQPConnectionException $e) { $ch->rollbackTransaction(); } }
單個消息異步
$ch->confirmSelect(); $ex->publish($message, $routing_key,AMQP_MANDATORY, $params); $ack_callback = function ($delivery_tag, $multiple) { // ack處理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($message) { // nack處理: 從新發送消息,或記錄日誌 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 設置回調 $ch->waitForConfirm(1); // 在setConfirmCallback()後調用
來自broker的ack確認
svn
批量消息函數
$ch->confirmSelect(); // 批量發佈,一次確認 $messages = []; for($i=0;$i<10;$i++) { $messages[$i] = $i.$message; $ex->publish($messages[$i], $routing_key, AMQP_MANDATORY, $params); } $ack_callback = function ($delivery_tag, $multiple) { // ack處理 echo 'Message acked', PHP_EOL; var_dump(func_get_args()); return true; }; $nack_callback = function ($delivery_tag, $multiple, $requeue) use ($messages) { // nack處理: 從新發送消息該批次消息,或者記錄日誌 echo 'Message nacked', PHP_EOL; var_dump(func_get_args()); return false; }; $ch->setConfirmCallback($ack_callback, $nack_callback); // 設置回調 $ch->waitForConfirm(1); // 在setConfirmCallback()後調用
broker發送了兩個ack確認(完整的網絡包中producer發送了10條消息);
若是收到nack信令,須要從新發送整個批次消息。
小問題:爲何broker回覆了兩個ack?broker回覆ack數量和機制是什麼?
oop
自動ack測試
$conn->setReadTimeout(3); // 無數據時,超時時間設置 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 無數據,超時處理 // 顯式取消消費 $q->cancel($consumer_tag); }
3s後超時,cancel掉消費
fetch
完整例子(consume過程)
$conn->setReadTimeout(3); $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); var_dump($tag); var_dump($num); if($num <= 0){ // MARK:false顯式退出callback,會致使丟數據 // 例如隊列中有30條數據,條件判斷$num<=0退出,會致使業務邏輯只處理了4條數據 // 剩餘的數據未處理而丟掉 // return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag); } catch (Exception $exception) { // 顯式取消消費 $q->cancel($consumer_tag); }
(consume數據,到無數據)
另外:
qos()/setPrefetchCount()/setPrefetchSize()對於autoack無效
autoack小結:
回調超時經過timeout實現,且catch異常後,須要cancel消費者;
callback回調函數,不該該return false(形成隊列丟數據),如遇異常能夠記錄日誌等;
qos()等限流設置對autoack無效。
manual ack
$conn->setReadTimeout(3); $ch->qos(0,1); // prefetchCount:最多1條unacked消息 $consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true); $num = 3; $callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){ $tag = $envelope->getDeliveryTag(); $msg = $envelope->getBody(); var_dump(implode(',',[$tag, $num, $msg]).PHP_EOL); // 手動ack $queue->ack($tag); sleep(1); if($num <= 0){ // 顯式退出callback return false; } $num --; return true; }; // consume try { $data = $q->consume($callback, AMQP_NOPARAM, $consumer_tag); } catch (Exception $exception) { // 顯式取消消費 $q->cancel($consumer_tag); }
callback()回調函數中,只處理4條;
儘管queue推送了5條消息,可是consumer只確認了4條,所以隊列裏只是減小了4條消息
manual ack小結:
timeout實現回調超時,同autoack;
回調函數內能夠按邏輯return false,不會丟失消息(消息短暫處於unacked狀態後,恢復至ready狀態);
qos()等限流函數,有效,能夠避免客戶端內存溢出
no ack
消息須要顯式ack(),可是沒有執行ack()
結論:
消息處於unacked狀態,consumer channel斷開後,消息恢復至ready狀態
拉模式:非阻塞消費
auto ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自動ack,qos設置無效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(AMQP_AUTOACK); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; }
有數據時,當即返回數據;無數據時,當即返回false
manual ack
$conn->setReadTimeout(3); // 拉模式:非阻塞 $ch->qos(0,10); // 自動ack,qos設置無效 $num = 3; for($i=0;$i<$num;$i++) { $envelope = $q->get(); var_dump($envelope->getDeliveryTag()."||".$envelope->getBody()); echo "<br>"; if($i!=$num-1) { $q->ack($envelope->getDeliveryTag()); } }
拉取了3條消息,只ack了兩條
最終,只消費了隊列內的兩條數據,第3條數據短暫變成unacked狀態後,恢復至ready狀態
no ack
結論:
未ack的消息,會恢復至ready狀態
Refer:
RabbitMQ專欄:https://blog.csdn.net/u013256...
php-amqp測試用例:https://github.com/pdezwart/p...