RabbitMQ不講武德,發個消息也這麼多花招

前言

本篇博客已被收錄GitHub:https://zhouwenxing.github.io/
文中所涉及的源碼也已被收錄GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模塊)

使用消息隊列必需要保證生產者發送的消息能被消費者所接收,那麼生產者如何接收消息呢?下圖是 RabbitMQ 的工做模型:java

上圖中生產者會將消息發送到交換機 Exchange 上,再由 Exchange 發送給不一樣的 Queue ,而 Queue 是用來存儲消息隊列,那麼假若有多個生產者,那麼消息發送到交換機 Exchange 以後,應該如何和 Queue 之間創建綁定關係呢?git

如何使用 RabbitMQ 發送消息

RabbitMQ 中提供了3種發送消息的路由方式。github

直連 Direct 模式

經過指定一個精確的綁定鍵來實現 Exchange(交換機) 和 Queue(消息隊列) 之間的綁定,也就是說,當建立了一個直連類型的交換機時,生產者在發送消息時攜帶的路由鍵(routing key),必須與某個綁定鍵(binding key)徹底匹配時,這條消息纔會從交換機路由到知足路由關係消息隊列上,而後消費者根據各自監聽的隊列就能夠獲取到消息(以下如吐所示,Queue1 綁定了 order ,那麼這時候發送消息的路由鍵必須爲 order 才能分配到 Queue1 上):spring

主題 Topic 模式

Direct 模式會存在必定的侷限性,有時候咱們須要按類型劃分,好比訂單類路由到一個隊列,產品類路由到另外一個隊列,因此在 RabbitMQ 中,提供了主題模式來實現模糊匹配。使用主題類型鏈接方式支持兩種通配符:編程

直連方式只能精確匹配,有時候咱們須要實現模糊匹配,那麼這時候就須要主題類型的鏈接方式,在 RabbitMQ 中,使用主題類型鏈接方式支持兩種通配符:api

  • :表示 0 個或者多個單詞

  • *:表示 1 個單詞

PS:使用通配符時,單詞指的是用英文符號的小數點 . 隔開的字符,如:abc.def 就表示有 abcdef 兩個單詞。安全

下圖所示中,由於 Queue1 綁定了 order.#,因此當發送消息的路由鍵爲 order 或者 order.xxx時均可以使得消息分配到 Queue1 上:springboot

廣播 Fanout 模式

當咱們定義了一個廣播類型的交換機時就不須要指定綁定鍵,並且生產者發送消息到交換機上時,也不須要攜帶路由鍵,此時當消息到達交換機時,全部與其綁定的隊列都會收到消息,這種模式的消息發送適用於消息通知類需求。服務器

以下如所示,Queue1Queue2Queue3 三個隊列都綁定到了一個 Fanout 交換機上,那麼當 Fanout Exchange 收到消息時,會同時將消息發送給三個隊列:微信

RabbitMQ 提供的後臺管理系統中也能查詢到建立的交換機和隊列等信息,而且能夠經過管理後臺直接建立隊列和交換機:

消息發送實戰

下面經過一個 SpringBoot 例子來體會一下三種發送消息的方式。

  • 一、application.yml 文件中添加以下配置:
spring:
  rabbitmq:
    host: ip
    port: 5672
    username: admin
    password: 123456
  • 二、新增一個 RabbitConfig 配置類(爲了節省篇幅省略了包名和導入 ),此類中聲明瞭三個交換機和三個隊列,並分別進行綁定:
@Configuration
public class RabbitConfig {
    //直連交換機
    @Bean("directExchange")
    public DirectExchange directExchange(){
        return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
    }

    //主題交換機
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
    }

    //廣播交換機
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
    }


    @Bean("orderQueue")
    public Queue orderQueue(){
        return new Queue("LONGLY_WOLF_ORDER_QUEUE");
    }

    @Bean("userQueue")
    public Queue userQueue(){
        return new Queue("LONGLY_WOLF_USER_QUEUE");
    }

    @Bean("productQueue")
    public Queue productQueue(){
        return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
    }

    //Direct交換機和orderQueue綁定,綁定鍵爲:order.detail
    @Bean
    public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
    }

    //Topic交換機和userQueue綁定,綁定鍵爲:user.#
    @Bean
    public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
    }

    //Fanout交換機和productQueue綁定
    @Bean
    public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
  • 三、新建一個消費者 ExchangeConsumer 類,不一樣的方法實現分別監聽不一樣的隊列:
@Component
public class ExchangeConsumer {

    /**
     * 監聽綁定了direct交換機的的消息隊列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
    public void directConsumer(String msg){
        System.out.println("direct交換機收到消息:" + msg);
    }

    /**
     * 監聽綁定了topic交換機的的消息隊列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
    public void topicConsumer(String msg){
        System.out.println("topic交換機收到消息:" + msg);
    }

    /**
     * 監聽綁定了fanout交換機的的消息隊列
     */
    @RabbitHandler
    @RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
    public void fanoutConsumer(String msg){
        System.out.println("fanout交換機收到消息:" + msg);
    }
}
  • 四、新增一個 RabbitExchangeController 類來做爲生產者,進行消息發送:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/send/direct")
    public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/topic")
    public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
        return "succ";
    }
    @GetMapping(value="/send/fanout")
    public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
        rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
        return "succ";
    }
}
  • 五、啓動服務,當咱們調用第一個接口時候,路由鍵和綁定鍵 order.detail 精確匹配時,directConsumer 就會收到消息,一樣的,調用第二接口時,路由鍵知足 user.# 時,topicConsumer 就會收到消息,而只要調用第三個接口,不管是否指定路由鍵,fanoutConsumer 都會收到消息。

消息過時了怎麼辦

簡單的發送消息咱們學會了,難道這就能讓咱們就此止步了嗎?顯然是不能的,要玩就要玩高級點,因此接下來讓咱們給消息加點佐料。

TTL(Time-To-Live)

TTL 即 一條消息在隊列中的最大存活時間。在一條在隊列中超過配置的 TTL 的消息稱爲已死消息。可是須要注意的是,已死消息並不能保證會當即從隊列中刪除,可是能保證已死的消息不會被投遞出去。

設置 TTL 的方式有兩種:

  • 一、給隊列設置 x-message-ttl,此時全部被投遞到隊列中的消息,都會在到達 TTL 時成爲已死消息。

    這種狀況就會出現當一條消息同時路由到 N 個帶有 TTL 時間的隊列,而因爲每一個隊列的 TTL 不必定相同,因此同一條消息在不一樣的隊列中可能會在不一樣時間死亡或者不會死亡(未設置 TTL ),因此一個隊列中的消息死亡不會影響到其餘隊列中的消息。

  • 二、單獨給某一條消息設置過時時間。

    此時須要注意的時,當消息達到 TTL 時,可能不會立刻被丟棄,由於只有處於隊列頭部消息過時後纔會被丟棄,假如隊列頭部的消息沒有設置 TTL,而第 2 條消息設置了 TTL,那麼即便第 2 條消息成爲了已死消息,也必需要等到隊列頭部的消息被消費以後纔會被丟棄,而已死消息在被丟棄以前也會被計入統計數據(好比隊列中的消息總數)。因此爲了更好的利用 TTL 特性,建議讓消費者在線消費消息,這樣才能確保消息更快的被丟棄,防止消息堆積。

PS:消息過時和消費者傳遞之間可能存在天然的競爭條件。例如,消息可能在發送途中(未到達消費者)過時。

隊列的生存

TTL 針對消息不一樣的是,咱們能夠經過設置過時時間屬性 `x-expires`` 來處理隊列,當在指定過時時間內內未使用隊列時,服務器保證將刪除隊列(可是沒法保證在過時時間事後隊列將以多快的速度被刪除)。

TTL 和過時時間實戰

  • 一、在上面定義的 RabbitConfig 類中,再新增一個 TTL 隊列並將其綁定到 direct 交換機上:
@Bean("ttlQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl", 5000);//隊列中全部消息5秒後過時
    map.put("x-expires", 100000);//隊列閒置10秒後被刪除
    //參數1-name:隊列名稱
    //參數2-durable:是否持久化
    //參數3-exclusive:是否排他。設置爲true時,則該隊列只對聲明當前隊列的鏈接(Connection)可用,一旦鏈接斷開,隊列自動被刪除
    //參數4-autoDelete:是否自動刪除。前提是必需要至少有一個消費者先連上當前隊列,而後當全部消費者都斷開鏈接以後,隊列自動被刪除
    return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
    }

//ttl隊列綁定到direct交換機(交換機和隊列能夠多對多)
@Bean
public Binding ttlBindFanoutExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
    return BindingBuilder.bind(queue).to(directExchange).with("test.ttl");
}
  • 二、在 ExchangeConsumer 消費者類上監聽 TTL 隊列(和其餘消費者不一樣的時候,這裏爲了打印出隊列屬性,改爲了經過 Message 對象來接收消息 ):
/**
 * 監聽ttl消息隊列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
public void ttlConsumer(Message message){
    System.out.println("ttl隊列收到消息:" + new String(message.getBody()));
    System.out.println("ttl隊列收到消息:" + JSONObject.toJSONString(message.getMessageProperties()));
}
  • 三、在生產者類 RabbitExchangeController 上新增一個接口用來測試發送過時消息,這裏經過 MessageProperties 設置的 expiration 屬性就至關因而給單條消息設置了一個 TTL
@GetMapping(value="/send/ttl")
public String sendTtl(String routingKey,@RequestParam(value = "msg",defaultValue = "no ttl message") String msg){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setExpiration("5000");//5秒後被刪除,即TTL屬性(針對單條消息)
    Message message = new Message(msg.getBytes(), messageProperties);
    rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,message);
    return "succ";
}
  • 四、此時若是咱們把消費者的監聽去掉以後再發送消息,在管理後臺就能夠看到 5 秒以後消息會被刪除,10 秒以後隊列會被刪除。

PS:若是同時給隊列和單條消息都設置了 TTL,則會以時間短的爲主。

其餘屬性

隊列中還有其餘一些屬性能夠設置,在這裏咱們就不一一舉例了:

  • x-message-ttl:隊列中消息的存活時間(毫秒),達到TTL的消息可能會被刪除。
  • x-expires:隊列在多長時間(毫秒)沒有被訪問之後會被刪除。
  • x-max-length:隊列中的最大消息數。
  • x-max-length-bytes:隊列的最大容量(bytes)。
  • overflow:隊列溢出以後的策略。主要能夠配置以下參數:reject-publish - 直接丟棄最近發佈的消息,如若啓用了 publisher confirm(發佈者確認),發佈者將經過發送 basic.nack 消息通知拒絕,若是當前隊列綁定有多個消費者,則消息在收到 basic.nack 拒絕通知後,仍然會被髮布到其餘隊列;drop-head - 丟棄隊列頭部消息(集羣模式下只支持這種策略) reject-publish-dlx - 最近發佈的消息會進入死信隊列。
  • x-dead-letter-exchange:隊列的死信交換機。
  • x-dead-letter-routing-key:死信交換機的路由鍵。
  • x-single-active-consumer:true/false。表示是否最多隻容許一個消費者消費,若是有多個消費者同時綁定,則只會激活第一個,除非第一個消費者被取消或者死亡,纔會自動轉到下一個消費者。
  • x-max-priority:隊列中消息的最大優先級, 消息的優先級不能超過它。
  • x-queue-mode:3.6.0 版本引入的,主要是爲了實現惰性加載。隊列將收到的消息儘量快的進行持久化操做到磁盤上,而後只有在用戶請求的時候纔會加載到 RAM 內存。這個參數支持兩個值:defaultlazy。當不進行設置的時候,就是默認爲 default,不作任何改變;當設置爲 lazy 就會進行懶加載。
  • x-queue-master-locator:爲了保證消息的 FIFO,因此在高可用集羣模式下須要選擇一個節點做爲主節點。這個參數主要有三種模式:min-masters- 託管最小數量的綁定主機的節點;client-local- 選擇聲明的隊列已經鏈接到客戶端的節點;random- 隨機選擇一個節點。

神奇的死信隊列(Dead Letter)

上面的參數介紹中,提到了死信隊列,這又是什麼新鮮的東西呢?其實從名字上來看很好理解,就是指的已死的消息,或者說無家可歸的消息。一個消息進入死信隊列,主要有如下三種條件:

  • 一、消息被消費者拒絕而且未設置重回隊列。

  • 二、消息過時(即設置了 TTL)。

  • 三、隊列達到最大長度,超過了 Max lengthMax length bytes,則隊列頭部的消息會被髮送到死信隊列。

死信隊列實戰

  • 一、在上面定義的 RabbitConfig 類中,定義一個死信交換機,並將以前的 ttl 隊列新增一個屬性 x-dead-letter-exchange,最後再將死信隊列和死信交換機進行綁定:
//直連死信交換機(也能夠用topic或者fanout類型交換機)
@Bean("deatLetterExchange")
public DirectExchange deatLetterExchange(){
    return new DirectExchange("LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");
}
@Bean("ttlQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl", 5000);//隊列中全部消息5秒後過時
    map.put("x-dead-letter-exchange", "LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");//已死消息會進入死信交換機
    return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map);
}
//死信隊列
@Bean("deadLetterQueue")
public Queue deadLetterQueue(){
    return new Queue("LONGLY_WOLF_DEAD_LETTER_QUEUE");
}
  • 二、在 ExchangeConsumer 消費者類上將監聽 TTL 隊列的監聽取消,註釋掉監聽:
/**
     * 監聽ttl消息隊列
     */
    @RabbitHandler
//    @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE")
    public void ttlConsumer(Message message){
        System.out.println("ttl隊列收到消息:" + new String(message.getBody()));
        System.out.println("ttl隊列收到消息:" + JSONObject.toJSONString(message.getMessageProperties()));
    }
  • 三、此時 TTL 隊列無消費者,而且設置了消息的 TTL5 秒,因此 5 秒以後就會進入死信隊列。
  • 五、訪問接口:http://localhost:8080/exchange/send/ttl?routingKey=test&msg=測試死信隊列,發送消息以後,等待 5 秒就查看消息,進入死信隊列:

消息真的發送成功了嗎

瞭解了消息的基本發送功能以後,就能夠高枕無憂了嗎?消息發出去以後,消費者真的收到消息了嗎?消息發送以後如何知道消息發送成功了?假如發送消息路由錯了致使沒法路由到隊列怎麼辦?你們是否是都有這些疑問呢?彆着急,接下來就讓咱們來一一來分析一下。

一條消息從生產者開始發送消息到消費者消費完消息主要能夠分爲如下 4 個階段:

  • 一、生產者將消息發送到 Broker (即:RabbitMQ 的交換機)。
  • 二、交換機將消息路由到隊列。
  • 三、隊列收到消息後存儲消息。
  • 四、消費者從隊列獲取消息進行消費。

接下來咱們就從這 4 個步驟上來逐步分析 RabbitMQ 如何保證消息發送的可靠性。

消息真的到達交換機了嗎

當咱們發送一條消息以後,如何知道對方收到消息了?這就和咱們寫信同樣,寫一封信出去,如何知道對方收到咱們寄出去的信?最簡單的方式就是對方也給咱們回一封信,咱們收到對方的回信以後就能夠知道本身的信已經成功寄達。

RabbitMQ 中服務端也提供了 2 種方式來告訴客戶端(生產者)是否收到消息:Transaction(事務)模式和 Confirm(確認)模式。

Transaction(事務) 模式

Java API 編程中開啓事務只須要增長如下代碼便可:

try {
     channel.txSelect();//開啓事務
     channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
     channel.txCommit();//提交事務
 }catch (Exception e){
     channel.txRollback();//消息回滾
 }

Spring Boot 中須要對 RabbitTemplate 進行事務設置:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);//開啓事務
    return rabbitTemplate;
}

爲了瞭解 RabbitMQ 當中事務機制的原理,咱們在 Wireshark 中輸入 ip.addr==192.168.1.1 對本地 ip 進行抓包,發送一條消息以後,抓到以下數據包:

經過數據包,能夠得出開啓事務以後,除了本來的發送消息以外,多出了開啓事務和事務提交的通訊:

開啓事務以後,有一個致命的缺點就是發送消息流程會被阻塞。也就是說必須一條消息發送成功以後,纔會容許發送另外一條消息。正由於事務模式有這個缺點,因此通常狀況下並不建議在生產環境開啓事務,那麼有沒有更好的方式來實現消息的送達確認呢?那麼就讓咱們再看看Confirm(確認)模式。

Confirm(確認)模式

消息確認模式又能夠分爲三種(事務模式和確認模式沒法同時開啓):

  • 單條確認模式:發送一條消息,確認一條消息。此種確認模式的效率也不高。
  • 批量確認模式:發送一批消息,而後同時確認。批量發送有一個缺點就是同一批消息一旦有一條消息發送失敗,就會收到失敗的通知,須要將這一批消息所有重發。
  • 異步確認模式:一邊發送一邊確認,消息可能被單條確認也可能會被批量確認。

Java API 實現確認模式

  • 單條消息確認模式
channel.confirmSelect();//開啓確認模式
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if (channel.waitForConfirms()){//wait.ForConfirms(long time)方法能夠指定等待時間
    System.out.println("消息確認發送成功");
}
  • 批量確認模式
channel.confirmSelect();//開啓確認模式
//批量發送
for (int i=0;i<10;i++){
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
try{
    channel.waitForConfirmsOrDie();
}catch (IOException e){//只要有1條消息未被確認,就會拋出異常
    System.out.println("有消息發送失敗了");
}
  • 異步確認模式
channel.addConfirmListener(new ConfirmListener() {
    /**
      * 已確認消息,即發送成功後回調
      * @param deliveryTag -惟一標識id(即發送消息時獲取到的nextPublishSeqNo)
      * @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的消息被批量確認,multiple=false,表示只確認了單條
      */
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功回調
        System.out.println("收到確認消息了");
        //TODO 能夠作一些想作的事
    }

    /**
       * 發送失敗消息後回調
       * @param deliveryTag -惟一標識id(即發送消息時獲取到的nextPublishSeqNo)
       * @param multiple - 是否批量確認,當multiple=true,表示<=deliveryTag的消息被批量確認,multiple=false,表示只確認了單條
       */
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失敗回調
        if (multiple) {//批量確認,<deliveryTag的消息都發送失敗
            //TODO 消息重發?
        } else {//非批量,=deliveryTag的消息發送失敗
            //TODO 消息重發?
        }
    }
});

channel.confirmSelect();//開啓確認模式
for (int i=0;i<10;i++){//批量發送
    long nextSeqNo = channel.getNextPublishSeqNo();//獲取發送消息的惟一標識(從1開始遞增)
    //TODO 能夠考慮把消息id存起來
    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}

SpringBoot 實現確認模式

經過配置文件 spring.rabbitmq.publisher-confirm-type 參數進行配置確認(舊版本是 spring.rabbitmq.publisher-confirms 參數)。

  • 一、新增配置文件屬性配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated # none-表示禁用回調(默認) simple- 參考RabbitExchangeController#sendWithSimpleConfirm()方法
  • 二、RabbitConfig 配置文件中修改以下:
@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
//        rabbitTemplate.setChannelTransacted(true);//開啓事務
        //消息是否成功發送到Exchange
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack){//消息發送失敗
                    System.out.println("消息發送失敗,緣由爲:" + cause);
                    return;
                }
                //消息發送成功
                System.out.println("消息發送成功");
            }
        });
        return rabbitTemplate;
    }

這樣當咱們發送消息成功以後,就會收到回調。

  • 三、當上面的參數配置修改成 simple,則須要在發送消息的時候使用 invoke 調用 waitForConfirms 或者 waitForConfirmsOrDie 方法來確認是否發送成功:
@GetMapping(value="/send/confirm")
 public String sendWithSimpleConfirm(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
       //使用waitForConfirms方法確認
        boolean sendFlag = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE",
                    "routingKey",
                    msg
            );
            return rabbitTemplate.waitForConfirms(5000);
        });
        //也可使用waitForConfirmsOrDie方法確認
        boolean sendFlag2 = rabbitTemplate.invoke(operations -> {
            rabbitTemplate.convertAndSend(
                    "LONGLY_WOLF_DIRECT_EXCHANGE",
                    "routingKey",
                    msg
            );
            try {
                rabbitTemplate.waitForConfirmsOrDie(5000);
            }catch (Exception e){
                return false;
            }
            return true;
        });
        System.out.println(sendFlag);
        System.out.println(sendFlag2);
        return "succ";
    }

消息沒法從交換機路由到正確的隊列怎麼辦

上面經過事務或者確認機制確保了消息成功發送到交換機,那麼接下來交換機會負責將消息路由到隊列,這時候假如隊列不存在或者路由錯誤就會致使消息路由失敗,這又該如何保證呢?

一樣的,RabbitMQ 中也提供了 2 種方式來確保消息能夠正確路由到隊列:開啓監聽模式或者經過新增備份交換機模式來備份數據。

監聽回調

上面介紹的是消息是否發送到交換機的回調,而從交換機路由到隊列,一樣能夠開啓確認模式。

Java API 方式開啓監聽模式

下面就是開啓監聽主要代碼,爲了節省篇幅,省略了其他不相干代碼(完成代碼已上傳至 GitHub

channel.addReturnListener(new ReturnListener() {
     @Override
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
         System.out.println("收到未路由到隊列的回調消息:" + new String(body));
     }
 });
//注意這裏的第三個參數,mandatory須要設置爲true(發送一個錯誤的路由,便可收到回調)
channel.basicPublish(EXCHANGE_NAME,"ERROR_ROUTING_KEY",true,null,msg.getBytes());

Spring Boot 開啓監聽模式

RabitConfig 類中添加以下配置:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    rabbitTemplate.setMandatory(true);//開啓監聽回調
    //消息是否成功被路由到隊列,沒有路由到隊列時會收到回調(原setReturnCallback在2.0版本已過時)
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("收到未路由到隊列的回調消息:" + new String(returnedMessage.getMessage().getBody()));
        }
    });
    return rabbitTemplate;
}

備份交換機

除了開啓監聽的方式,還能夠經過定義備份交換機的方式來實現,當原交換機沒法正確路由到隊列時,則會進入備份交換機,再由備份交換機路由到正確隊列(要注意區分備份交換機和死信交換機的區別)。

Java API 實現備份交換機

下面就是一個實現備份交換機的例子,由於這裏備份交換機定義的是 Topic 類型,全部路由必須知足定義好的路由,實際使用中通常會設置會 Fanout,由於沒法預測錯誤的路由究竟是多少:

//聲明交換機且指定備份交換機
Map<String,Object> argMap = new HashMap<String,Object>();
argMap.put("alternate-exchange","TEST_ALTERNATE_EXCHANGE");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,false,argMap);
//隊列和交換機進行綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);

//聲明備份交換機和備份隊列,並綁定(爲了防止收不到消息,備份交換機通常建議設置爲Fanout類型)
channel.queueDeclare("BAK_QUEUE", false, false, false, null);
channel.exchangeDeclare("TEST_ALTERNATE_EXCHANGE", BuiltinExchangeType.TOPIC);
channel.queueBind("BAK_QUEUE","TEST_ALTERNATE_EXCHANGE","ERROR.#");

String msg = "I'm a bak exchange msg";
channel.basicPublish(EXCHANGE_NAME,"ERROR.ROUTING_KEY",null,msg.getBytes());

Spring Boot 實現備份交換機

Spring Boot 實現備份交換機原理和 Java API 實現相同:

  • 一、首先在 RabbiConfig 中新增兩個交換機,一個是原始交換機,一個是備份交換機,同時新增一個備份隊列和備份交換機進行綁定,這裏的備份交換機是一個 Fanout 類型,注意由於這裏主要是演示備份交換機,因此這裏的原始交換機沒有和任何隊列綁定,也就沒法路由到隊列,從而使得消息進入備份交換機:
//用於測試備份交換機的原直連交換機
@Bean("bakDirectEchange")
public DirectExchange bakDirectEchange(){
    Map argMap = new HashMap<>();
    argMap.put("alternate-exchange", "LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
    return new DirectExchange("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",false,false,argMap);
}

//備份廣播交換機
@Bean("bakFanoutExchange")
public FanoutExchange bakFanoutExchange(){
    return new FanoutExchange("LONGLY_WOLF_BAK_FANOUT_EXCHANGE");
}
//備份隊列
@Bean("bakQueue")
public Queue bakQueue(){
    return new Queue("LONELY_WOLF_BAK_QUEUE");
}
//備份交換機和備份隊列進行綁定
@Bean
public Binding BindExchange(@Qualifier("bakQueue") Queue queue, @Qualifier("bakFanoutExchange") FanoutExchange fanoutExchange){
    return BindingBuilder.bind(queue).to(fanoutExchange);
}

二、在消費者類 ExchangeConsumer 中監聽備份隊列:

/**
  * 監聽備份消息隊列
  */
@RabbitHandler
@RabbitListener(queues = "LONELY_WOLF_BAK_QUEUE")
public void bakQueueConsumer(Message message){
    System.out.println("備份隊列收到消息:" + new String(message.getBody()));
}
  • 三、最後在生產者類 RabbitExchangeController 中新增一個消息發送的方法進行消息發送:
@GetMapping(value="/send/bak")
public String sendBak(String routingKey,@RequestParam(value = "msg",defaultValue = "no bak message") String msg){
    rabbitTemplate.convertAndSend("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",routingKey,msg);
    return "succ";
}

調用以後能夠看到,備份隊列會收到消息,從而說明了消息在沒法路由到隊列時會進入到備份隊列。

隊列存儲消息後發生異常怎麼辦

在保證了前面兩個階段的可靠性以後,消息終於安全抵達了隊列,那麼這時候就絕對安全了嗎?

當咱們的消費者的消費速度跟不上生產者的生產速度時,就會致使消息堆積在隊列中,而默認消息是沒有持久化的,存在於內存之中,因此假如服務器宕機等故障發生,就會致使隊列中的數據丟失。

這裏的解決方案也很簡單,就是將消息進行持久化,在 RabbitMQ 當中,持久化也能夠分爲 3 種:交換機持久化,隊列持久化和消息持久化。

雖說持久化能必定程度上保證消息的可靠性,然而當出現了服務器的磁盤損壞,依然可能出現消息丟失,因此爲了更加完美,RabbitMQ 集羣多是必須的,固然,本文不會涉及到集羣的知識,集羣的知識以及搭建會放到下次再來分析。

交換機持久化

聲明交換機時,durable 參數設置爲 true

隊列持久化

聲明隊列時,durable 參數設置爲 true

消息持久化

發送消息時能夠將消息設置爲持久化。

Java API 消息持久化

Java API 中,能夠經過以下方式設置消息持久化:

//deliveryMode=2表示消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes());

Spring Boot 消息持久化

Spring Boot 中能夠經過以下方式將消息設置爲持久化:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchangeName","routingKey",message);

消費者消費消息失敗了怎麼辦

踏遍千山萬水,通過 3 層地獄模式,消息終於被消費者拿到手了,然而悲劇的事情又發生了,消費者消費消息的時候可能由於消費者自己的問題或者其餘意外致使了消費者消費消息失敗了,這時候消息仍是沒能被正確處理,這時候難道眼睜睜看着最後關頭了一籌莫展了嗎?

非也,做爲一款如此優秀的消息隊列,怎麼可能沒考慮到這種場景呢。還記不記得上面咱們提到的確認模式,實際上,上面的兩種確認模式都屬於服務端的確認,在 RabbitMQ 中爲消費者也提供了確認模式,這就是消費者的確認。

消費者確認(ack)

隊列當中會把消息刪除的前提就是這條消息被消費者消費掉了,可是服務器如何知道消息被消費了呢?這就是須要經過消費者確認以後纔會刪除,而咱們前面在介紹消息發送的時候貌似並無看到消費者確認流程,這是由於消費者默認在收到消息後會給服務器一個應答,服務端收到消費者的應答以後,就會刪除消息。

Java API 實現消費者應答

Java API 中應答方式有兩種,自動應答和手動應答,當自動應答時,則只要消費者收到消息就會給服務端確認,不在意消息是否消費成功。

  • 一、新建一個消費者 AckConsumer 類(省略了包名和導入),這裏爲了實現方便,經過生產者的頭部標記來決定採用何種應答策略:
public class AckConsumer {
    private static String QUEUE_NAME = "ACK_QUEUE";
    public static void main(String[] args) throws Exception{
        //1.聲明鏈接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://username:password@ip:port");

        //2.創建鏈接
        Connection conn = factory.newConnection();
        //3.建立消息通道
        Channel channel = conn.createChannel();
        //4.聲明隊列(默認交換機AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" 等待接收消息...");

        // 建立消費者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到消息: " + new String(body, "UTF-8"));
                Map<String,Object> map = properties.getHeaders();//獲取頭部消息
                String ackType = map.get("ackType").toString();
                if (ackType.equals("ack")){//手動應答
                    channel.basicAck(envelope.getDeliveryTag(),true);
                }else if(ackType.equals("reject-single")){//拒絕單條消息
                    //拒絕消息。requeue參數表示消息是否從新入隊
                    channel.basicReject(envelope.getDeliveryTag(),false);
                    //                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                }else if (ackType.equals("reject-multiple")){//拒絕多條消息
                    //拒絕消息。multiple參數表示是否批量拒絕,爲true則表示<deliveryTag的消息都被拒絕
                    channel.basicNack(envelope.getDeliveryTag(),true,false);
                }
            }
        };

        //開始獲取消息,第二個參數 autoAck表示是否開啓自動應答
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
  • 二、新建一個生產者 AckProducer 類(省略了包名和導入):
public class AckProducter {
    private static String QUEUE_NAME = "ACK_QUEUE";//隊列
    private static String EXCHANGE_NAME = "ACK_EXCHANGE";//交換機
    private static String ROUTEING_KEY = "test";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://admin:123456@47.107.155.197:5672");
        // 創建鏈接
        Connection conn = factory.newConnection();
        // 建立消息通道
        Channel channel = conn.createChannel();
        Map<String, Object> headers = new HashMap<String, Object>(1);
        headers.put("ackType", "ack");//請應答
//        headers.put("ackType", "reject-single");//請單條拒絕
//        headers.put("ackType", "reject-multiple");//請多條拒絕

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentEncoding("UTF-8")  // 編碼
                .headers(headers) // 自定義屬性
                .messageId(String.valueOf(UUID.randomUUID()))
                .build();

        String msg = "I'm a ack message";
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false);
        //隊列和交換機進行綁定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY);
        // 發送消息
        channel.basicPublish(EXCHANGE_NAME, ROUTEING_KEY, properties, msg.getBytes());

        channel.close();
        conn.close();
    }
}

Spring Boot 實現消費者應答

Spring Boot 中消費者給服務端的確認方式分爲 3 種:

  • NONE:自動應答(ack)。

  • MANUAL:手動應答(ack)。若是設置爲手動應答,而消費者又遲遲不給服務器應答,那麼消息就會一直存在隊列,可能會形成消息堆積和重複消費現象。

  • AUTO:當沒有拋出異常時會自動應答(ack)。除此外,當發生異常時,分爲如下三種狀況:

    • 一、當拋出 AmqpRejectAndDontRequeueException 異常時,消息會被拒絕,也不會從新入隊。
    • 二、當拋出 ImmediateAcknowledgeAmqpException 異常時,消費者會自動發送應答給服務端。
    • 三、當拋出其餘異常時,消息會被拒絕,且會從新入隊。當出現這種狀況且消費者只有一個時,很是容易形成死循環,因此應該極力避免這種狀況的發生。
  • 一、Spring Boot 中能夠經過參數控制應答類型:

spring:
  rabbitmq:
    listener:
      type: simple # direct類型是2.0以後纔有的
      simple:
        acknowledge-mode: manual
  • 二、在消費者類 ExchangeConsumer 中新建一個方法來監聽隊列,其中第一個註釋掉的方法是本來存在的,第二個方法是新增的,主要新增了幾個參數,注意 Channelcom.rabbitmq.client.Channel 包下的:
/**
 * 監聽綁定了direct交換機的的消息隊列
 */
//    @RabbitHandler
//    @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
//    public void directConsumer(String msg){
//        System.out.println("direct交換機收到消息:" + msg);
//    }

/**
 * 監聽綁定了direct交換機的的消息隊列,並進行手動應答
 */
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void manualDirectConsumer(String msg, Channel channel,Message message) throws IOException {
    System.out.println("direct交換機收到消息:" + msg + "。此消息須要手動應答");
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動應答
}
  • 三、或者也能夠經過 SimpleMessageListenerContainer 類實現監聽,新建一個 RabbitAckConfig 類(省略了包名和導入):
@Configuration
public class RabbitAckConfig {
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("LONGLY_WOLF_ORDER_QUEUE");//設置監聽隊列名
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手動確認
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {//消息處理
            System.out.println("收到消息:" + new String(message.getBody()) + "。此消息須要手動應答");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        });
        return container;
    }
}

PS:須要注意的是,這兩種方式不要同時使用,不然沒法保證消息會被哪一個監聽到。

僅靠 RabbitMQ 自身可靠性能實現業務需求嗎

上面介紹的兩種確認模式,服務端確認和消費者確認。其中服務端確認是會回調給生產者的,因此生產者能夠知道消息是否已經到達服務器且是否正確路由到隊列,然而,對於消費者的確認,生產者是不知道的,這是由於消息隊列的做用之一就是爲了實現生產者和消費者的解耦,換言之,消費者知道消息成功發送到隊列,可是沒法知道消息是否被消費者消費

因此爲了知道消息是否被成功消費,主要有兩種思路:

  • 一、消費者在消費成功以後須要回調生產者提供的API來告知消息已經被消費
  • 二、服務端在收到消費者確認後給生產者一個回執通知

然而假如生產者遲遲沒有收到消費者是否消費成功的信息,那麼可能就須要補償,好比微信支付等都會有補償機制,間隔必定時間就將消息重發一次。

補償機制同時也會帶來一個問題,假如說消費者消費成功了,可是在告訴生產者的時候失敗了,那麼這時候消息若是再次補償就會形成重複消費,因此消費者須要支持冪等(即不管一條消息被消費多少次,都不會改變結果)。固然,同時還有其餘場景須要考慮,好比消息之間的依賴性等等問題都須要結合具體業務場景來具體處理。

總結

本文主要講述了 RabbitMQ 的消息發送方式,介紹了 3 種不一樣交換機的方式,同時最後也從發送消息的主要 4 個步驟分析了每個步驟如何保證消息的可靠性,並分別經過 Java APISpring Boot 提供了示例,中間還提到了死信隊列,死信隊列本質也是一個隊列,只不過存儲的消息比較特殊,相信經過本文,你們對 RabbitMQ 會有一個更深層次的瞭解。

相關文章
相關標籤/搜索