[Original] An Analysis of no_ack in RabbitMQ


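      Anyone familiar with RabbitMQ knows that no_ack is an important parameter that can be set when calling the Basic.Consume method. This article looks at the two possible settings of no_ack, uses packet captures to show how they differ in practice, and summarizes some related handling experience.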

============ I am a divider =============

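      Purpose of no_ack: to make sure a message has been "successfully" processed by the consumer. "Successfully" here means that (with no_ack=false) once the consumer has manually replied with Basic.Ack, the message counts as "successfully" processed.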
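
To make the parameter concrete, here is a minimal Python sketch of how a Basic.Consume method frame could be encoded under AMQP 0-9-1, showing which bit carries no_ack. It is not taken from rabbitmq-c; the function names, the queue name test_queue and channel 1 are assumptions made for illustration.

```python
import struct

FRAME_METHOD = 1      # AMQP 0-9-1 frame type for method frames
FRAME_END = 0xCE      # frame-end octet
CLASS_BASIC = 60      # class-id of Basic
METHOD_CONSUME = 20   # method-id of Basic.Consume


def shortstr(text: str) -> bytes:
    """Encode an AMQP short string: 1-byte length followed by the bytes."""
    raw = text.encode("utf-8")
    if len(raw) > 255:
        raise ValueError("short string too long")
    return bytes([len(raw)]) + raw


def basic_consume_frame(channel: int, queue: str, consumer_tag: str = "",
                        no_local: bool = False, no_ack: bool = False,
                        exclusive: bool = False, no_wait: bool = False) -> bytes:
    """Build a Basic.Consume method frame with an empty arguments table."""
    # The four boolean flags share one octet, lowest bit first.
    flags = (no_local << 0) | (no_ack << 1) | (exclusive << 2) | (no_wait << 3)
    payload = struct.pack(">HH", CLASS_BASIC, METHOD_CONSUME)
    payload += struct.pack(">H", 0)      # reserved-1
    payload += shortstr(queue)
    payload += shortstr(consumer_tag)
    payload += bytes([flags])
    payload += struct.pack(">I", 0)      # empty field table (length 0)
    header = struct.pack(">BHI", FRAME_METHOD, channel, len(payload))
    return header + payload + bytes([FRAME_END])


# "test_queue" is a hypothetical queue name.
auto = basic_consume_frame(1, "test_queue", no_ack=True)
manual = basic_consume_frame(1, "test_queue", no_ack=False)
# The two frames differ only in the flags octet, which sits 6 bytes from the end.
print(hex(auto[-6]), hex(manual[-6]))
```

In this sketch the whole difference between automatic and manual acknowledgement is a single bit in the Basic.Consume frame; everything else on the wire is identical until the first delivery arrives.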

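Case 1: no_ack=true (automatic acknowledgement)
      In this case, as soon as the consumer side receives Basic.Deliver + Content-Header + Content-Body, an Ack goes back immediately. That Ack, however, is the TCP-level ACK, not an AMQP frame. It is sent whether or not the consumer has processed the received data, and regardless of how long the processing takes.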

Figure 1: (Producer+Consumer)


Figure 2: (Consumer)


Figure 3: (Producer)


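Case 2: no_ack=false (manual acknowledgement)
      In this case, the consumer is required to reply with an Ack only after it has finished processing the received Basic.Deliver + Content-Header + Content-Body. This Ack is the AMQP-level Basic.Ack. Because it is tied to business processing, the moment it is sent depends on how long that processing takes.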
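
For readers matching this against a packet capture, the following Python sketch builds and parses a Basic.Ack method frame according to the AMQP 0-9-1 framing rules. The helper names and the example values (channel 1, delivery tag 1) are illustrative assumptions, not values read from the captures in this article.

```python
import struct

FRAME_METHOD = 1    # frame type: method
FRAME_END = 0xCE    # frame-end octet
CLASS_BASIC = 60    # class-id of Basic
METHOD_ACK = 80     # method-id of Basic.Ack


def basic_ack_frame(channel: int, delivery_tag: int, multiple: bool = False) -> bytes:
    """Encode Basic.Ack: class-id, method-id, 64-bit delivery tag, flag octet."""
    payload = struct.pack(">HHQB", CLASS_BASIC, METHOD_ACK, delivery_tag, int(multiple))
    header = struct.pack(">BHI", FRAME_METHOD, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def parse_basic_ack(frame: bytes):
    """Return (channel, delivery_tag, multiple), or None if not a Basic.Ack frame."""
    if len(frame) < 8 or frame[-1] != FRAME_END:
        return None
    frame_type, channel, size = struct.unpack(">BHI", frame[:7])
    payload = frame[7:-1]
    if frame_type != FRAME_METHOD or size != len(payload) or size != 13:
        return None
    class_id, method_id, tag, bits = struct.unpack(">HHQB", payload)
    if (class_id, method_id) != (CLASS_BASIC, METHOD_ACK):
        return None
    return channel, tag, bool(bits & 1)


frame = basic_ack_frame(channel=1, delivery_tag=1)
print(len(frame), frame.hex())
print(parse_basic_ack(frame))
```

The delivery tag in the Ack must echo the tag carried by the Basic.Deliver being acknowledged, and it is scoped to the channel on which the delivery arrived.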

Figure 4: (Producer+Consumer)


Figure 5: (Consumer)


Figure 6: (Producer)


Summary:

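  • Basic.Ack is sent back to RabbitMQ to tell it that the corresponding message can be removed from RabbitMQ's message cache.
  • If an exception occurs before the consumer has sent Basic.Ack back to RabbitMQ, RabbitMQ notices that the connection belonging to that consumer has been closed and then redelivers the message, round-robin, to another consumer (assuming several consumers subscribe to the same queue).
  • With no_ack=true, RabbitMQ treats a message as acknowledged the moment it has been delivered, so it deletes the message from the cache immediately. A consumer failure therefore causes message loss.
  • The Basic.Ack coming from the consumer side has no direct relationship with the Basic.Ack sent to the producer side (the latter belongs to publisher confirms).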
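
The first three points above can be sketched with a toy in-memory queue. This is only a model written for this article's argument, not RabbitMQ's actual implementation; the class ToyQueue, its methods, and the consumer names c1 and c2 are all hypothetical.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for one queue; not RabbitMQ's real implementation."""

    def __init__(self):
        self.ready = deque()   # messages waiting to be delivered
        self.unacked = {}      # delivery_tag -> (consumer name, message)
        self.consumers = []    # (name, no_ack) pairs, served round-robin
        self.next_tag = 1
        self.rr = 0

    def publish(self, message):
        self.ready.append(message)

    def subscribe(self, name, no_ack):
        self.consumers.append((name, no_ack))

    def deliver(self):
        """Hand the next ready message to the next consumer in turn."""
        if not self.ready or not self.consumers:
            return None
        name, no_ack = self.consumers[self.rr % len(self.consumers)]
        self.rr += 1
        message = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        if not no_ack:
            # Manual ack: keep the message until Basic.Ack arrives.
            self.unacked[tag] = (name, message)
        # With no_ack=True the message is forgotten as soon as it is delivered.
        return name, tag, message

    def ack(self, tag):
        """Basic.Ack received: the message may now be dropped for good."""
        self.unacked.pop(tag, None)

    def disconnect(self, name):
        """Connection lost: requeue everything this consumer had not acked."""
        self.consumers = [c for c in self.consumers if c[0] != name]
        owned = [t for t, (owner, _) in self.unacked.items() if owner == name]
        for tag in sorted(owned, reverse=True):
            _, message = self.unacked.pop(tag)
            self.ready.appendleft(message)


def run(no_ack):
    q = ToyQueue()
    q.subscribe("c1", no_ack)
    q.subscribe("c2", no_ack)
    q.publish("Hello World betty")
    first = q.deliver()     # delivered to c1
    q.disconnect("c1")      # c1 fails before acknowledging
    second = q.deliver()    # reaches c2 only if the message was kept
    return first, second


print(run(no_ack=False))
print(run(no_ack=True))
```

With no_ack=False the second delivery reaches c2; with no_ack=True there is nothing left to deliver, which is the message loss described in the third point.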

 


============ I am a divider =============

Finally, here is the test output from my own modified, libevent-based version of rabbitmq-c.

Case 1 (no_ack=true):

[warn] evsignal_init: socketpair: No error
drive_machine: [conn_init]  ---  in TCP 3-way connecting!
drive_machine: [conn_connecting]  ---  connection timeout 1 time on socket(6040)
drive_machine: [conn_connected]  ---  connected on socket(6040)

6040: conn_state change   connected ==> snd_protocol_header
  --> Send Protocol.Header!
6040: conn_state change   snd_protocol_header ==> rcv_connection_start_method
  <-- Recv Connection.Start Method frame!
6040: conn_state change   rcv_connection_start_method ==> snd_connection_start_rsp_method
  --> Send Connection.Start-Ok Method frame!
6040: conn_state change   snd_connection_start_rsp_method ==> rcv_connection_tune_method
  <-- Recv Connection.Tune Method frame!
6040: conn_state change   rcv_connection_tune_method ==> snd_connection_tune_rsp_method
  --> Send Connection.Tune-Ok Method frame!
6040: conn_state change   snd_connection_tune_rsp_method ==> snd_connection_open_method
  --> Send Connection.Open Method frame!
6040: conn_state change   snd_connection_open_method ==> rcv_connection_open_rsp_method
  <-- Recv Connection.Open-Ok Method frame!
6040: conn_state change   rcv_connection_open_rsp_method ==> snd_channel_open_method
  --> Send Channel.Open Method frame!
6040: conn_state change   snd_channel_open_method ==> rcv_channel_open_rsp_method
  <-- Recv Channel.Open-Ok Method frame!
6040: conn_state change   rcv_channel_open_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Queue Declaring!
6040: conn_state change   idle ==> snd_queue_declare_method
  --> Send Queue.Declare Method frame!
6040: conn_state change   snd_queue_declare_method ==> rcv_queue_declare_rsp_method
  <-- Recv Queue.Declare-Ok Method frame!
6040: conn_state change   rcv_queue_declare_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Queue Binding!
6040: conn_state change   idle ==> snd_queue_bind_method
  --> Send Queue.Bind Method frame!
6040: conn_state change   snd_queue_bind_method ==> rcv_queue_bind_rsp_method
  <-- Recv Queue.Bind-Ok Method frame!
6040: conn_state change   rcv_queue_bind_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Basic QoS!
6040: conn_state change   idle ==> snd_basic_qos_method
  --> Send Basic.Qos Method frame!
6040: conn_state change   snd_basic_qos_method ==> rcv_basic_qos_rsp_method
  <-- Recv Basic.Qos-Ok Method frame!
6040: conn_state change   rcv_basic_qos_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Basic Consuming!
6040: conn_state change   idle ==> snd_basic_consume_method
  --> Send Basic.Consume Method frame!
6040: conn_state change   snd_basic_consume_method ==> rcv_basic_consume_rsp_method
  <-- Recv Basic.Consume-Ok Method frame!
6040: conn_state change   rcv_basic_consume_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Start waiting to recv!
6040: conn_state change   idle ==> rcv_basic_deliver_method
drive_machine: wait for Basic.Deliver method another 10 seconds!!
drive_machine: wait for Basic.Deliver method another 10 seconds!!

  <-- Recv Basic.Deliver Method frame!
6040: conn_state change   rcv_basic_deliver_method ==> rcv_basic_content_header
  <-- Recv Content.Header frame!
6040: conn_state change   rcv_basic_content_header ==> rcv_basic_content_body
  <-- Recv Content.Body frame!
Content Body is [Hello World betty].
@@@ CB: body len : [17]    body : [Hello World betty]
6040: conn_state change   rcv_basic_content_body ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Start waiting to recv!
6040: conn_state change   idle ==> rcv_basic_deliver_method
drive_machine: wait for Basic.Deliver method another 10 seconds!!
drive_machine: wait for Basic.Deliver method another 10 seconds!!


Case 2 (no_ack=false):

[warn] evsignal_init: socketpair: No error
drive_machine: [conn_init]  ---  in TCP 3-way connecting!
drive_machine: [conn_connecting]  ---  connection timeout 1 time on socket(6040)
drive_machine: connected on socket(6040)
6040: conn_state change   connected ==> snd_protocol_header
  --> Send Protocol.Header!
6040: conn_state change   snd_protocol_header ==> rcv_connection_start_method
  <-- Recv Connection.Start Method frame!
6040: conn_state change   rcv_connection_start_method ==> snd_connection_start_rsp_method
  --> Send Connection.Start-Ok Method frame!
6040: conn_state change   snd_connection_start_rsp_method ==> rcv_connection_tune_method
  <-- Recv Connection.Tune Method frame!
6040: conn_state change   rcv_connection_tune_method ==> snd_connection_tune_rsp_method
  --> Send Connection.Tune-Ok Method frame!
6040: conn_state change   snd_connection_tune_rsp_method ==> snd_connection_open_method
  --> Send Connection.Open Method frame!
6040: conn_state change   snd_connection_open_method ==> rcv_connection_open_rsp_method
  <-- Recv Connection.Open-Ok Method frame!
6040: conn_state change   rcv_connection_open_rsp_method ==> snd_channel_open_method
  --> Send Channel.Open Method frame!
6040: conn_state change   snd_channel_open_method ==> rcv_channel_open_rsp_method
  <-- Recv Channel.Open-Ok Method frame!
6040: conn_state change   rcv_channel_open_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Queue Declaring!
6040: conn_state change   idle ==> snd_queue_declare_method
  --> Send Queue.Declare Method frame!
6040: conn_state change   snd_queue_declare_method ==> rcv_queue_declare_rsp_method
  <-- Recv Queue.Declare-Ok Method frame!
6040: conn_state change   rcv_queue_declare_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Queue Binding!
6040: conn_state change   idle ==> snd_queue_bind_method
  --> Send Queue.Bind Method frame!
6040: conn_state change   snd_queue_bind_method ==> rcv_queue_bind_rsp_method
  <-- Recv Queue.Bind-Ok Method frame!
6040: conn_state change   rcv_queue_bind_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Basic QoS!
6040: conn_state change   idle ==> snd_basic_qos_method
  --> Send Basic.Qos Method frame!
6040: conn_state change   snd_basic_qos_method ==> rcv_basic_qos_rsp_method
  <-- Recv Basic.Qos-Ok Method frame!
6040: conn_state change   rcv_basic_qos_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Basic Consuming!
6040: conn_state change   idle ==> snd_basic_consume_method
  --> Send Basic.Consume Method frame!
6040: conn_state change   snd_basic_consume_method ==> rcv_basic_consume_rsp_method
  <-- Recv Basic.Consume-Ok Method frame!
6040: conn_state change   rcv_basic_consume_rsp_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Start waiting to recv!
6040: conn_state change   idle ==> rcv_basic_deliver_method
drive_machine: wait for Basic.Deliver method another 10 seconds!!
drive_machine: wait for Basic.Deliver method another 10 seconds!!
drive_machine: wait for Basic.Deliver method another 10 seconds!!

  <-- Recv Basic.Deliver Method frame!
6040: conn_state change   rcv_basic_deliver_method ==> rcv_basic_content_header
  <-- Recv Content.Header frame!
6040: conn_state change   rcv_basic_content_header ==> rcv_basic_content_body
  <-- Recv Content.Body frame!
Content Body is [Hello World betty].
@@@ CB: body len : [17]    body : [Hello World betty]
6040: conn_state change   rcv_basic_content_body ==> snd_basic_ack_method
  --> Send Basic.Ack Method frame!
6040: conn_state change   snd_basic_ack_method ==> idle

drive_machine: [conn_idle]  ---  [CONSUMER]: Start waiting to recv!
6040: conn_state change   idle ==> rcv_basic_deliver_method
drive_machine: wait for Basic.Deliver method another 10 seconds!!
drive_machine: wait for Basic.Deliver method another 10 seconds!!
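
The only difference between the two logs is what happens after the Content.Body frame. As a rough reconstruction, the receive path could be modelled like this in Python. The state names are copied from the log output above; the functions next_state and receive_path are hypothetical and are not the author's C code.

```python
def next_state(state: str, no_ack: bool) -> str:
    """Return the state that follows `state` on the receive path."""
    if state == "rcv_basic_content_body":
        # The single branch point: skip the Basic.Ack step when no_ack is set.
        return "idle" if no_ack else "snd_basic_ack_method"
    transitions = {
        "idle": "rcv_basic_deliver_method",
        "rcv_basic_deliver_method": "rcv_basic_content_header",
        "rcv_basic_content_header": "rcv_basic_content_body",
        "snd_basic_ack_method": "idle",
    }
    return transitions[state]


def receive_path(no_ack: bool) -> list:
    """Walk one delivery from idle back to idle and list the states visited."""
    path = ["idle"]
    while True:
        path.append(next_state(path[-1], no_ack))
        if path[-1] == "idle":
            return path


print(" ==> ".join(receive_path(no_ack=True)))
print(" ==> ".join(receive_path(no_ack=False)))
```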