RabbitMQ由淺入深刻門全總結(二)

寫在最前面

距離上一次發文章已經好久了,其實這段時間一直也沒有停筆,只不過在忙着找工做還有學校結課的事情,從新弄了一下博客,後面也會陸陸續續會把文章最近更新出來~java

  • 這篇文章有點長,就分了兩篇
  • PS:那個Github上Java知識問答的文章也沒有停筆,最近也會陸續更新

6. 進階補充

6.1 過時時間設置(TTL)

過時時間(TTL)就是對消息或者隊列設置一個時效,只有在時間範圍內才能夠被被消費者接收穫取,超過過時時間後消息將自動被刪除。spring

注:咱們主要講消息過時,在消息過時的第一種方式中,順便也就會提到隊列過時的設置方式shell

  1. 經過隊列屬性設置,隊列中全部消息都有相同的過時時間
  2. 對消息進行單獨設置,每條消息 TTL能夠不一樣

兩種方法同時被使用時,以二者過時時間 TTL 較小的那個數值爲準。消息在隊列的生存時間一旦超過設置的 TTL 值,就稱爲 Dead Message 被投遞到死信隊列,消費者將沒法再收到該消息(死信隊列是咱們下一點要講的)服務器

6.1.1 應用於所有消息的過時時間

  • 配置類
@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 建立參數 Map 容器
        Map<String, Object> args = new HashMap<>();
        // 設置消息過時時間 注意此處是數值 5000 不是字符串
        args.put("x-message-ttl", 5000);
        // 設置隊列過時時間
        args.put("x-expires", 8000);
        // 在最後傳入額外參數 即這些過時信息
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}
  1. 建立參數 Map 容器:類型是在 Queue 參數中所要求的,要按照要求來。
  2. 設置消息過時時間:這裏設置的消息過時時間,會應用到全部消息中。
  3. 設置隊列過時時間
  4. 傳入額外參數:將上述配置好的過時時間設置,經過 Queue 傳入便可。
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage() {
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
    }
}

不要配置消費者,而後就能夠在 Web 管理器中看到效果了網絡

6.1.2 應用於單獨消息的過時時間

  • 配置中保持最初的樣子就好了,就不須要配置過時時間了
  • 生產者中配置消息單獨的過時時間
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage2() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                // 注意此處是 字符串 「5000」
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
                "This is a message 002 !",messagePostProcessor);
    }
}

6.2 死信隊列

死信官方原文爲 Dead letter ,它是RabbitMQ中的一種消息機制,當你在消費消息時,若是隊列以及隊列裏的消息出現如下狀況,說明當前消息就成爲了 「死信」,若是配置了死信隊列,這些數據就會傳送到其中,若是沒有配置就會直接丟棄。運維

  1. 消息被拒絕
  2. 消息過時
  3. 隊列達到最大長度

不過死信隊列並非什麼很特殊的存在,咱們只須要配置一個交換機,在消費的那個隊列中配置,出現死信就從新發送到剛纔配置的交換機中去,進而被路由到與交換機綁定的隊列中去,這個隊列也就是死信隊列,因此從建立上看,它和普通的隊列沒什麼區別。異步

6.2.1 應用場景

好比在一些比較重要的業務隊列中,未被正確消費的消息,每每咱們並不想丟棄,由於丟棄後若是想恢復這些數據,每每須要運維人員從日誌獲取到原消息,而後從新投遞消息,而配置了死信隊列,至關於給了未正確消費消息一個暫存的位置,往後須要恢復的時候,只須要編寫對應的代碼就能夠了。分佈式

6.2.2 實現方式

  • 定義一個處理死信的交換機和隊列
@Configuration
public class DeadRabbitMqConfiguration{

    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("dead_direct_exchange");}

    @Bean
    public Queue deadQueue(){
        return new Queue("dead_direct_queue");}
    @Bean
    public Binding deadBinds(){
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
}
  • 在正常的消費隊列中指定死信隊列
@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 設置過時時間
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // 設置死信隊列交換器
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        // 設置交換路由的路由key fanout 模式不須要配置此條
        args.put("x-dead-letter-routing-key","dead");
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}

6.3 內存及磁盤監控

6.3.1 內存告警及控制

爲了防止避免服務器因內存不夠而崩潰,因此 RabbitMQ 設定了一個閾值,當內存使用量超過閾值的時候,RabbitMQ 會暫時阻塞全部客戶端的鏈接,而且中止繼續接受新消息。ide

有兩種方式能夠修改這個閾值post

  1. 經過命令(二選一便可)

    • 命令的方式會在 Broker 重啓後失效
# 經過百分比設置的命令 <fraction> 處表明百分比小數例如 0.6
rabbitmqctl set_vm_memory_high_watermark <fraction>
# 經過絕對值設置的命令 <value> 處表明設置的一個固定值例如 700MB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
  1. 經過修改配置文件 rabbitmq.conf

    • 配置文件每次啓動都會加載,屬於永久有效
# 百分比設置 默認值爲 0.4 推薦 0.4-0.7 之間
vm_memory_high_watermark.relative = 0.5
# 固定值設置
vm_memory_high_watermark.absolute = 2GB

6.3.2 內存換頁

在客戶端鏈接和生產者被阻塞以前,它會嘗試將隊列中的消息換頁到磁盤中,這種思想在操做系統中其實很是常見,以最大程度的知足消息的正常處理。

當內存換頁發生後,不管持久化仍是非持久化的消息,都會被轉移到磁盤,而因爲持久化的消息原本就在磁盤中有一個持久化的副本,因此會優先移除持久化的消息。

默認狀況下,當內存達到閾值的 50 % 的時候,就會進行換頁處理。

能夠經過設置 vm_memory_high_watermark_paging_ratio 修改

# 值小於 1, 若是大於 1 就沒有意義了
vm_memory_high_watermark_paging_ratio = 0.6

6.3.3 磁盤預警

若是無止境的換頁,也頗有可能會致使耗盡磁盤空間致使服務器崩潰,因此 RabbitMQ 又提供了一個磁盤預警的閾值,當低於這個值的時候就會進行報警,默認是 50MB,能夠經過命令的方式修改

# 固定值
rabbitmqctl set_disk_free_limit <disk_limit>
# 百分數
rabbitmqctl set_disk_free_limit memory_limit <fraction>

6.4 消息的可靠傳遞

生產者向 RabbitMQ 中發送消息的時候,可能會由於網絡等種種緣由致使發送失敗,因此 RabbitMQ 提供了一系列保證消息可靠傳遞的機制,能夠大體分爲生產者和消費者兩部分的處理

6.4.1 生產者中的機制

生產者做爲消息的發送者,須要保證本身的消息發送成功,RabbitMQ 提供了兩種方式來保證這一點。-

  1. confirm 確認模式
  2. return 退回模式

6.4.1.1 confirm 確認模式

生產者發送消息後,會異步等待接收一個 ack 應答,收到返回的 ack 確認消息後,根據 ack是 true 仍是 false,調用 confirmCallback 接口進行處理

  • 配置類
spring:
  rabbitmq:
    # 發送確認
    publisher-confirm-type: correlated
  • 實現 ConfirmCallback 接口的 confirm 方法
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 相關配置信息
     * @param ack             exchange交換機 是否成功收到了消息。true 成功,false表明失敗
     * @param cause           失敗緣由
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            //接收成功
            System.out.println("消息成功發送到交換機");
        } else {
            //接收失敗
            System.out.println("消息發送到交換機失敗,失敗緣由: " + cause);
            // TODO 能夠處理失敗的消息,例如再次發送等等
        }
    }
}
  • 聲明隊列和交換機
@Configuration
public class RabbitMqConfig {
    @Bean()
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean()
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirm_test_exchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue() {
        return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
    }
}
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
     /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Test
    public void testConfirm() {
        // 設置確認回調類
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 發送消息
        rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
    }
}

6.4.1.2 return 退回模式

當 Exchange 發送到 Queue 失敗時,會調用一個 returnsCallback,咱們能夠經過實現這個接口,而後來處理這種失敗的狀況。

  • 在配置文件中開啓發送回調
spring:
  rabbitmq:
    # 發送回調
    publisher-returns: true
  • 實現 ReturnsCallback 的 returnedMessage 方法
//  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 已經屬於過期方法了
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println(returned);
    }
}
  • 聲明隊列和交換機(Direct 模式)
@Configuration
public class RabbitMqConfig {

    @Bean()
    public Queue returnsTestQueue() {
        return new Queue("return_test_queue", true, false, false);
    }

    @Bean()
    public DirectExchange returnsTestExchange() {
        return new DirectExchange("returns_test_exchange");
    }

    @Bean
    public Binding returnsTestDirectExchangeAndQueue() {
        return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
    }
}
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
    /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;
    
    /**
     * 注入 ReturnCallbackService
     */
    @Autowired
    private ReturnCallbackService returnCallbackService;

    @Test
    public void testReturn() {
        // 確保消息發送失敗後能夠從新返回到隊列中
        rabbitTemplate.setMandatory(true);
        // 消息投遞到隊列失敗回調處理
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        // 消息投遞確認模式
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 發送消息
        rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
    }
}
  • 修改不一樣的路由key,便可測試出結果。

6.4.2 消費者中的機制

6.4.2.1 ack 確認機制

ack 表示收到消息的確認,默認是自動確認,可是它有三種類型

acknowledge-mode 選項介紹

  • auto:自動確認,爲默認選項
  • manual:手動確認(按能力分配就須要設置爲手動確認)
  • none:不確認,發送後自動丟棄

其中自動確認是指,當消息一旦被消費者接收到,則自動確認收到,並把這個消息從隊列中刪除。

可是在實際業務處理中,正確的接收到的消息可能會由於業務上的問題,致使消息沒有正確的被處理,可是若是設置了 手動確認方式,則須要在業務處理成功後,調用channel.basicAck(),手動簽收,若是出現異常,則調用 channel.basicNack()方法,讓其自動從新發送消息。

  • 配置文件
spring:
  rabbitmq:
    listener:
      simple:
          # 手動確認
        acknowledge-mode: manual
  • 消費者
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消息內容: " + new String(message.getBody()));

            System.out.println("業務出錯的位置:");
            int i = 66 / 0;
            
            // 手動簽收 deliveryTag標識表明隊列能夠刪除了
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 拒絕簽收
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

6.5 集羣 & 6.6 分佈式事務(待更新)

因爲這兩個點篇幅也不短,實在不肯草草簡單寫上了事,放到後面單獨的文章編寫,發佈哇。

關於集羣的搭建暫時可參考:https://blog.csdn.net/belongh...

相關文章
相關標籤/搜索