rabbitmq 學習與實踐分享(2)

一.寫在前面

在上一篇文章中主要簡單的介紹了一下rabbitmq 的基本概念,包括exchange的主要類型以及每種類型分別表示什麼含義。本篇文章主要結合本身的理解,解讀一下rabbitmq 是如何保證消息不丟失的?html

二.rabbitmq 是如何保證消息發送時不被丟失的?

如圖所示: producer 發送消息到rabbitmq broker,而後有2個消費者consumer1,consumer2進行信息消費,針對這個簡單的場景,咱們內心不免會有一個疑問:做爲producer,我怎麼知道個人消息已經成功的發送到了broker 呢? 再一個,我怎麼知道我發送的消息已經成功的被consumer消費了呢?還有,若是消息發送到broker後,broker機器掛了怎麼辦,消息會丟失嗎?下面就這些疑問,結合本身的理解一一進行解答.java

1) 生產者消息確認機制:

生產者消息確認機制主要就是解決消息成功發送到rabbitmq broker 的問題,rabbitmq 提供了2種手段用來解決這個問題:mysql

  • 經過事務機制實現
  • 經過發送方確認機制實現

事務機制:

rabbitmq 客戶端channel API針對事務機制這塊提供了3個方法:channel.txSelect,channel.txCommit,channel.txRollback .sql

/**
 * Enables TX mode on this channel.
 * @see com.rabbitmq.client.AMQP.Tx.Select
 * @see com.rabbitmq.client.AMQP.Tx.SelectOk
 * @return a transaction-selection method to indicate the transaction was successfully initiated
 * @throws java.io.IOException if an error is encountered
 */
Tx.SelectOk txSelect() throws IOException;


 /**
 * Commits a TX transaction on this channel.
 * @see com.rabbitmq.client.AMQP.Tx.Commit
 * @see com.rabbitmq.client.AMQP.Tx.CommitOk
 * @return a transaction-commit method to indicate the transaction was successfully committed
 * @throws java.io.IOException if an error is encountered
 */
Tx.CommitOk txCommit() throws IOException;


 /**
 * Rolls back a TX transaction on this channel.
 * @see com.rabbitmq.client.AMQP.Tx.Rollback
 * @see com.rabbitmq.client.AMQP.Tx.RollbackOk
 * @return a transaction-rollback method to indicate the transaction was successfully rolled back
 * @throws java.io.IOException if an error is encountered
 */
Tx.RollbackOk txRollback() throws IOException;
複製代碼

txSelect方法主要是用於將信道(channel)設置成事務模式,txCommit 主要用於提交事務,txRollback 主要用於將事務進行回滾。在開啓事務以後,咱們即可以將消息發送給rabbitmq了,若是在執行tx.commit執行成功時,表示消息已經成功的發送到rabbitmq服務器了,反之則會拋異常。數據庫

須要說明的是,這裏的消息已經成功的發送到rabbitmq服務器,指的是消息已經成功發送到rabbitmq 服務器的exchange 了,若是exchange 沒有匹配消息綁定的隊列,消息仍是會丟失。

說明:rabbitmq 的事務與關係數據庫如mysql的事務機制是不同的,關係數據庫事務關注的是ACID,rabbitmq關心的是消息是否成功發送。
複製代碼

發送方確認機制

生產者將信道設置成confirm(確認)模式,一旦設置成confirm 模式,當消息投遞到broker以後,rabbitmq 的broker 會給消息發送端發一條BASIC.ACK 的確認消息,發送端經過監聽這個確認消息,能夠知道信息是否已經成功的發送出去. rabbitmq 客戶端Channel API 裏也提供了相應的API channel.confirmSelect 用來開啓客戶端確認模式:服務器

/**
 * Enables publisher acknowledgements on this channel.
 * @see com.rabbitmq.client.AMQP.Confirm.Select
 * @throws java.io.IOException if an error is encountered
 */
Confirm.SelectOk confirmSelect() throws IOException;
複製代碼

2種模式的比較:事務機制發送消息的過程是同步的,發送消息以後在rabbitmq 迴應以前會阻塞,直到收到迴應以後才能發送下一條消息,這樣會下降系統的吞吐量。發送者確認機制是異步的,生產者在發送消息等待信道返回確認消息的時候繼續發送下一條信息。因此相比而言,使用消息確認機制發送消息吞吐量會更高一些。網絡

2)消費端確認機制

消費端確認機制主要是爲了確保消息投遞到消費者以後可以被成功的消費。 在rabbitmq 的Channel API 中也提供了相應的參數給業務側進行控制,以下:異步

/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param callback an interface to the consumer object
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
複製代碼

在baiscConusme裏有一個參數:autoAck ,該參數爲false 的時候表示消費端須要進行手動確認(好比調用channel.basicAck進行主動確認),若是消費者在消費完一條消息以後向broker 發送確認消息,而後因爲網絡緣由或者其餘緣由致使broker 沒有確認這條消息時,broker 不會刪除這條消息,當鏈接從新創建以後,消費者仍是會收到這條消息。ide

3)持久化機制

rabbitmq 的持久化機制主要是確保生產者發送的消息能成功的落盤,確保broker重啓以後未被消費的信息不會被丟失。學習

rabbitmq 的持久化機制,主要從如下幾個方面來保障:

  • exchange 的持久化
  • queue 的持久化
  • message 的持久化

須要說明的是,消息是存儲在queue裏的,因此只有在queue設置爲持久化的時候,message的持久化纔有意義,不然若是queue是非持久化的,即使message是持久的,在broker重啓以後信息仍是會丟失。

rabbitmq 的Channel API 也提供了相應的參數來設置:

/**
 * Actively declare a non-autodelete exchange with no extra arguments
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 * @throws java.io.IOException if an error is encountered
 * @return a declaration-confirm method to indicate the exchange was successfully declared
 */
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
複製代碼

exchangeDeclare 接口中的durable 參數用來設置exchange是否持久化,爲true表示是持久化的,反之爲false

/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;
複製代碼

queueDeclare 接口的durable 參數通exchange相似

/**
 * Publish a message.
 *
 * Publishing to a non-existent exchange will result in a channel-level
 * protocol exception, which closes the channel.
 *
 * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 *
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
複製代碼

消息的持久化經過消息的屬性BasicProperties中的deliveryMode參數來設置,deliveryMode爲2表示是持久化信息。

小結

經過消息發送端確認機制,消費端確認機制以及持久化,rabbitmq 保證了消息的可靠性。可是又有一個疑問出現了,若是僅僅部署一臺broker, 即使是消息持久化了,若是broker 出故障了,無法恢復了,那消息不仍是會丟失嗎? 爲了不單點故障,提高rabbitmq的可用性,rabbitmq 支持集羣部署,以及提供了鏡像隊列等機制來確保信息的可靠性的。 關於rabbitmq的集羣,以及鏡像隊列等相關方面的知識,在下期的學習以後再進行分享。

相關文章
相關標籤/搜索