RabbitMQ實戰應用技巧

1. RabbitMQ實戰應用技巧

1.1. 前言

因爲項目緣由,以後會和RabbitMQ比較多的打交道,因此讓咱們來好好整理下RabbitMQ的應用實戰技巧,儘可能避免往後的採坑spring

1.2. 概述

RabbitMQ有幾個重要的概念:虛擬主機,交換機,隊列和綁定segmentfault

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,咱們能夠從虛擬主機層面的顆粒度進行權限控制
  • 交換機:Exchange用於轉發消息,它並不存儲消息,若是沒有Queue隊列綁定到Exchange,它會直接丟棄掉生產者發來的數據。

交換機還有個關聯的重要概念:路由鍵,消息轉發到哪一個隊列根據路由鍵決定springboot

  • 綁定:就是綁定交換機和隊列,它是多對多的關係,也就是說多個交換機能夠綁同一個隊列,也能夠一個交換機綁多個隊列

1.3. 交換機

交換機有四種類型的模式Direct, topic, Headers and Fanout學習

1.3.1. Direct Exchage

Direct模式使用的是RabbitMQ的默認交換機,也是最簡單的模式,適合比較簡單的場景ui

以下圖所示,使用Direct模式,咱們須要建立不一樣的隊列,而默認交換機則經過Routing key路由鍵的值來決定轉發到哪一個隊列,能夠看到,路由鍵綁定隊列是能夠指定多個的spa

1.3.2. Topic Exchange

Topic模式主要是根據通配符匹配,也就相似於模糊匹配,當這種匹配模式和路由鍵匹配後交換機就能轉發消息到指定隊列code

  • 路由鍵爲一串字符串,由句號(.)隔開,好比a.b.c
  • *)表明指定位置一個單詞,(#)表明零個或者多個單詞,好比a.*.b.#,表示a和b中間隨意填個單詞,b後面能夠跟n個單詞,好比a.x.b.c.d.e

Topic模式和Direct模式的區別在於交換機須要本身指定,路由鍵支持模糊匹配,例如:blog

rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");

1.3.3. Headers Exchage

Headers也是根據規則匹配,但它不是根據路由鍵了,headers有個自定義匹配規則,它將匹配鍵值設在了消息的headers屬性上,當這些鍵值對有一對或者所有匹配時,消息纔會被投遞到對應隊列,這種模式效率相對較低,通常不推薦使用rabbitmq

1.3.4. Fanout Exchange

Fanout即爲大名鼎鼎的廣播模式了,它不須要管路由鍵,會把消息發給綁定它的所有隊列,就算配置了路由鍵也會被忽略隊列

1.4. 複雜狀況

  1. 首先咱們Direct模式,一個生產者一個消費者的狀況,也就對應了一個發送者和一個隊列A接收,這是沒有疑問的,發送什麼接收什麼
  2. 當Direct模式,一個生產者發消息,開啓多個消費者也就是多個相同queue,此時消息由多個消費者均勻分攤,不會重複消費(前提ack正常)
  3. 當Topic模式,一個交換機綁定兩個隊列,路由鍵有重疊關係,以下代碼,此時指定路由鍵topic.message發送消息,隊列queueMessagequeueMessages都能接收到相同消息,也就是說,topic模式能夠實現相似於廣播模式的形式,甚至更加靈活,它可否轉發到消息由路由鍵決定。
  4. 相比於Fanout模式,咱們若是要對消費者隊列分組發送,咱們須要指定不一樣的路由鍵;而Fanout模式則須要指定不一樣的交換機和隊列綁定,實際使用結合實際狀況
@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

1.5. springboot配置

咱們的經常使用配置以下

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=1000
##設置監聽限制:最大10,默認5
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual

其中最後四條配置須要着重解釋:

  • spring.rabbitmq.publisher-confirms爲true,表示生產者消息發出後,MQ的broker接收到了消息,發送回執表示確認接收,不設置則可能致使消息丟失
  • spring.rabbitmq.publisher-returns爲true,表示當消息不能到達MQ的Broker端,,則使用監聽器對不可達的消息作後續處理,這種通常是路由鍵沒配好,或MQ宕機纔可能發生
  • spring.rabbitmq.template.mandatory當上面兩個爲true時,這個必定要配true,不然上面兩個不起做用
  • spring.rabbitmq.listener.simple.acknowledge-mode這個爲manual表示手工確認,實際生產應該設爲手工,才能保證你的業務是處理完成的,注意業務的冪等性,可重複調用,手工確認代碼以下例子
@Component
public class RabbitReceiver {

    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK,獲取deliveryTag
        channel.basicAck(deliveryTag, false);
    }
}

1.6. 隊列屬性

  1. queue : 隊列的名字
  2. durable : 爲true表示隊列中數據持久化到磁盤,能夠防止mq宕機重啓數據丟失
  3. exclusive : 爲true表示排他性,只容許一個當前鏈接訪問該隊列,當前已鏈接就不容許新的鏈接進入不然報錯,當鏈接斷開當前隊列會銷燬
  4. autoDelete : 爲true表示自動刪除,當沒有Connection鏈接到隊列的時候,會自動刪除
  5. arguments : 這個參數用來添加一些額外參數的,以下圖片

    • 好比添加x-message-ttl爲5000,則表示消息超過5秒沒被處理就會超時過時;
    • x-expires設置120000表示隊列在2分鐘內沒被消費則被刪除;
    • x-max-length,x-max-length-bytes表示傳送數據的最大長度和字節數
    • x-dead-letter-exchangex-dead-letter-routing-key表示死信交換機和死信路由,放在須要過時或處理失敗的隊列屬性中,這些數據會轉發到死信隊列存儲起來,建立普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange的值,填寫路由鍵要符合死信隊列的路由鍵
    • x-max-priority,表示設置優先級,範圍爲0~255,只有當消息堆積的時候,這個優先級纔有意義,數字越大優先級越高
    • x-queue-mode當爲lazy,表示惰性隊列,3.6.0以後才被引入的概念,相比默認的模式,惰性隊列模式會將生產者產生的消息直接存到磁盤中,這固然會增長IO開銷,但適合應對大量消息堆積的狀況;由於當大量消息堆積時,內存也不夠存放,會將消息轉存到磁盤,這個過程也是比較耗時且過程當中不能接收新的消息。若是須要將普通隊列轉換成惰性隊列須要將原來的隊列刪除,從新建立個惰性隊列綁定。
![](https://image-static.segmentfault.com/406/935/4069353622-5db7b891b3bb4_articlex)

1.7. 交換機屬性

  1. exchange : 交換機名稱
  2. type : 交換機類型
  3. durable : 持久化,同隊列
  4. autoDelete : 是否自動刪除,同隊列
  5. internal : 若爲true,表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
  6. arguments : 額外參數,目前只有個alternate-exchange,表示當生產者發送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange指定的隊列,至關於轉發了,恰好和上一個參數internal配合,若不想本交換機起到路由隊列的做用,能夠設置internal爲true,把消息都轉發到alternate-exchange指定的交換機,由該交換機來路由指定隊列,

    • 以下圖:exchange0設置了alternate-exchange交換機爲exchange1,生產者發送數據到exchange0路由鍵爲test1,在exchange0路由不到,則轉發到exchange1判斷路由符合,發送到隊列queue1
![](https://image-static.segmentfault.com/411/027/4110271351-5db7b8925ca60_articlex)

1.8. 磁盤和內存

在RabbitMQ的管理界面,當咱們集羣部署時能夠看到Nodes節點中Info字段可能爲disc也可能ram,表示了磁盤存儲或內存儲存。事實上,在集羣部署的時候,咱們至少要一個磁盤儲存,它表明了將交換機,隊列,綁定,用戶等元數據持久化保存到磁盤,一遍重啓RabbitMQ也能恢復到原先的狀態,當只有一個節點時,一定是磁盤存儲;而內存儲存也有它的優點,就是效率更高速度更快

1.9. 報錯案例

  • 當報下列錯誤,表示你必定存在排他性隊列,也就是設置了exclusive屬性的隊列,因爲同一個鏈接建立的不一樣通道能夠訪問同一個隊列,此時因爲這個排他屬性會獲得資源被鎖定錯誤,也就是下列的錯誤。
  • 由此咱們能夠知道,若你把隊列設置成了exclusive屬性的,那麼就別建立新的鏈接去訪問同一個隊列
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx

老梁講Java

歡迎關注公衆號,一塊兒學習進步
相關文章
相關標籤/搜索