[TOC]html
RabbitMQ 相較於其餘消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其餘消息隊列大,可是其消息的保障性出類拔萃,被普遍用於金融類業務。與其餘消息隊列的比較以及強大的防止消息丟失的能力咱們將在後續文章再作介紹。java
AMQP: Advanced Message Queuing Protocol 高級消息隊列協議服務器
AMQP定義:是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。網絡
Erlang語言最初在於交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數據交互的性能是很是優秀的 Erlang的優勢: Erlang有着和原生Socket同樣的延遲。架構
RabbitMQ是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據, RabbitMQ是使用Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的。ide
生產者發送消息到指定的 Exchange,Exchange 依據自身的類型(direct、topic等),根據 routing key 將消息發送給 0 - n 個 隊列,隊列再將消息轉發給了消費者。性能
Server: 又稱Broker, 接受客戶端的鏈接,實現AMQP實體服務,這裏指RabbitMQ 服務器
ui
Connection: 鏈接,應用程序與Broker的網絡鏈接。spa
**Channel: **網絡信道,幾乎全部的操做都在 Channel 中進行,Channel是進行消息讀寫的通道。客戶端可創建多個Channel:,每一個Channel表明一個會話任務。設計
**Virtual host: **虛似地址,用於迸行邏輯隔離,是最上層的消息路由。一個 Virtual Host 裏面能夠有若干個 Exchange和 Queue ,同一個 VirtualHost 裏面不能有相同名稱的 Exchange 或 Queue。權限控制的最小粒度是Virtual Host。
Binding: Exchange 和 Queue 之間的虛擬鏈接,binding 中能夠包含 routing key。
Routing key: 一 個路由規則,虛擬機可用它來肯定如何路由一個特定消息,即交換機綁定到 Queue 的鍵。
Queue: 也稱爲Message Queue,消息隊列,保存消息並將它們轉發給消費者。
消息,服務器和應用程序之間傳送的數據,由 Properties 和 Body 組成。Properties 能夠對消息進行修飾,好比消息的優先級、延遲等高級特性;,Body 則就 是消息體內容。
properties 中咱們能夠設置消息過時時間以及是否持久化等,也能夠傳入自定義的map屬性,這些在消費端也均可以獲取到。
生產者
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class MessageProducer { public static void main(String[] args) throws Exception { //1. 建立一個 ConnectionFactory 並進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 經過鏈接工廠來建立鏈接 Connection connection = factory.newConnection(); //3. 經過 Connection 來建立 Channel Channel channel = connection.createChannel(); //4. 聲明 使用默認交換機 以隊列名做爲 routing key String queueName = "msg_queue"; /** * deliverMode 設置爲 2 的時候表明持久化消息 * expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收後會被自動刪除 * headers 自定義的一些屬性 * */ //5. 發送 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("myhead1", "111"); headers.put("myhead2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("100000") .headers(headers) .build(); String msg = "test message"; channel.basicPublish("", queueName, properties, msg.getBytes()); System.out.println("Send message : " + msg); //6. 關閉鏈接 channel.close(); connection.close(); } }
消費者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Map; public class MessageConsumer { public static void main(String[] args) throws Exception{ //1. 建立一個 ConnectionFactory 並進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 經過鏈接工廠來建立鏈接 Connection connection = factory.newConnection(); //3. 經過 Connection 來建立 Channel Channel channel = connection.createChannel(); //4. 聲明 String queueName = "msg_queue"; channel.queueDeclare(queueName, false, false, false, null); //5. 建立消費者並接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); Map<String, Object> headers = properties.getHeaders(); System.out.println("head: " + headers.get("myhead1")); System.out.println(" [x] Received '" + message + "'"); System.out.println("expiration : "+ properties.getExpiration()); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, true, consumer); } }
Send message : test message head: 111 [x] Received 'test message' 100000
Exchange 就是交換機,接收消息,根據路由鍵轉發消息到綁定的隊列。有不少的 Message 進入到 Exchange 中,Exchange 根據 Routing key 將 Message 分發到不一樣的 Queue 中。
RabbitMQ 中的 Exchange 有多種類型,類型不一樣,Message 的分發機制不一樣,以下:
fanout:廣播模式。這種類型的 Exchange 會將 Message 分發到綁定到該 Exchange 的全部 Queue。
direct:這種類型的 Exchange 會根據 Routing key(精確匹配,將Message分發到指定的Queue。
Topic:這種類型的 Exchange 會根據 Routing key(模糊匹配,將Message分發到指定的Queue。
headers: 主題交換機有點類似,可是不一樣於主題交換機的路由是基於路由鍵,頭交換機的路由值基於消息的header數據。 主題交換機路由鍵只有是字符串,而頭交換機能夠是整型和哈希值 .
/** * Declare an exchange, via an interface that allows the complete set of * arguments. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param internal true if the exchange is internal, i.e. can't be directly * published to by a client. * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable, boolean autoDelete,boolean internal, Map<String, Object> arguments) throws IOException;
原文出處:https://www.cnblogs.com/haixiang/p/10853467.html