RabbitMQ吐血總結(1)---基礎啊基礎

吐血總結吧~由於太經常使用,由於是平常,由於即便是平常也並無多少人懂的不少,由於想掌握一個消息中間件,就是如此吧,很少說,我一篇,我主要講講基本的使用與方法調用的入參及其意義,其實,RabbitMQ就光是基礎的api調用,也並無多少人熟悉~java

1、RabbitMQ概覽

想來想去,不知道如何開篇,我大概就把我學習到的一些比較重要的概念羅列出來吧,就算是一個記錄吧。下面是一些我想寫入的點面試

一、消息中間件的做用

  • 解耦:有別於本地線程或者線程池的解耦,這種解耦的可靠性徹底是用消息中間件體系來保證,可靠不少!本地線程池會撐爆、服務重啓等問題。
  • 削峯:恩,很關鍵性的高併發場景的實用點!主要應對流量激增狀況,不會由於突增的流量而在現用的機器承受力以內,保證服務的可用性。
  • 緩衝:主要控制數據流通過系統的速度
  • 異步通訊:這個不用解釋了。原本消息中間件就是用來通訊的。

二、RabbitMQ的特色

  • 就是使用erlang語言,實現了AMQP協議
  • 很關鍵的特性:可靠性。持久化、確認機制等
  • 靈活性高,經過交互機到隊列經過綁定鍵進行綁定,能夠有不少玩法:廣播、直連、模糊匹配、頭信息匹配等
  • 可擴展性好,能夠經過業務的狀況動態擴展集羣中的結點
  • 高可用性,經過配置鏡像結點,是的部分結點出問題的狀況下,隊列仍然可用
  • 多協議支持,除了支持AMQP,還支持STOMP、MQTT等多種消息中間件協議
  • 多語言支持,恩這個不用多說,反正好過ActiveMQ
  • 默認給了個server端的管理界面,很人性化
  • 不少插件,支持本身編寫

基本的安裝我不在我文章裏面介紹,這種東西一搜一大把,並且每一個平臺安裝模式也不同,例如個人mac,直接一條命令搞定。我看Linux下面還要先安裝erlang的虛擬機。這些並不是RabbitMQ的核心技術,想作運維,出門左轉。設計模式

2、從協議窺探RabbitMQ

RabbitMQ主要實現了AMQP這個很牛逼的協議。其實這種就相似於Redis,任何的Java這個層面的接口API調用,都是對應着相關協議的命令輸出的。這東西細想也簡單,無非就是咱們給RabbitMQ的server端下命令,讓他作什麼事兒。例如我讓他建立信道,我就發一個Channel.open的字符串過去,對方server端起了個socket監聽器,發現有數據,立刻處理,而後匹配到具體的命令處理器,相似於策略設計模式,使用一個Handler處理邏輯。本節我經過生產者與消費者的最簡單代碼,來窺探一下,到底RabbitMQ的流轉過程是怎麼樣的,讓你們有個初步的印象:哦原來,RabbitMQ大概是這樣的。api

一、生產者的流轉過程

先上基礎的最簡單的生產者代碼:安全

// 建立鏈接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
String msg = "21123123";
channel.basicPublish("ExchargeName", "RoutingKey",
        MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 關閉資源
channel.close();
connection.close();

接下倆咱們來看看流轉過程的用例圖:併發

其中,Product表明生產者節點,Consumer表明消費者節點,Broker表明RabbitMQ Server端的一個機器節點。以後再也不說明運維

二、消費者的流轉過程

一樣,咱們先來一個最簡單的消費者的代碼模板:異步

// 建立鏈接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){}
channel.basicQos(64);
channel.basicConsume("queueName",consumer);

// 關閉資源
channel.close();
connection.close();

下面是消費者的流轉圖:socket

請細看這兩個時序圖,與圖中的標記,其中對應了具體哪一個JavaAPI調用,對應了哪一個AMQP的消息協議命令。不能,就是稍微多了點步驟。接下來,我會在遇到相關AMQP的命令時候,再繼續介紹,就不集中介紹所有的AMQP協議命令了。ide

我本身感受,集中所有介紹沒用,由於是我的根本無法立刻記住,即便我給你十天記住了,而後1天不用就忘了。說白了,仍是要緊扣實戰的編碼,單獨研究協議,枯燥且沒意義,即便面試,人家也不會專門問你命令。

3、RabbitMQ幾大基礎概念

發現,AMQP的協議中,我大體看了兩個實現(RabbitMQ和Qpid),對這幾個基礎的構件,都是提早談起的,可見,AMQP就是根據這幾個構件來組織的,我就不經過大量文字解說這幾個構件了,儘可能經過圖片來展現,好懂一些。

一、總覽

主要的構件包括:

  • Exchange:交換器
  • Queue:隊列
  • RoutingKey和BindingKey:路由鍵和綁定鍵

二、隊列

爲了說明隊列,先上一個虛擬的圖(由於真實狀況,消息不會直接到隊列的):

  • RabbitMQ中消息只能存儲在隊列中
  • 多個消費者消費時候,一個消息只會發送給一個消費者,使用輪訓方式

三、交換機

其實,咱們使用生產者,pulish一條消息,首先發送到的,就是交換機這個實體裏面,並不是隊列,而後經過各類綁定關係再路由到具體的隊列中,下一小節,咱們就講這個路由的問題。首先來看看交換機的示例圖:

交換機類型:

  • fanout:每一個到了交換機裏面的消息,會發送給全部綁定的隊列裏面
  • direct:必須BindingKey和RoutingKey徹底匹配才能將信息發送給對應的隊列
  • topic:RoutingKey和BindingKey能夠進行模糊匹配,具體的匹配規則是
    • RoutintKey和BindingKey是用」.「符號進行分割,例如:com.rabbitmq.client
    • BindingKey中能夠存在兩種特殊的字符串:*、#,分別對應匹配一個單詞和多個單詞(能夠是0個)
    • 舉例:RoutingKey是com.rabbitmq.client,BindingKey是*.rabbitmq.*
  • headers:根據消息的headers屬性裏面的鍵值對進行匹配,通常不建議使用

具體的RoutingKey和BindingKey如何區分下面咱們來講,即便咱們沒有指定具體的交換機發送給一個queue,RabbitMQ服務端也會隨機生成一個交換機,後面會介紹到

四、RoutingKey與BindingKey

下面是RoutingKey和BindingKey的相關圖例:

直接上具體的區分方法吧:

  • 在使用綁定動做的時候,使用的就是BindingKey,涉及的客戶端方法有:channel.exchangeBind、channel.queueBind,對應的AMQP爲:Exchange.Bind、Queue.Bind
  • 在發送消息的時候,其中須要的路由鍵是RoutingKey,涉及客戶端的方法有:channel.basicPublish,對應的AMQP爲:Basic.Publish

4、RabbitMQ的API基礎實戰

這一部分讓咱們開始寫代碼,看看如何用RabbitMQ來發送與接收消息,而且接口參數都是什麼意思,畢竟發送與接收代碼中的參數仍是挺多的,代碼不算整潔,咱們來看看爲啥不整潔,爲啥要用這些個參數。

一、建立連接與信道

兩種方式來建立:

  • 直接多參數設置方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123");
connectionFactory.setVirtualHost("/root");
// 建立鏈接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
  • 使用uri的方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://root:root123@127.0.0.1:5672/root");
// 建立鏈接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();

使用注意事項:

  1. 一個connection能夠對應多個channel
  2. 推薦作法是每一個線程建立本身的channel,多個線程共享一個connection
  3. 若是使用一個已經關閉的channel,會拋出ShutdownSignalException
  4. 下面咱們仍是來看看整個RabbitMQ的信道鏈接的模型:

二、聲明交換機

對應的方式是:exchangeDeclare。先來看看Java中API客戶端的源碼,發現註釋已經介紹的很清楚了:

/**
* 激活一個不自動刪除, 不持久化,且沒有拓展參數的交換機
* @param exchange 交換機名稱
* @param type 交換機類型
* @return 確認交換機聲明是否成功
* @throws java.io.IOException io異常
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

/**
* 激活一個不自動刪除, 不持久化,且沒有拓展參數的交換機
* @param exchange 交換機名稱
* @param type 交換機類型(封裝了個enum給你用)
* @return 確認交換機聲明是否成功
* @throws java.io.IOException io異常
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;

/**
* 激活一個不自動刪除, 且沒有拓展參數的交換機
* @param exchange 交換機名稱
* @param type 交換機類型
* @param durable true表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機)
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

/**
* 激活一個不自動刪除, 且沒有拓展參數的交換機
* @param exchange 交換機名稱
* @param type 交換機類型(封裝了個enum給你用)
* @param durable true表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機)
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

/**
* 聲明一個交換機
* @param exchange 交換機名稱
* @param type 交換機類型
* @param durable ttrue表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機)
* @param autoDelete true表示若是沒有被使用,這個交換機會自動刪除
* @param arguments 其餘的一個些屬性
*/
Exchange.DeclareOk exchangeDeclare(String exchange, 
                        String type, 
                        boolean durable, 
                        boolean autoDelete,
                        Map<String, Object> arguments) throws IOException;

/**
* 聲明一個交換機
* @param exchange 交換機名稱
* @param type 交換機類型(封裝了個enum給你用)
* @param durable ttrue表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機)
* @param autoDelete true表示若是沒有被使用,這個交換機會自動刪除
* @param arguments 其餘的一個些屬性
*/
Exchange.DeclareOk exchangeDeclare(String exchange, 
                            BuiltinExchangeType type, 
                            boolean durable, 
                            boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

/**
* @param internal true表示這個交換機是一個內部的, 不能直接使用客戶端的api進行pubulish操做
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
                          String type,
                          boolean durable,
                          boolean autoDelete,
                          boolean internal,
                          Map<String, Object> arguments) throws IOException;


Exchange.DeclareOk exchangeDeclare(String exchange,
                            BuiltinExchangeType type,
                            boolean durable,
                            boolean autoDelete,
                            boolean internal,
                            Map<String, Object> arguments) throws IOException;

/**
* 和前面的方式差很少,可是沒有返回值,且無論服務端是否有response
*/
void exchangeDeclareNoWait(String exchange,
           String type,
           boolean durable,
           boolean autoDelete,
           boolean internal,
           Map<String, Object> arguments) throws IOException;


void exchangeDeclareNoWait(String exchange,
            BuiltinExchangeType type,
            boolean durable,
            boolean autoDelete,
            boolean internal,
            Map<String, Object> arguments) throws IOException;

/**
* 被動的聲明一個交換機; 判斷這個名字的交換機是否存在
* @throws IOException 若是交換機不存在,會報404的異常
*/
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

/**
* 刪除一個交換機
* @param exchange 交換機名稱
* @param ifUnused true表示肯定這個交換機是否被使用,只有徹底不被使用的狀況下,才能刪除
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

/**
* 一樣也是刪除一個交換機,可是沒有返回值,不等待服務端的返回確認
*/
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

其實直接看源碼,一切已經很明瞭了,下面我對極個別一些注意點說說:

  • 自動刪除的前提是至少有一個隊列或者交換機與這個交換機綁定,以後全部與這個交換機綁定的隊列與交換機都與此解綁。就是說剛建立的交換機也是沒有與之綁定的實體,這個時候是不能自動刪除的
  • exchangeDeclarePassive這個方法在實際應用中很是有用,主要用來檢測響應的交換機是否存在,若是存在則返回,不存在則拋出異常:404 channel exception,同時channel也會關閉

三、聲明隊列

使用的方法是queueDeclare,對於隊列的聲明,重載方法少不少,一樣的咱們直接來看看源碼與源碼中的註釋:

/**
 * 聲明一個服務端命名的, 本鏈接獨佔的, 自動刪除的, 不持久化的隊列
 * 建立以後隊列的名稱,能夠從返回對象Queue.DeclareOk中的"queue"字段獲取.
 */
Queue.DeclareOk queueDeclare() throws IOException;

/**
 * 聲明一個隊列
 * @param queue 隊列的名稱
 * @param durable true表示這個隊列要作持久化(服務端重啓以後,這個隊列同樣存在)
 * @param exclusive true表示這個隊列是一個獨佔的(被限制只能當前connection使用)
 * @param autoDelete true表示這個隊列是自動刪除的 (服務端會在這個隊列不被使用的時候將其刪除)
 * @param arguments 隊列建立時候的其餘參數
 */
Queue.DeclareOk queueDeclare(String queue, 
                    boolean durable,
                    boolean exclusive,
                    boolean autoDelete,
                    Map<String, Object> arguments) throws IOException;

/**
 * 相似於queueDeclare方法,可是沒有返回值,而且無論服務端是否有回覆,聲明是否成功
 */
void queueDeclareNoWait(String queue, 
                boolean durable,
                boolean exclusive,
                boolean autoDelete,
                Map<String, Object> arguments) throws IOException;

/**
 * 聲明一個隊列,若是隊列不存在,將會拋出一個異常,而且關閉Channel
 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

/**
 * 刪除一個隊列, 無論是否這個隊列被使用或者裏面有消息
 * @param queue 隊列名稱
 */
Queue.DeleteOk queueDelete(String queue) throws IOException;

/**
 * 刪除一個隊列
 * @param queue 隊列名稱
 * @param ifUnused true表示這個隊列只有不被使用時候才刪除
 * @param ifEmpty true表示只有這個隊列爲空的時候才刪除
 */
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

/**
 * 和queueDelete(String, boolean, boolean)相似,可是沒有返回值,且不等服務端返回
 */
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

一樣的,介紹幾個重點問題:

  • 解釋下exclusive(獨佔隊列)的細節:
    • true狀況下是基於鏈接(connection)可見的,就是說一個鏈接中的不一樣的信道(channel)可見,不一樣鏈接不可見
    • 當前一個鏈接中建立一個獨佔隊列,其餘鏈接不能在建立同名的隊列了,這一點與普通隊列不一樣
    • 即便這個隊列是持久化的隊列,可是一旦鏈接關閉或者客戶端退出,該獨佔隊列也會被自動刪除
    • 這種隊列比較適合一個客戶端同時發送和讀取消息的應用場景
  • 自動刪除細節:至少有一個消費者鏈接了這個隊列,以後全部與這個隊列鏈接的消費者都斷開是,會自動刪除
  • 若是消費者在同一個信道上面訂閱了另外一個隊列,就沒法再聲明隊列了,必須先取消訂閱,而後將信道設置爲「傳輸」模式,以後才能聲明隊列

四、隊列與交換機的綁定

這一部分,主要是將咱們聲明的交換機與隊列創建關聯的一個動做,涉及到的方法有:

  • queueBind
  • exchangeBind

下面咱們來看看源碼與註釋:

/**
 * 將一個隊列綁定到交換機上面, 不帶任何拓展參數
 * @param queue 隊列名稱
 * @param exchange 交換機名稱
 * @param routingKey 這裏其實就是綁定的鍵
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

/**
 * 將一個隊列綁定到交換機上面
 * @param queue 隊列名稱
 * @param routingKey 綁定鍵
 * @param arguments 其餘的拓展參數
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

/**
 * 無論是否綁定成功就返回的綁定方法
 */
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


/**
 * 將隊列與一個交換機經過綁定鍵綁定在一塊兒的關係解除
 * @param queue 隊列名稱
 * @param exchange 交換機名稱
 * @param routingKey 綁定鍵
 */
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


/**
 * 將一個交換機綁定到另一個交換機上面
 * @param destination 這個是消息要到達的交換機名稱
 * @param source 這個是消息從哪裏來的交換機名稱
 * @param routingKey 綁定鍵
 */
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;


Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;


void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

主要使用的參數,經過註釋已經很明瞭了,其餘的注意點暫時沒有。

五、發送消息

發送消息這裏對應的方法,主要就是basicPublish這個方法,先來看看他的API接口:

/**
 * 發佈一個消息到服務端。
 *
 * 若是發佈的消息到交換機不存在,會拋出一個channel-level protocol的異常,而且會關閉這個channel
 *
 * @param exchange 消息要發佈到的交換機
 * @param routingKey 這個是路由鍵(會匹配具體的綁定鍵)
 * @param 一個消息其餘的屬性 - 例如headers
 * @param body 消息體
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

/**
 * 發佈一個消息到服務端。
 *
 * @param exchange 消息要發佈到的交換機
 * @param routingKey 這個是路由鍵
 * @param mandatory 這個後面文章會介紹
 * @param 一個消息其餘的屬性 - 例如headers
 * @param 消息體
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;

/**
 * 發佈一個消息到服務端。
 *
 * @param mandatory 後面文章介紹
 * @param immediate 後面文章介紹
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

這裏能夠看到的是,mandatory、immediate這兩個入參很關鍵,後面會專門講,由於這兩個入參關係到了消息的可靠性和穩定性。下面是一些能夠添加屬性的代碼示例:

channel.basicPublish("ExchargeName", "RoutingKey",
                MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

channel.basicPublish("ExchargeName", "RoutingKey",
                new AMQP.BasicProperties.Builder()
                .contentType("test/plain")
                .deliveryMode(2)
                .priority(1)
                .userId("hiden")
                .build(), msg.getBytes());

下面介紹下經過Builder模式構建發消息時候的屬性,有助於瞭解消息的可靠性:

  • deliveryMode設置成2,表示消息會被持久化,存入磁盤
  • priority設置成了0表示優先級爲0
  • content-type爲text/plain表示消息體爲文本類型

六、消費消息

消費消息來講,相比較來講要比單純的接口調用要複雜點,由於涉及到方法的註冊等,還存在兩種消費模式:

  • 推模式
  • 拉模式

推模式

在推模式中,能夠經過持續訂閱方式來消費消息,主要經過兩個類:

  • com.rabbitmq.client.Consumer(接口)
  • com.rabbitmq.client.DefaultConsumer(上面那個接口的默認實現類)

設計這兩個消費對象的類的方法是:

/**
 * 啓動一個非本地, 非排他, 要手動ack的, 服務端自定義的消費者標籤名稱的消費者
 */
String basicConsume(String queue, Consumer callback) throws IOException;

/**
 * 啓動一個非本地, 非排他, 服務端自定義的消費者標籤名稱的消費者
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

/**
 * 啓動一個非本地, 非排他, 服務端自定義的消費者標籤名稱的消費者(固然帶了個參數map)
 */
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

/**
 * 啓動一個非本地, 非排他的消費者(固然帶了個參數map)
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param consumerTag a client-generated consumer tag to establish context
 * @param callback an interface to the consumer object
 * @return the consumerTag associated with the new consumer
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

/**
 * 啓動一個消費者
 * @param queue 隊列名稱
 * @param autoAck 消費者是否自動確認
 * @param consumerTag 消費者標籤
 * @param noLocal true表示不能把同一個connection中生產者發送的消息傳送給這個connection中的消費者
 * @param exclusive 是不是拍他的消費者
 * @param arguments 設置消費者的其餘參數
 * @param callback 設置消費者的回調函數,用來處理RabbitMQ推送過來的消息
 */
String basicConsume(String queue,
            boolean autoAck,
            String consumerTag,
            boolean noLocal,
            boolean exclusive,
            Map<String, Object> arguments,
            Consumer callback) throws IOException;

下面是一段消費者的真實代碼:

channel.basicQos(64);
channel.basicConsume("queueName", false, "tag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        String msg = new String(body, "UTF-8");
        // 這裏進行消息處理
        channel.basicAck(deliveryTag,false);
    }
});

具體的DefaultConsumer類裏面還有其餘的幾個方法,對應不一樣的消費動做回調的函數:

  • handleConsumeOk(String consumerTag):會在其餘方法以前調用,返回消費者標籤名稱
  • handleCancelOk(String consumerTag):若是手動調用Channel#basicCancel會被觸發回調
  • handleCancel(String consumerTag):除了手動調用Channel#basicCancel這個方法以外的消費取消動做(例如消費隊列被刪除)會觸發這個方法
  • handleShutdownSignal(String consumerTag, ShutdownSignalException sig):當connection或者channel關閉了,會被觸發
  • handleRecoverOk(String consumerTag):後面介紹

若是手動調用了Channel#basicCancel,觸發方法的序列是:

  1. handleConsumeOk
  2. handleDelivery
  3. handleCancelOk

另外還要注意的是:這些個被觸發的對象都被分配到了不一樣的線程池上,因此要注意線程安全性的問題

拉模式

拉模式,有別於推模式的被動獲取消息,咱們能夠直接在業務邏輯中,主動獲取RabbitMQ服務端中的消息,下面是總體的時序圖

對應的代碼是:

GetResponse response = channel.basicGet("QueueName", false);
System.out.println(response.getBody());
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

比較下,推模式和拉模式:

  • 只想從隊列得到單條消息而不是持續訂閱,建議使用拉模式進行消費
  • 若是要實現高吞吐量,消費者應該使用推模式
  • 具體的兩種模式的細節對比請看一個外部連接:推模式和拉模式比較

七、消息的確認與拒絕

這裏我自我感受很關鍵,由於涉及到消息消費的可靠性。首先讓咱們來看看ack機制:

  • autoAck爲true時候RabbitMQ會自動吧發出去的消息從內存或者磁盤中刪除,無論消費者是否真正消費了這些消息
  • autoAck爲false的時候,RabbitMQ會等待消費者顯示的回覆確認信號的時候才從內存和磁盤中刪除
  • 固然,這種刪除是異步,先打上標記,以後再刪除

總體上,RabbitMQ的服務端存在兩種隊列:

  • 等待投遞給消費者的隊列
  • 已經投遞給消費者,但尚未收到消費者確認信號的消息

另外,RabbitMQ不會主動刪除或者過時未確認的消息,判斷是否從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開。若是長時間沒回復ack,直到鏈接斷開,此消息會從新進入隊列,等待投遞給下一個消費者,固然也多是當前消費者。

另一個點,是咱們能夠直接拒絕一個消息,使用的方法是:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

第一個參數好理解,重點說下第二個,requeue:

  • true:RabbitMQ會將這個拒絕的消息從新存入隊列,以便給下一個訂閱者
  • false:RabbitMQ當即會把消息從隊列中移除

還有個方法,能夠請求RabbitMQ從新發送還未被確認的消息,有下面兩個方法

Basic.RecoverOk basicRecover() throws IOException;

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

再次涉及到requeue參數:

  • true:未被確認的消息從新加入到隊列中,可能會被不一樣消費者消費
  • false:一樣是未被確認的消息從新加入隊列,可是同一條消息會被分配給以前相同的消費者
  • 默認不帶參數的方法,requeue值爲true

八、關閉鏈接

關閉涉及到的有兩個對象的關閉:

  • connection
  • channel

關閉的方法有:

  • void close()
  • void close(int closeCode, String closeMessage)

整個AMQP協議中connection與channel聲明週期以下:

  • open:開啓狀態,對象可使用
  • closing:正在關閉,顯示調用關閉方法,產生一個關閉請求,等待關閉動做完成
  • closed:已經關閉

固然咱們能夠註冊關閉以後觸發的監聽器:

connection.addShutdownListener(new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        // 業務邏輯
        Method reason = cause.getReason();
    }
});

5、總結

接下倆大概會有兩篇,一篇是功能性的拔高,一篇是RabbitMQ的原理相關,若是可能我會說說集羣構建與鏡像

相關文章
相關標籤/搜索