RabbitMQ 簡介

###前言java

  1. RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
  2. 自己支持不少的協議:AMQP, XMPP, SMTP, STONP
  3. RabbitMQ服務器是用Erlang語言編寫的,實現了代理(Broker)架構,意味着消息在發送到客戶端以前能夠在中央節點上排隊。 此特性使得RabbitMQ易於使用和部署,適宜於不少場景如路由、負載均衡或消息持久化等

###主要特性服務器

  1. 消息集羣(Clustering): RabbitMQ支持分佈式部署,多個RabbitMQ服務器組成一個集羣造成一個邏輯Broker
  2. 可靠性(Reliability): RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
  3. 高可用(Highly Available Queues): 隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。

###基本組成網絡

Producer: 消息發送者

Message :消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成

RabbitMQ:
    - Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。默認"/"
   
    - Exchange: 路由器,接收消息,根據RoutingKey分發消息,有四種類型:
        - headers:消息頭類型 路由器,內部應用
        - direct:精準匹配類型 路由器
        - topic:主題匹配類型 路由器,支持正則 模糊匹配
        - fanout:廣播類型 路由器,RoutingKey無效

    - RoutingKey: 路由規則

    - Queue: 隊列,用於存儲消息,保存消息直到發送給消費者。它是消息的容器,也是消息的終點(消息的目的地)

    - Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連 

    - Connection:網絡鏈接,好比一個TCP鏈接。 

    - Channel:信道:創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。 

Consumer: 消息消費者

###RabbitMQ 消息確認策略架構

Confrim: 
    簡單說就是直接傳送消息 client > mq, 接收到 mq > ack, mq 在異步的完成接下來的事情
    發送線程不會當即獲得MQ反饋結果,發送後經過callback確認成功失敗

Transaction: 
    client 請求開啓事務  > 發送message > client 提交事務,整個過程是同步的,mq必須完成消息持久化、消息同步等
    發送線程會當即獲得MQ反饋結果,同一線程中,多個發送阻塞進行

###消息消費的兩種模式負載均衡

1. 無需反饋: 消費者從消息隊列獲取消息後,服務端就認爲該消息已經成功消費

2. 消息消費完給服務器返回確認狀態,表示該消息已被消費
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

###持久化異步

RabbitMQ中消息的持久化須要保證Exchange、Queue、Message都進行持久化操做。

Exchange的持久化
    @param exchange the name of the exchange
    @param type the exchange type
    @param durable true if we are declaring a durable exchange 
    exchangeDeclare(String exchange, String type, boolean durable)


Queue的持久化
    @param queue the name of the queue
    @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    @param arguments other properties (construction arguments) for the queue
    queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

    autoDelete:代表該隊列是否自動刪除。
    自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。


Message的持久化
    @param exchange the exchange to publish the message to
    @param routingKey the routing key
    @param props other properties for the message - routing headers etc
    @param body the message body
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

    exchange:表示該消息發送到哪一個Exchange
    routingKey:表示該消息的Routing Key
    props:表示該消息的屬性    
        BasicProperties.PERSISTENT_TEXT_PLAIN
        BasicProperties.PERSISTENT_BASIC
    body:消息實體

###存活時間 TTL分佈式

Queue TTL(Queue上沒有Consumer)

Map<String, Object> queueArgs = new HashMap<>();
// 設置1分鐘過時
queueArgs.put("x-expires", 60000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Message TTL(消息在隊列中的存活時間,超過該時間消息將被刪除或者不能傳遞給消費者)

// 設置消息屬性-TTL爲30s
BasicProperties properties = new BasicProperties.Builder().expiration("30000").build();
channel.basicPublish("exchange", "kanyuxia", properties,"hello".getBytes(StandardCharsets.UTF_8));
相關文章
相關標籤/搜索