RabbitMQ:筆記

持久化

  • 交換器持久化
  • 隊列持久化
  • 消息持久化

(交換器)
clipboard.png
(隊列)
clipboard.png
(消息)
clipboard.pngphp

生產者可靠性

事務機制

普通發送git

$ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);

clipboard.png

事務模式:單個發送github

$ch->startTransaction();
try {
    $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);
    $ch->commitTransaction();
}
catch(AMQPConnectionException $e) {
    $ch->rollbackTransaction();
}

clipboard.png

事務模式:批量發送網絡

$loop_times = 10;
$ch->startTransaction();
for($i=0;$i<$loop_times;$i++) {
    try {
        $ex->publish($message, $this->_queue->getName(), AMQP_MANDATORY, $params);
        $ch->commitTransaction();
    }
    catch(AMQPConnectionException $e) {
        $ch->rollbackTransaction();
    }
}

clipboard.png

發送者確認

單個消息異步

$ch->confirmSelect();
$ex->publish($message, $routing_key,AMQP_MANDATORY, $params);

$ack_callback = function ($delivery_tag, $multiple) {
    // ack處理
    echo 'Message acked', PHP_EOL;
    var_dump(func_get_args());
    return true;
};
$nack_callback = function ($delivery_tag, $multiple, $requeue) use ($message) {
    // nack處理: 從新發送消息,或記錄日誌
    echo 'Message nacked', PHP_EOL;
    var_dump(func_get_args());
    return false;
};

$ch->setConfirmCallback($ack_callback, $nack_callback); // 設置回調
$ch->waitForConfirm(1); // 在setConfirmCallback()後調用

來自broker的ack確認
clipboard.pngsvn

批量消息函數

$ch->confirmSelect();
// 批量發佈,一次確認
$messages = [];
for($i=0;$i<10;$i++) {
    $messages[$i] = $i.$message;
    $ex->publish($messages[$i], $routing_key, AMQP_MANDATORY, $params);
}

$ack_callback = function ($delivery_tag, $multiple) {
    // ack處理
    echo 'Message acked', PHP_EOL;
    var_dump(func_get_args());
    return true;
};
$nack_callback = function ($delivery_tag, $multiple, $requeue) use ($messages) {
    // nack處理: 從新發送消息該批次消息,或者記錄日誌
    echo 'Message nacked', PHP_EOL;
    var_dump(func_get_args());
    return false;
};

$ch->setConfirmCallback($ack_callback, $nack_callback); // 設置回調
$ch->waitForConfirm(1); // 在setConfirmCallback()後調用

broker發送了兩個ack確認(完整的網絡包中producer發送了10條消息);
若是收到nack信令,須要從新發送整個批次消息。
小問題:爲何broker回覆了兩個ack?broker回覆ack數量和機制是什麼?
clipboard.pngoop

  • 異步確認(pecl_amqp暫時不支持)

消費者可靠性

推模式(basicConsume)

自動ack測試

$conn->setReadTimeout(3);    // 無數據時,超時時間設置
$consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true);
// consume
try {
    $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag);
}
catch (Exception $exception) {
    // 無數據,超時處理
    // 顯式取消消費
    $q->cancel($consumer_tag);
}

3s後超時,cancel掉消費
clipboard.pngfetch

完整例子(consume過程)

$conn->setReadTimeout(3);
$consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true);
$num = 3;
$callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){
    $tag = $envelope->getDeliveryTag();
    var_dump($tag);
    var_dump($num);

    if($num <= 0){
        // MARK:false顯式退出callback,會致使丟數據
        // 例如隊列中有30條數據,條件判斷$num<=0退出,會致使業務邏輯只處理了4條數據
        // 剩餘的數據未處理而丟掉
        
        // return false; 
    }
    $num --;
    return true;
};
// consume
try {
    $data = $q->consume($callback, AMQP_AUTOACK, $consumer_tag);
}
catch (Exception $exception) {
    // 顯式取消消費
    $q->cancel($consumer_tag);
}

(consume數據,到無數據)
clipboard.png

另外:
qos()/setPrefetchCount()/setPrefetchSize()對於autoack無效

autoack小結:
回調超時經過timeout實現,且catch異常後,須要cancel消費者;
callback回調函數,不該該return false(形成隊列丟數據),如遇異常能夠記錄日誌等;
qos()等限流設置對autoack無效。

manual ack

$conn->setReadTimeout(3);
$ch->qos(0,1);    // prefetchCount:最多1條unacked消息
$consumer_tag = basename(__FILE__, '.php').uniqid() . microtime(true);
$num = 3;
$callback = function (AMQPEnvelope $envelope, AMQPQueue $queue) use(&$num){
    $tag = $envelope->getDeliveryTag();
    $msg = $envelope->getBody();
    var_dump(implode(',',[$tag, $num, $msg]).PHP_EOL);

    // 手動ack
    $queue->ack($tag);
    sleep(1);

    if($num <= 0){
        // 顯式退出callback
        return false;
    }
    $num --;
    return true;
};

// consume
try {
    $data = $q->consume($callback, AMQP_NOPARAM, $consumer_tag);
}
catch (Exception $exception) {
    // 顯式取消消費
    $q->cancel($consumer_tag);
}

callback()回調函數中,只處理4條;
儘管queue推送了5條消息,可是consumer只確認了4條,所以隊列裏只是減小了4條消息
clipboard.png

manual ack小結:
timeout實現回調超時,同autoack;
回調函數內能夠按邏輯return false,不會丟失消息(消息短暫處於unacked狀態後,恢復至ready狀態);
qos()等限流函數,有效,能夠避免客戶端內存溢出

no ack
消息須要顯式ack(),可是沒有執行ack()
結論:
消息處於unacked狀態,consumer channel斷開後,消息恢復至ready狀態

拉模式(basicGet)

拉模式:非阻塞消費

auto ack

$conn->setReadTimeout(3);    // 拉模式:非阻塞
$ch->qos(0,10);  // 自動ack,qos設置無效
$num = 3;
for($i=0;$i<$num;$i++) {
    $envelope = $q->get(AMQP_AUTOACK);
    var_dump($envelope->getDeliveryTag()."||".$envelope->getBody());
    echo "<br>";
}

有數據時,當即返回數據;無數據時,當即返回false
clipboard.png

manual ack

$conn->setReadTimeout(3);    // 拉模式:非阻塞
$ch->qos(0,10);  // 自動ack,qos設置無效
$num = 3;
for($i=0;$i<$num;$i++) {
    $envelope = $q->get();
    var_dump($envelope->getDeliveryTag()."||".$envelope->getBody());
    echo "<br>";

    if($i!=$num-1) {
        $q->ack($envelope->getDeliveryTag());
    }
}

拉取了3條消息,只ack了兩條
最終,只消費了隊列內的兩條數據,第3條數據短暫變成unacked狀態後,恢復至ready狀態
clipboard.png

no ack
結論:
未ack的消息,會恢復至ready狀態

to be continued

Refer:
RabbitMQ專欄:https://blog.csdn.net/u013256...
php-amqp測試用例:https://github.com/pdezwart/p...

相關文章
相關標籤/搜索