Java進階專題(二十) 消息中間件架構體系(2)-- RabbitMQ研究

前言

接上文,這個繼續介紹RabbitMQ,並理解其底層原理。java

介紹

RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列。node

爲何使用RabbitMQ呢?
一、使得簡單,功能強大。
二、基於AMQP協議。
三、社區活躍,文檔完善。
四、高併發性能好,這主要得益於Erlang語言。
五、Spring Boot默認已集成RabbitMQweb

AMQP協議

AMQP基本介紹

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件同產品、不一樣的開發語言等條件的限制。面試

AMQP的實現有:RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、
ØMQ、Zyre等。redis

目前Rabbitmq最新的版本默認支持的是AMQP 0-9-1,該協議總共包含了3部分:spring

Module Layer: 位於協議的最高層,主要定義了一些供客戶端調用的命令,客戶端能夠利用這些命令實現
自定義的業務邏輯。
例如,客戶端能夠是使用Queue.Declare命令聲明一個隊列或者使用Basic.Consume訂閱消費一個隊列中的
消息。
Session Layer: 位於中間層,主要負責將客戶端的命令發送給服務端,在將服務端的應答返回給客戶端,
主要爲客戶端與服務器之間的通訊提供可靠性的同步機制和錯誤處理。
Transport Layer: 位於最底層,主要傳輸二進制數據流,提供幀的處理、信道的複用、錯誤檢查和數據表
示等。docker

AMQP生產者流轉過程

當客戶端與Broker創建鏈接的時候,客戶端會向Broker發送一個Protocol Header 0-9-1的報文頭,以此通知Broker本次交互才採用的是AMQP 0-9-1協議。shell

緊接着Broker返回Connection.Start來創建鏈接,在鏈接的過程當中涉及Connection.Start/.Start-OK、Connection. Tune/. Tune-OK、Connection.Open/.Open-OK這6個命令的交互。數據庫

鏈接創建之後須要建立通道,會使用到Channel.Open , Channel.Open-OK命令,在進行交換機聲明的時候須要使用到Exchange.Declare以及Exchange.Declare-OK的命令。以此類推,在聲明隊列以及完成隊列和交換機的綁定的時候都會使用到指定的命令來完成。編程

在進行消息發送的時候會使用到Basic.Publish命令完成,這個命令還包含了Conetent-Header和Content-Body。Content Header裏面包含的是消息體的屬性,Content-Body包含了消息體自己。

AMQP消費者流轉過程

消費者消費消息的時候,所涉及到的命令和生成者大部分都是相同的。在原有的基礎之上,多個幾個命令:Basic.Qos/.Qos-OK以及Basic.Consume和Basic.Consume-OK。

其中Basic.Qos/.Qos-OK這兩個命令主要用來確認消費者最大能保持的未確認的消息數時使用。Basic.Consume和Basic.Consume-OK這兩個命令主要用來進行消息消費確認。

RabbitMQ的特性

RabbitMQ使用Erlang語言編寫,使用Mnesia數據庫存儲消息。
(1)可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
(2)靈活的路由(Flexible Routing) 在消息進入隊列以前,經過Exchange 來路由消息的。對於典型的路由功
能,RabbitMQ 已經提供了一些內置的Exchange 來實現。針對更復雜的路由功能,能夠將多個Exchange 綁定在
一塊兒,也經過插件機制實現本身的Exchange 。
(3)消息集羣(Clustering) 多個RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯Broker 。
(4)高可用(Highly Available Queues) 隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下
隊列仍然可用。
(5)多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,好比AMQP、STOMP、MQTT 等等。
(6)多語言客戶端(Many Clients) RabbitMQ 幾乎支持全部經常使用語言,好比Java、.NET、Ruby、PHP、C#、
JavaScript 等等。
(7)管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息、集羣
中的節點。
(8)插件機制(Plugin System)
RabbitMQ提供了許多插件,以實現從多方面擴展,固然也能夠編寫本身的插件。

工做模型

名詞解釋
Broker :即RabbitMQ的實體服務器。提供一種傳輸服務,維護一條從生產者到消費者的傳輸線路,保證消息數據能按照指定的方式傳輸。
Exchange :消息交換機。指定消息按照什麼規則路由到哪一個隊列Queue。
Queue :消息隊列。消息的載體,每條消息都會被投送到一個或多個隊列中。
Binding :綁定。做用就是將Exchange和Queue按照某種路由規則綁定起來。
Routing Key:路由關鍵字。Exchange根據Routing Key進行消息投遞。定義綁定時指定的關鍵字稱爲Binding Key。
Vhost:虛擬主機。一個Broker能夠有多個虛擬主機,用做不一樣用戶的權限分離。一個虛擬主機持有一組Exchange、Queue和Binding。
Producer:消息生產者。主要將消息投遞到對應的Exchange上面。通常是獨立的程序。
Consumer:消息消費者。消息的接收者,通常是獨立的程序。
Connection:Producer 和Consumer 與Broker之間的TCP長鏈接。
Channel:消息通道,也稱信道。在客戶端的每一個鏈接裏能夠創建多個Channel,每一個Channel表明一個會話任務。在RabbitMQ Java Client API中,channel上定義了大量的編程接口。

交換機類型

Direct Exchange 直連交換機

定義:直連類型的交換機與一個隊列綁定時,須要指定一個明確的binding key。
路由規則:發送消息到直連類型的交換機時,只有routing key跟binding key徹底匹配時,綁定的隊列才能收到消息。

Topic Exchange 主題交換機
定義:主題類型的交換機與一個隊列綁定時,能夠指定按模式匹配的routing key。
通配符有兩個,*表明匹配一個單詞。#表明匹配零個或者多個單詞。單詞與單詞之間用 . 隔開。
路由規則:發送消息到主題類型的交換機時,routing key符合binding key的模式時,綁定的隊列才能收到消息。

// 只有隊列1能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes());  
// 隊列2和隊列3能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes());  
// 只有隊列4能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes());

Fanout Exchange 廣播交換機

定義:廣播類型的交換機與一個隊列綁定時,不須要指定binding key。
路由規則:當消息發送到廣播類型的交換機時,不須要指定routing key,全部與之綁定的隊列都能收到消息。

RabbitMq安裝

下載鏡像

docker pull rabbitmq

建立並啓動容器

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq
  • -d 後臺運行容器;
  • --name 指定容器名;
  • -p 指定服務運行的端口(5672:應用訪問端口;15672:控制檯Web端口號);
  • -v 映射目錄或文件;
  • --hostname 主機名(RabbitMQ的一個重要注意事項是它根據所謂的 「節點名稱」 存儲數據,默認爲主機名);
  • -e 指定環境變量;(RABBITMQ_DEFAULT_VHOST:默認虛擬機名;RABBITMQ_DEFAULT_USER:默認的用戶名;RABBITMQ_DEFAULT_PASS:默認用戶名的密碼)

啓動rabbitmq後臺管理服務

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

訪問後臺頁面:

http://127.0.0.1:15672  初始密碼: admin  admin

RabbitMQ快速入門

maven依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.0.RELEASE</version>
        </dependency>
    </dependencies>

rabbitmq配置類

/**
 * @author 原
 * @date 2020/12/22
 * @since 1.0
 **/
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_TOPICS_INFORM = "exchange_topic_inform";
    public static final String QUEUE_SMS = "queue_sms";
    public static final String QUEUE_EMAIL = "queue_email";

       @Bean
    public Exchange getExchange() {
        //durable(true)持久化,消息隊列重啓後交換機仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    @Bean("queue_sms")
    public Queue getQueueSms(){
        return new Queue(QUEUE_SMS);
    }

    @Bean("queue_email")
    public Queue getQueueEmail(){
        return new Queue(QUEUE_EMAIL);
    }

    @Bean
    public Binding bindingSms(@Qualifier("queue_sms") Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("demo.#.sms").noargs();
    }

    @Bean
    public Binding bindingEmail(@Qualifier("queue_email") Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("demo.#.email").noargs();
    }
}

生產者

@Service
public class RabbitmqProviderService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessageSms(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",message);
    }

    public void sendMessageEmail(String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.email",message);
    }
}

消費者

@Component
public class RabbitMqConsumer {

    @RabbitListener(queues = {RabbitConfig.QUEUE_EMAIL})
    public void listenerEmail(String message, Message msg , Channel channel) {
        System.out.println("EMAIL:"+message);
        System.out.println(msg);
        System.out.println(channel);
    }

    @RabbitListener(queues = {RabbitConfig.QUEUE_SMS})
    public void listenerSms(String message) {
        System.out.println("SMS:"+message);
    }
}

啓動類

/**
 * @author 原
 * @date 2020/12/22
 * @since 1.0
 **/
@SpringBootApplication
@EnableRabbit
public class RabbitMqApplicaiton {

    public static void main(String[] args) {
        ResourceLoader resourceLoader = new DefaultResourceLoader(RabbitMqApplicaiton.class.getClassLoader());
        try {
            String path = resourceLoader.getResource("classpath:").getURL().getPath();
            System.out.println(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
        SpringApplication.run(RabbitMqApplicaiton.class, args);
    }
}

web

@RestController
public class DemoController {

    @Autowired
    RabbitmqProviderService rabbitmqProviderService;

    @RequestMapping("/sms")
    public void sendMsgSms(String msg) {
        rabbitmqProviderService.sendMessageSms(msg);
    }

    @RequestMapping("/eamil")
    public void sendMsgEmail(String msg) {
        rabbitmqProviderService.sendMessageEmail(msg);
    }
}

經過頁面發送消息:

http://localhost:44000/sms?msg=1111

http://localhost:44000/email?msg=1111

RabbitMQ進階用法

TTL

消息設置過時時間

MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("30000");
Message msg = new Message("消息內容".getBytes(),messageProperties);
//若是消息沒有及時消費,那麼通過30秒之後,消息變成死信,Rabbitmq會將這個消息直接丟棄。
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"demo.one.sms",msg);

隊列設置過時時間

Queue queue = QueueBuilder.durable(QUEUE_SMS).ttl(30000).build();

死信隊列

當一個消息變成死信了之後,默認狀況下這個消息會被mq刪除。若是咱們給隊列指定了"死信交換機"(DLX:
Dead-Letter-Exchange),那麼此時這個消息就會轉發到死信交換機,進而被與死信交換機綁定的隊列(死信隊列)進行消費。從而實現了延遲消息發送的效果。有三種狀況消息會進入DLX(Dead Letter Exchange)死信交換機。
一、(NACK || Reject ) && requeue == false
二、消息過時
三、隊列達到最大長度,能夠經過x-max-length參數來指定隊列的長度,若是不指定,能夠認爲是無限長(先入隊的消息會被髮送到DLX)

一、聲明死信交換機、死信隊列、死信交換機和死信隊列的綁定

// 聲明死信交換機
@Bean(name = "dlx.exchange")
public Exchange dlxExchange() {
return
ExchangeBuilder.directExchange("dlx.exchange").durable(true).build() ;
}
// 聲明死信隊列
@Bean(name = "dlx.queue")
public Queue dlxQueue() {
   return QueueBuilder.durable("dlx.queue").build() ;
}
// 完成死信隊列和死信交換機的綁定
@Bean
public Binding dlxQueueBindToDlxExchange(@Qualifier(value =
"dlx.exchange") Exchange exchange, @Qualifier(value = "dlx.queue")
Queue queue) {
   return
BindingBuilder.bind(queue).to(exchange).with("delete").noargs() ;
}

二、將死信隊列做爲普通隊列的屬性設置過去

// 聲明隊列
@Bean(name = "direct.queue_02")
public Queue commonQueue02() {
   QueueBuilder queueBuilder =
QueueBuilder.durable("direct.queue_02");
   queueBuilder.deadLetterExchange("dlx.exchange") ;   // 將死信交換機做
爲普通隊列的屬性設置過去
   queueBuilder.deadLetterRoutingKey("delete") ;       // 設置消息的
routingKey
   // queueBuilder.ttl(30000) ;                       // 設置隊列消息的過時時間,爲30秒
   // queueBuilder.maxLength(2) ;                     // 設置隊列的最大長度
   return queueBuilder.build() ;
}

三、消費端進行一樣的設置,而且指定消費死信隊列

@Component
public class RabbitmqDlxQueueConsumer{
   // 建立日誌記錄器
   private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class) ;
   @RabbitListener(queues = "dlx.queue")
   public void dlxQueueConsumer(String msg) {
       LOGGER.info("dlx queue msg is : {} ", msg);
  }
}

優先隊列

優先級高的消息能夠優先被消費,可是:只有消息堆積(消息的發送速度大於消費者的消費速度)的狀況下優先級
纔有意義。

Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-max-priority",10);  // 隊列最大優先級
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);

延遲隊列

RabbitMQ自己不支持延遲隊列。可使用TTL結合DLX的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,
到了指定時間,消息過時後,就會從DLX路由到這個隊列,消費者能夠從這個隊列取走消息。
另外一種方式是使用rabbitmq-delayed-message-exchange插件。
固然,將須要發送的信息保存在數據庫,使用任務調度系統掃描而後發送也是能夠實現的。

服務端限流

在AutoACK爲false的狀況下,若是必定數目的消息(經過基於consumer或者channel設置Qos的值)未被確認
前,不進行消費新的消息。

channel.basicQos(2); // 若是超過2條消息沒有發送ACK,當前消費者再也不接受隊列消息
channel.basicConsume(QUEUE_NAME, false, consumer);

RabbitMQ如何保證可靠性

首先須要明確,效率與可靠性是沒法兼得的,若是要保證每個環節都成功,勢必會對消息的收發效率形成影響。若是是一些業務實時一致性要求不是特別高的場合,能夠犧牲一些可靠性來換取效率。

① 表明消息從生產者發送到Exchange;
② 表明消息從Exchange路由到Queue;
③ 表明消息在Queue中存儲;
④ 表明消費者訂閱Queue並消費消息。

一、確保消息發送到RabbitMQ服務器

可能由於網絡或者Broker的問題致使①失敗,而生產者是沒法知道消息是否正確發送到Broker的。

有兩種解決方案,第一種是Transaction(事務)模式,第二種Confirm(確認)模式。

在經過channel. txSelect方法開啓事務以後,咱們即可以發佈消息給RabbitMQ了,若是事務提交成功,則消息定到達了RabbitMQ中,若是在事務提交執行以前因爲RabbitMQ異常崩潰或者其餘緣由拋出異常,這個時候咱們即可以將其捕獲,進而經過執行channel. txRollback方法來實現事務回滾。使用事務機制的話會「吸乾」RabbitMQ性能,通常不建議使用。

生產者經過調用channel. confirmSelect方法(即Confirm. Select命令)將信道設置爲confirm模式。一旦消息投遞到全部匹配的隊列以後,RabbitMQ就會發送一個確認(Basic. Ack)給生產者(包含消息的惟一ID),這就使得生產者知曉消息已經正確到達了目的地了。

二、確保消息路由到正確的隊列

可能由於路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤致使②失敗。
使用mandatory參數和ReturnListener,能夠實現消息沒法路由的時候返回給生產者。
另外一種方式就是使用備份交換機(alternate-exchange),沒法路由的消息會發送到這個交換機上。

三、確保消息在隊列正確地存儲

可能由於系統宕機、重啓、關閉等等狀況致使存儲在隊列的消息丟失,即③出現問題。

一、作隊列、交換機、消息的持久化。

二、作集羣,鏡像隊列。

若是想更改這個默認的配置,咱們能夠在/etc/rabbitmq/目錄下建立一個rabbitmq.config文件,配置信息能夠按照指定的json規則進行指定。以下所示:

[{
   rabbit,
   
  [
       
      {
           queue_index_embed_msgs_below,
           4097
      }
  ]
}].

而後重啓rabbitmq服務(rabbitmqctl stop----> rabbitmq-server -detached)。
那麼咱們是否是把queue_index_embed_msgs_below參數的值調節的越大越好呢?確定不是的rabbit_queue_index中以順序(文件名從0開始累加)的段文件來進行存儲,後綴爲".idx",每一個段文件中包含固定的SEGMENT_ENTRY_COUNT條記錄,SEGMENT_ENTRY_COUNT默認值爲16384。每一個rabbit_queue_index從磁盤中讀取消息的時候至少在內存中維護一個段文件,因此設置queue_index_embed_msgs_below值的時候須要格外謹慎,一點點增大也可能會引發內存爆炸式增加。

相關知識:消息存儲機制

無論是持久化的消息仍是非持久化的消息均可以被寫入到磁盤。
一、持久化的消息在到達隊列時就被寫入到磁盤,而且若是能夠,持久化的消息也會在內存中保存一份備份,這樣能夠提升必定的性能,當內存吃緊的時候會從內存中清除。
二、非持久化的消息通常只保存在內存中,在內存吃緊的時候會被寫入到磁盤中,以節省內存空間。這兩種類型的消息的落盤處理都在RabbitmqMQ的"持久層"中完成。持久層的組成以下所示:

rabbit_queue_index:負責維護隊列中的落盤消息的信息,包括消息的存儲地點、是否已被交付給消費者、
是否已被消費者ack。每個隊列都有與之對應的一個rabbitmq_queue_index。
rabbit_msg_store: 負責消息的存儲,它被全部的隊列共享,在每一個節點中有且只有一個。
rabbit_msg_store能夠在進行細分:

msg_store_persisent :負責持久化消息的持久化,重啓不會丟失

msg_store_transient :負責非持久化消息的持久化,重啓會丟失

消息能夠存儲在rabbit_queue_index中也能夠存儲在rabbit_msg_store中。最佳的配置是較小的消息存儲在rabbit_queue_index中而較大的消息存儲在rabbit_msg_store中。這個消息的界定能夠經過queue_index_embed_msgs_below來配置,默認大小爲4096,單位爲B。注意這裏的消息大小是指消息體、屬性以及headers總體的大小。當一個消息小於設定的大小閾值時就能夠存儲在rabbit_queue_index中,這樣能夠獲得性能上的優化。這種存儲機制是在Rabbitmq3.5 版本之後引入的,該優化提升了系統性能10%左右。

相關知識: 隊列的結構

Rabbitmq中隊列的是由兩部分組成:rabbit_amqpqueue_process和backing_queue組成:

rabbit_amqpqueue_process: 負責協議相關的消息處理,即接收生產者發佈的消息、向消費者交付消息、處理消息的確認(包括生產端的confirm和消費端的ack)等。
backing_queue: 是消息存儲的具體形式和引擎,並向rabbit_amqpqueue_process提供相關的接口以供調用。

若是消息發送的隊列是空的且隊列有消費者,該消息不會通過該隊列而是直接發往消費者,若是沒法直接被消費,則須要將消息暫存入隊列,以便從新投遞。

消息在存入隊列後,主要有如下幾種狀態:
alpha:消息內容(包括消息體、屬性和headers)和消息索引都存在內存中(消耗內存最多,CPU消耗最少)
beta:消息內容保存在磁盤中,消息索引都存在內存中(只須要一次IO操做就能夠讀取到消息)
gamma:消息內容保存在磁盤中,消息索引在磁盤和內存中都存在(只須要一次IO操做就能夠讀取到消息)
delta:消息內容和消息索引都在磁盤中(消耗內存最小,可是會消耗更多的CPU和磁盤的IO操做)持久化的消息,消息內容和消息索引必須先保存在磁盤中,纔會處於上面狀態中的一種,gamma狀態只有持久化的消息纔有這種狀態。Rabbitmq在運行時會根據統計的消息傳送速度按期計算一個當前內存中可以保存的最大消息數量(target_ram_count), 若是alpha狀態的消息數量大於此值時,就會引發消息的狀態轉換,多餘的消息可能會轉換到beta狀態、gamma狀態或者delta狀態。區分這4種狀態的主要做用是知足不一樣的內存和CPU 的需求。

對於普通隊列而言,backing_queue內部的實現是經過5個子隊列來體現消息的狀態的
Q1:只包含alpha狀態的消息
Q2:包含beta和gamma的消息
Delta:包含delta的消息
Q3:包含beta和gamma的消息
Q4:只包含alpha狀態的消息

通常狀況下,消息按照Q1->Q2->Delta->Q3->Q4這樣的順序進行流動,但並非每一條消息都會經歷全部狀態,這取決於當前系統的負載狀況(好比非持久化的消息在內存負載不高時,就不會經歷delta)。如此設計的好處:能夠在隊列負載很高的狀況下,可以經過將一部分消息由磁盤保存來節省內存空間,而在負載下降的時候,這部分消息又漸漸回到內存被消費者獲取,使得整個隊列具備良好的彈性。

相關知識: 消費消息時的狀態轉換

消費者消費消息也會引發消息狀態的轉換,狀態轉換的過程以下所示:

  1. 消費者消費時先從Q4獲取消息,若是獲取成功則返回。

  2. 若是Q4爲空,則從Q3中獲取消息,首先判斷Q3是否爲空,若是爲空返回隊列爲空,即此時隊列中無消

  3. 若是Q3不爲空,取出Q3的消息,而後判斷Q3和Delta中的長度,若是都爲空,那麼Q二、Delta、Q三、
    Q4都爲空,直接將Q1中的消息轉移至Q4,下次直接從Q4中讀取消息

  4. 若是Q3爲空,Delta不爲空,則將Delta中的消息轉移至Q3中,下次直接從Q3中讀取。

  5. 在將消息從Delta轉移至Q3的過程當中,是按照索引分段讀取,首先讀取某一段,而後判斷讀取的消息個數和Delta消息的個數,若是相等,斷定Delta已無消息,直接將讀取Q2和讀取到消息一併放入Q3,若是不相等,僅將這次讀取的消息轉移到Q3。

一般在負載正常時,若是消息被消費的速度不小於接收新消息的速度,對於不須要保證可靠性的消息來講,極有可能只會處於alpha狀態。對於durable屬性設置爲true的消息,它必定會進入gamma狀態,而且在開啓publisher confirm機制時,只有到了gamma狀態時纔會確認該消息己被接收,若消息消費速度足夠快、內存也充足,這些消息也不會繼續走到下一個狀態。

在系統負載較高中,已經收到的消息若不能很快被消費掉,就是這些消息就是在隊列中"堆積", 那麼此時Rabbitmq就須要花更多的時間和資源處理"堆積"的消息,如此用來處理新流入的消息的能力就會下降,使得流入的消息又被"堆積"繼續增大處理每一個消息的平均開銷,繼而狀況變得愈來愈惡化,使得系統的處理能力大大下降。
減小消息堆積的常看法決方案:
一、增長prefetch_count的值,設置消費者存儲未確認的消息的最大值
二、消費者進行multiple ack,下降ack帶來的開銷

相關知識: 惰性隊列

默認狀況下,當生產者將消息發送到Rabbitmq的時候,隊列中的消息會盡量地存儲在內存中,這樣能夠更快地將消息發送給消費者。即便是持久化的消息,在被寫入磁盤的同時也會在內存中駐留一份備份。這樣的機制無形會佔用更多系統資源,畢竟內存應該留給更多有須要的地方。若是發送端過快或消費端宕機,致使消息大量積壓,此時消息仍是在內存和磁盤各存儲一份,在消息大爆發的時候,MQ服務器會撐不住,影響其餘隊列的消息收發,能不能有效的處理這種狀況呢。答案 惰性隊列。
RabbitMQ從3.6.0版本開始引入了惰性隊列(Lazy Queue)的概念。惰性隊列會將接收到的消息直接存入文件系統中,而無論是持久化的或者是非持久化的,這樣能夠減小了內存的消耗,可是會增長I/0的使用,若是消息是持久化的,那麼這樣的I/0操做不可避免,惰性隊列和持久化的消息可謂是"最佳拍檔"。注意若是惰性隊列中存儲的是非持久化的消息,內存的使用率會一直很穩定,可是重啓以後消息同樣會丟失。
把一個隊列設置成惰性隊列的方式:

// 聲明隊列
@Bean(name = "direct.queue_03")
public Queue commonQueue03() {
   QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_03");
   queueBuilder.lazy();            // 把隊列設置成惰性隊列
   return queueBuilder.build();
}

四、確保消息從隊列正確地投遞到消費者

若是消費者收到消息後將來得及處理即發生異常,或者處理過程當中發生異常,會致使④失敗。

爲了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時,能夠指定autoAck參數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認信號後才從隊列中移去消息。

若是消息消費失敗,也能夠調用Basic. Reject或者Basic. Nack來拒絕當前消息而不是確認。若是r equeue參數設置爲true,能夠把這條消息從新存入隊列,以便發給下一個消費者(固然,只有一個消費者的時候,這種方式可能會出現無限循環重複消費的狀況,能夠投遞到新的隊列中,或者只打印異常日誌)。

五、補償機制

對於必定時間沒有獲得響應的消息,能夠設置一個定時重發的機制,但要控制次數,好比最多重發3次,不然會造
成消息堆積。

六、消息冪等性

服務端是沒有這種控制的,只能在消費端控制。
如何避免消息的重複消費?
消息重複可能會有兩個緣由:
一、生產者的問題,環節①重複發送消息,好比在開啓了Confirm模式但未收到確認。
二、環節④出了問題,因爲消費者未發送ACK或者其餘緣由,消息重複投遞。
對於重複發送的消息,能夠對每一條消息生成一個惟一的業務ID,經過日誌或者建表來作重複控制。

七、消息的順序性

消息的順序性指的是消費者消費的順序跟生產者產生消息的順序是一致的。
在RabbitMQ中,一個隊列有多個消費者時,因爲不一樣的消費者消費消息的速度是不同的,順序沒法保證。

RabbitMQ如何保證高可用

RabbittMQ集羣

集羣主要用於實現高可用與負載均衡。
RabbitMQ經過/var/lib/r abbitmq/. erlang. cookie來驗證身份,須要在全部節點上保持一致。
集羣有兩種節點類型,一種是磁盤節點,一種是內存節點。集羣中至少須要一個磁盤節點以實現元數據的持久化,
未指定類型的狀況下,默認爲磁盤節點。
集羣經過25672端口兩兩通訊,須要開放防火牆的端口。
須要注意的是,RabbitMQ集羣沒法搭建在廣域網上,除非使用feder ation或者shovel等插件。
集羣的配置步驟:
一、配置hosts
二、同步erlang. cookie
三、加入集羣

集羣搭建

docker pull rabbitmq:3.6.10-management
docker run -di --network=docker-network --ip=172.19.0.50 --hostname=rabbitmq-node01 --name=rabbitmq_01 -p 15673:15672 -p 5673:5672
--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash
docker run -di --network=docker-network --ip=172.19.0.51 --hostname=rabbitmq-node02 --name=rabbitmq_02 -p 15674:15672 -p 5674:5672
--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash
docker run -di --network=docker-network --ip=172.19.0.52 --hostname=rabbitmq-node03 --name=rabbitmq_03 -p 15675:15672 -p 5675:5672
--privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash

參數說明:Erlang Cookie值必須相同,也就是RABBITMQ_ERLANG_COOKIE參數的值必須相同。由於
RabbitMQ是用Erlang實現的,Erlang Cookie至關於不一樣節點之間相互通信的祕鑰,Erlang節點經過交換Erlang Cookie得到認證。

docker exec -itrabbitmq_01 /bin/bash

配置hosts文件,讓各個節點都能互相識別對方的存在。在系統中編輯/etc/hosts文件,添加ip地址和節點名稱的映射信息(apt-get update , apt-get install vim):

172.19.0.50 rabbitmq-node01
172.19.0.51 rabbitmq-node02
172.19.0.52 rabbitmq-node03

啓動rabbitmq,而且查看狀態

root@014faa4cba72:/# rabbitmq-server -detached          # 啓動
rabbitmq服務,該命令能夠啓動erlang虛擬機和rabbitmq服務
root@014faa4cba72:/# rabbitmqctl status                     # 查看節點
信息
Status of noderabbit@014faa4cba72
[{pid,270},
{running_applications,
    [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
    {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
    {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
.............
root@014faa4cba72:/# rabbitmqctl cluster_status             # 查看集羣
節點狀態
Cluster status of noderabbit@014faa4cba72
[{nodes,[{disc,[rabbit@014faa4cba72]}]},
{running_nodes,[rabbit@014faa4cba72]},                      # 正在運行
的只有一個節點
{cluster_name,<<"rabbit@014faa4cba72">>},
{partitions,[]},
{alarms,[{rabbit@014faa4cba72,[]}]}]

注意:此時咱們能夠經過瀏覽器訪問rabbitmq的後端管理系統,可是rabbitmq默認提供的guest用戶不支持遠程訪問。所以咱們須要建立用戶,而且對其進行受權

root@014faa4cba72:/# rabbitmqctl add_user admin admin   # 添加用戶,用戶名
爲admin,密碼爲admin
root@014faa4cba72:/# rabbitmqctl list_users             # 查看rabbitmq的
用戶列表
Listing users
admin   []                                              # admin用戶已經添
加成功,可是沒有角色
guest   [administrator]
root@014faa4cba72:/# rabbitmqctl set_user_tags admin administrator   #
給admin用戶設置管理員權限
# rabbitmqctl delete_user admin   # 刪除admin用戶
# rabbitmqctl stop_app           # 中止rabbitmq服務
# rabbitmqctl stop               # 會將rabbitmq的服務和erlang虛擬機一同關閉

再次使用admin用戶就能夠登陸web管理系統了。在其餘的rabbitmq中也建立用戶,以便後期能夠訪問後端管理系統。

配置集羣

一、同步cookie
集羣中的Rabbitmq節點須要經過交換密鑰令牌以得到相互認證,若是節點的密鑰令牌不一致,那麼在配置節點時就會報錯。
獲取某一個節點上的/var/lib/rabbitmq/.erlang.cookie文件,而後將其複製到其餘的節點上。咱們以node01節點爲基準,進行此操做。

docker cprabbitmq_01:/var/lib/rabbitmq/.erlang.cookie .
docker cp.erlang.cookie rabbitmq_02:/var/lib/rabbitmq
docker cp.erlang.cookie rabbitmq_03:/var/lib/rabbitmq

二、創建集羣關係
目前3個節點都是獨立的運行,之間並無任何的關聯關係。接下來咱們就來創建3者之間的關聯關係,咱們以rabbitmq-node01爲基準,將其餘的兩個節點加入進來。
把rabbitmq-node02加入到節點1中

# 進入到rabbitmq-node02中
rabbitmqctl stop_app   # 關閉rabbitmq服務
rabbitmqctl reset      # 進行重置
rabbitmqctl join_cluster rabbit@rabbitmq-node01     # rabbitmq-node01爲
節點1的主機名稱
rabbitmqctl start_app   # 啓動rabbitmq節點

把rabbitmq-node03加入到節點1中

# 進入到rabbitmq-node03中
rabbitmqctl stop_app   # 關閉rabbitmq服務
rabbitmqctl reset      # 清空節點的狀態,並將其恢復都空白狀態,當設置的節點時集羣
中的一部分,該命令也會和集羣中的磁盤節點進行通信,告訴他們該節點正在離開集羣。否則集羣
會認爲該節點處理故障,並指望其最終可以恢復過來
rabbitmqctl join_cluster rabbit@rabbitmq-node01     # rabbitmq-node01爲
節點1的主機名稱
rabbitmqctl start_app   # 啓動rabbitmq節點

進入後臺管理系統查看集羣概述。

節點類型

節點類型介紹
在使用rabbitmqctl cluster_status命令來查看集羣狀態時會有[{nodes,[{disc['rabbit@rabbitmqnode01','rabbit@rabbitmq-node02','rabbit@rabbitmq-node03']}這一項信息,其中的disc標註了Rabbitmq節點類型。Rabbitmq中的每個節點,無論是單一節點系統或者是集羣中的一部分要麼是內存節點,要麼是磁盤節點。內存節點將全部的隊列,交換機,綁定關係、用戶、權限和vhost的元數據定義都存儲在內存中,而磁盤節點則將這些信息存儲到磁盤中。單節點的集羣中必然只有磁盤類型的節點,不然當重啓Rabbitmq以後,全部關於系統配置信息都會丟失。不過在集羣中,能夠選擇配置部分節點爲內存節點,這樣能夠得到更高的性能。

節點類型變動
若是咱們沒有指定節點類型,那麼默認就是磁盤節點。咱們在添加節點的時候,可使用以下的命令來指定節點的類型爲內存節點:

rabbitmqctl join_cluster rabbit@rabbitmq-node01 --ram

咱們也可使用以下的命令將某一個磁盤節點設置爲內存節點:

rabbitmqctl change_cluster_node_type {disc , ram}

以下所示

root@rabbitmq-node02:/# rabbitmqctl stop_app                          
        # 關閉rabbitmq服務
Stopping rabbit application on node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl change_cluster_node_type ram      
        # 將root@rabbitmq-node02節點類型切換爲內存節點
Turning 'rabbit@rabbitmq-node02'into a ram node
root@rabbitmq-node02:/# rabbitmqctl start_app                          
        # 啓動rabbitmq服務
Starting node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl cluster_status                    
        # 查看集羣狀態
Cluster status of node 'rabbit@rabbitmq-node02'
[{nodes,[{disc,['rabbit@rabbitmq-node03','rabbit@rabbitmq-node01']},
        {ram,['rabbit@rabbitmq-node02']}]},
{running_nodes,['rabbit@rabbitmq-node01','rabbit@rabbitmq-node03',
                'rabbit@rabbitmq-node02']},
{cluster_name,<<"rabbit@rabbitmq-node01">>},
{partitions,[]},
{alarms,[{'rabbit@rabbitmq-node01',[]},
        {'rabbit@rabbitmq-node03',[]},
        {'rabbit@rabbitmq-node02',[]}]}]
root@rabbitmq-node02:/#

節點選擇

Rabbitmq只要求在集羣中至少有一個磁盤節點,其餘全部的節點能夠是內存節點。當節點加入或者離開集羣時,它們必須將變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,並且不湊巧它恰好崩潰了,那麼集羣能夠繼續接收和發送消息。

可是不能執行建立隊列,交換機,綁定關係、用戶已經更改權限、添加和刪除集羣節點操做了。也就是說、若是集羣中惟一的磁盤節點崩潰了,集羣仍然能夠保持運行,可是知道將該節點恢復到集羣前,你沒法更改任何東西,因此在建立集羣的時候應該保證至少有兩個或者多個磁盤節點。

當內存節點重啓後,它會鏈接到預先配置的磁盤節點,下載當前集羣元數據的副本。當在集羣中添加內存節點的時候,確保告知全部的磁盤節點(內存節點惟一存儲到磁盤中的元數據信息是磁盤節點的地址)。只要內存節點能夠找到集羣中至少一個磁盤節點,那麼它就能在重啓後從新加入集羣中。

集羣優化:HAproxy負載+Keepalived高可用

以前搭建的集羣存在的問題:不具備負載均衡能力

本次咱們所選擇的負載均衡層的軟件是HAProxy。爲了保證負載均衡層的高可用,咱們須要使用使用到keepalived軟件,使用vrrp協議產生虛擬ip實現動態的ip飄逸。

keepalived是以VRRP協議爲實現基礎的,VRRP全稱Virtual Router Redundancy Protocol,即虛擬路由冗餘協議。虛擬路由冗餘協議,能夠認爲是實現路由器高可用的協議,即將N臺提供相同功能的路由器組成一個路由器組,這個組裏面有一個master和多個backup,master上面有一個對外提供服務的vip(該路由器所在局域網內其餘機器的默認路由爲該vip),master會定義向backup發送vrrp協議數據包,當backup收不到vrrp包時就認爲master宕掉了,這時就須要根據VRRP的優先級來選舉一個backup當master。這樣的話就能夠保證路由器的高可用了。

優化實現

在兩個內存節點上安裝HAProxy

yum install haproxy

編輯配置文件

vim /etc/haproxy/haproxy.cfg

global
   log         127.0.0.1 local2
   chroot      /var/lib/haproxy
   pidfile     /var/run/haproxy.pid
   maxconn     4000
   user        haproxy
   group       haproxy
   daemon
   stats socket /var/lib/haproxy/stats
defaults
   log                     global
   option                  dontlognull
   option                  redispatch
   retries                 3
   timeout connect         10s
   timeout client          1m
   timeout server          1m
   maxconn                 3000
listen http_front
       mode http
       bind 0.0.0.0:1080           #監聽端口
       stats refresh 30s           #統計頁面自動刷新時間
       stats uri /haproxy?stats    #統計頁面url
       stats realm Haproxy Manager #統計頁面密碼框上提示文本
       stats auth admin:123456     #統計頁面用戶名和密碼設置
listen rabbitmq_admin
   bind 0.0.0.0:15673
   server node1 192.168.8.40:15672
   server node2 192.168.8.45:15672
listen rabbitmq_cluster 0.0.0.0:5673
   mode tcp
   balance roundrobin
   timeout client 3h
   timeout server 3h
   timeout connect 3h
   server   node1 192.168.8.40:5672 check inter 5s rise 2 fall 3
   server   node2 192.168.8.45:5672 check inter 5s rise 2 fall 3

啓動HAproxy

haproxy -f /etc/haproxy/haproxy.cfg

安裝keepalived

yum -y install keepalived

修改配置文件

vim /etc/keepalived/keepalived.conf

global_defs {
  notification_email {
    acassen@firewall.loc
    failover@firewall.loc
    sysadmin@firewall.loc
  }
  notification_email_from Alexandre.Cassen@firewall.loc
  smtp_server 192.168.200.1
  smtp_connect_timeout 30
  router_id LVS_DEVEL
  vrrp_skip_check_adv_addr
  # vrrp_strict    # 註釋掉,否則訪問不到VIP
  vrrp_garp_interval 0
  vrrp_gna_interval 0
}
global_defs {
  notification_email {
    acassen@firewall.loc
    failover@firewall.loc
    sysadmin@firewall.loc
  }
  notification_email_from Alexandre.Cassen@firewall.loc
  smtp_server 192.168.200.1
  smtp_connect_timeout 30
  router_id LVS_DEVEL
  vrrp_skip_check_adv_addr
  # vrrp_strict    # 註釋掉,否則訪問不到VIP
  vrrp_garp_interval 0
  vrrp_gna_interval 0
}
# 檢測任務
vrrp_script check_haproxy {
   # 檢測HAProxy監本
   script "/etc/keepalived/script/check_haproxy.sh"
   # 每隔兩秒檢測
   interval 2
   # 權重
   weight 2
}
# 虛擬組
vrrp_instance haproxy {
   state MASTER # 此處爲`主`,備機是 `BACKUP`
   interface ens33 # 物理網卡,根據狀況而定
   mcast_src_ip 192.168.8.40 # 當前主機ip
   virtual_router_id 51 # 虛擬路由id,同一個組內須要相同
   priority 100 # 主機的優先權要比備機高
   advert_int 1 # 心跳檢查頻率,單位:秒
   authentication { # 認證,組內的要相同
       auth_type PASS
       auth_pass 1111
   }
   # 調用腳本
   track_script {
       check_haproxy
   }
   # 虛擬ip,多個換行
   virtual_ipaddress {
       192.168.8.201
   }
}

啓動

keepalived -D

RabbitMQ鏡像隊列

一、爲何要存在鏡像隊列
爲了保證隊列和消息的高可用
二、什麼是鏡像隊列,鏡像隊列是如何進行選取主節點的?
引入鏡像隊列的機制,能夠將隊列鏡像到集羣中的其餘的Broker節點之上,若是集羣中的一個節點失效了,隊列能自動的切換到鏡像中的另外一個節點之上以保證服務的可用性。在一般的用法中,針對每個配置鏡像的隊列(一下稱之爲鏡像隊列)都包含一個主節點(master)和若干個從節點(slave),以下圖所示:

集羣方式下,隊列和消息是沒法在節點之間同步的,所以須要使用RabbitMQ的鏡像隊列機制進行同步。

深刻了解參考文章:https://blog.csdn.net/u013256816/article/details/71097186

RabbitMQ的應用

筆者就任於電商公司,就以電商秒殺場景爲背景,闡述下RabbitMQ的實踐。

場景:訂單未支付庫存回退

當用戶秒殺成功之後,就須要引導用戶去訂單頁面進行支付。若是用戶在規定的時間以內(30分鐘),沒有完成訂單的支付,此時咱們就須要進行庫存的回退操做。

架構圖

具體實現就是使用的死信隊列,能夠參考上面的代碼。

場景:RabbitMQ秒殺公平性保證

消息的可靠性傳輸能夠保證秒殺業務的公平性。關於秒殺業務的公平性,咱們還須要考慮一點:消息的順序性(先進入隊列的消息先進行處理)
RabbitMQ消息順序性說明
順序性:消息的順序性是指消費者消費到的消息和發送者發佈的消息的順序是一致的。
舉個例子,不考慮消息重複的狀況,若是生產者發佈的消息分別爲msgl、msg二、msg3,那麼消費者必然也是按照msgl、msg二、msg3的順序進行消費的。
目前不少資料顯示RabbitMQ的消息可以保障順序性,這是不正確的,或者說這個觀點有很大的侷限性。在不使用任何RabbitMQ的高級特性,也沒有消息丟失、網絡故障之類異常的狀況發生,而且只有一個消費者的狀況下,也只有一個生產者的狀況下能夠保證消息的順序性。若是有多個生產者同時發送消息,沒法肯定消息到達Broker 的先後順序,也就沒法驗證消息的順序性,由於每一次消息的發送都是在各自的線程中進行的。
RabbitMQ消息順序錯亂演示
生產者發送消息:
一、不使用生產者確認機制,單生產者單消費者能夠保證消息的順序性
二、使用了生產者確認機制,那麼就沒法保證消息到達Broker的先後順序,由於消息的發送是異步發送的,每個線程的執行時間不一樣
三、生產端使用事務機制,保證消息的順序性
消費端消費消息:
一、單消費者能夠保證消息的順序性
二、多消費者不能保證消息的順序,由於每個消息的消費都是在各自的線程中進行,每個線程的執行時間不一樣
RabbitMQ消息順序性保障
生產端啓用事務機制,單生產者單消費者。若是咱們不考慮消息到達MQ的順序,只是考慮對已經到達到MQ中的消息順序消費,那麼須要保證消費者是單消費者便可。

場景:RabbitMQ秒殺不超賣保證

要保證秒殺不超賣,咱們須要在不少的環節進行考慮。好比:在進行預扣庫存的時候,咱們須要考慮不能超賣,在進行數據庫真實庫存扣減的時候也須要考慮不能超賣。而對咱們的mq這個環節而言,要保證不超賣咱們只須要保證消息不被重複消費。

首先咱們能夠確認的是,觸發消息重複執行的條件會是很苛刻的! 也就說 在大多數場景下不會觸發該條件!!! 通常出在任務超時,或者沒有及時返回狀態,引發任務從新入隊列,因爲服務端沒有收到消費端的ack應答,所以該條消息還會從新進行投遞。

冪等性保障方案

重複消費不可怕,可怕的是你沒考慮到重複消費以後,怎麼保證冪等性。
所謂冪等性,就是對接口的屢次調用所產生的結果和調用一次是一致的。通俗點說,就一個數據,或者一個請求,給你重複來屢次,你得確保對應的數據是不會改變的,不能出錯。
舉個例子:
假設你有個系統,消費一條消息就往數據庫裏插入一條數據,要是你一個消息重複消費兩次,你不就插入了兩條,這數據不就錯了?可是你要是消費到第二次的時候,本身判斷一下是否已經消費過了,如果就直接扔了,這樣不就保留了一條數據,從而保證了數據的正確性。一條數據重複消費兩次,數據庫裏就只有一條數據,這就保證了系統的冪等性。
怎麼保證消息隊列消費的冪等性?這一點須要結合的實際的業務來進行處理:
一、好比咱們消費的數據須要寫數據庫,你先根據主鍵查一下,若是這數據都有了,你就別插入了,執行如下update操做
二、好比咱們消費的數據須要寫Redis,那沒問題了,反正每次都是set,自然冪等性。
三、好比你不是上面兩個場景,那作的稍微複雜一點,你須要讓生產者發送每條數據的時候,裏面加一個全局惟一的id,相似訂單id 之類的東西,而後你這裏消費到了以後,先根據這個id 去好比Redis 裏查一下,以前消費過嗎?若是沒有消費過,你就處理,而後這個id 寫Redis。若是消費過了,那你就別處理了,保證別重複處理相同的消息便可。
四、好比基於數據庫的惟一鍵來保證重複數據不會重複插入多條。由於有惟一鍵約束了,重複數據插入只會報錯,不會致使數據庫中出現髒數據。

面試題

一、消息隊列的做用與使用場景? 二、建立隊列和交換機的方法? 三、多個消費者監聽一個生產者時,消息如何分發? 四、沒法被路由的消息,去了哪裏? 五、消息在何時會變成Dead Letter(死信)? 六、RabbitMQ如何實現延遲隊列? 七、如何保證消息的可靠性投遞? 八、如何在服務端和消費端作限流? 九、如何保證消息的順序性? 十、RabbitMQ的節點類型?

相關文章
相關標籤/搜索