###前言java
- RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。
- 自己支持不少的協議:AMQP, XMPP, SMTP, STONP
- RabbitMQ服務器是用Erlang語言編寫的,實現了代理(Broker)架構,意味着消息在發送到客戶端以前能夠在中央節點上排隊。 此特性使得RabbitMQ易於使用和部署,適宜於不少場景如路由、負載均衡或消息持久化等
###主要特性服務器
- 消息集羣(Clustering): RabbitMQ支持分佈式部署,多個RabbitMQ服務器組成一個集羣造成一個邏輯Broker
- 可靠性(Reliability): RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
- 高可用(Highly Available Queues): 隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。
###基本組成網絡
Producer: 消息發送者 Message :消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成 RabbitMQ: - Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。默認"/" - Exchange: 路由器,接收消息,根據RoutingKey分發消息,有四種類型: - headers:消息頭類型 路由器,內部應用 - direct:精準匹配類型 路由器 - topic:主題匹配類型 路由器,支持正則 模糊匹配 - fanout:廣播類型 路由器,RoutingKey無效 - RoutingKey: 路由規則 - Queue: 隊列,用於存儲消息,保存消息直到發送給消費者。它是消息的容器,也是消息的終點(消息的目的地) - Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連 - Connection:網絡鏈接,好比一個TCP鏈接。 - Channel:信道:創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。 Consumer: 消息消費者
###RabbitMQ 消息確認策略架構
Confrim: 簡單說就是直接傳送消息 client > mq, 接收到 mq > ack, mq 在異步的完成接下來的事情 發送線程不會當即獲得MQ反饋結果,發送後經過callback確認成功失敗 Transaction: client 請求開啓事務 > 發送message > client 提交事務,整個過程是同步的,mq必須完成消息持久化、消息同步等 發送線程會當即獲得MQ反饋結果,同一線程中,多個發送阻塞進行
###消息消費的兩種模式負載均衡
1. 無需反饋: 消費者從消息隊列獲取消息後,服務端就認爲該消息已經成功消費 2. 消息消費完給服務器返回確認狀態,表示該消息已被消費 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
###持久化異步
RabbitMQ中消息的持久化須要保證Exchange、Queue、Message都進行持久化操做。 Exchange的持久化 @param exchange the name of the exchange @param type the exchange type @param durable true if we are declaring a durable exchange exchangeDeclare(String exchange, String type, boolean durable) Queue的持久化 @param queue the name of the queue @param durable true if we are declaring a durable queue (the queue will survive a server restart) @param exclusive true if we are declaring an exclusive queue (restricted to this connection) @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) @param arguments other properties (construction arguments) for the queue queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) autoDelete:代表該隊列是否自動刪除。 自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。 Message的持久化 @param exchange the exchange to publish the message to @param routingKey the routing key @param props other properties for the message - routing headers etc @param body the message body basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) exchange:表示該消息發送到哪一個Exchange routingKey:表示該消息的Routing Key props:表示該消息的屬性 BasicProperties.PERSISTENT_TEXT_PLAIN BasicProperties.PERSISTENT_BASIC body:消息實體
###存活時間 TTL分佈式
Queue TTL(Queue上沒有Consumer) Map<String, Object> queueArgs = new HashMap<>(); // 設置1分鐘過時 queueArgs.put("x-expires", 60000); channel.queueDeclare("queue", false, false, false, queueArgs); Message TTL(消息在隊列中的存活時間,超過該時間消息將被刪除或者不能傳遞給消費者) // 設置消息屬性-TTL爲30s BasicProperties properties = new BasicProperties.Builder().expiration("30000").build(); channel.basicPublish("exchange", "kanyuxia", properties,"hello".getBytes(StandardCharsets.UTF_8));