目錄java
RabbitMQ 是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據,RabbitMQ是使用 Erlang語言來編寫的,而且RabbitMQ是基於AMQP協議的web
RabbitMQ的優勢:spring
RabbitMQ官網springboot
RabbitMQ的總體架構:服務器
RabbitMQ的消息流轉:網絡
AMQP全稱: Advanced Message Queuing Protocol架構
AMQP翻譯: 高級消息隊列協議spring-boot
AMQP定義: 是具備現代特徵的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計性能
AMQP核心概念:ui
後臺啓動: ./rabbitmq start &
關閉: ./rabbitmqctl stop
節點狀態: ./rabbitmqctl status
管控臺: http://ip:15672
RabbitMQ生產消費快速入門:
環境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依賴配置)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
public class Procuder { public static void main(String[] args) throws Exception { //1.建立一個ConnectionFactory 並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3.經過Connection 建立一個 Channel Channel channel = connection.createChannel(); /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * exchange:指定交換機 不指定 則默認 (AMQP default交換機) 經過routingkey進行匹配 * props 消息屬性 * body 消息體 */ //4.經過Channel發送數據 for(int i = 0; i < 5; i++){ System.out.println("生產消息:" + i); String msg = "Hello RabbitMQ" + i; channel.basicPublish("", "test", null, msg.getBytes()); } //5.記得關閉相關的鏈接 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws Exception{ //1.建立一個ConnectionFactory 並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3.經過Connection 建立一個 Channel Channel channel = connection.createChannel(); //4. 聲明建立一個隊列 String queueName = "test"; /** * durable 是否持久化 * exclusive 獨佔的 至關於加了一把鎖 */ channel.queueDeclare(queueName,true,false,false,null); //5.建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6.設置channel /** * ACK: 當一條消息從生產端發到消費端,消費端接收到消息後會立刻回送一個ACK信息給broker,告訴它這條消息收到了 * autoack: * true 自動簽收 當消費者一收到消息就表示消費者收到了消息,消費者收到了消息就會當即從隊列中刪除。 * false 手動簽收 當消費者收到消息在合適的時候來顯示的進行確認,說我已經接收到了該消息了,RabbitMQ能夠從隊列中刪除該消息了 * */ channel.basicConsume(queueName, true, queueingConsumer); //7.獲取消息 while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端:" + msg); //Envelope envelope = delivery.getEnvelope(); } } }
Exchange: 接收消息,並根據路由鍵轉發消息所綁定的隊列
交換機屬性:
全部發送到Direct Exchange的消息被轉發到RouteKey指定的Queue
注意:Direct模式可使用RabbitMQ自帶的Exchange: default Exchange,因此不須要將Exchange進行任何綁定(binding)操做,消息傳遞時,RoutingKey必須徹底匹配纔會被隊列接收,不然該消息會被拋棄
public class ProducerDirectExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.聲明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //5.發送 String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); } }
public class ConsumerDirectExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示聲明瞭一個交換機 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示聲明瞭一個隊列 channel.queueDeclare(queueName,false,false,false,null); //創建一個綁定關係 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱,是否自動ACK,Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
全部發送到Topic Exchange的消息被轉發到全部關心RouteKey中指定Topic的Queue上
Exchange將RouteKey和某Topic進行模糊匹配,此時隊列須要綁定一個Topic
注意:可使用通配符進行匹配
符號 # 匹配一個或多個詞
符號 * 匹配很少很多一個詞
例如: "log.#" 可以匹配到 「log.info.oa」
"log.*" 只會匹配到 "log.err"
public class ProducerTopicExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.聲明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5.發送 String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes()); } }
public class ConsumerTopicExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.#"; //表示聲明瞭一個交換機 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示聲明瞭一個隊列 channel.queueDeclare(queueName,false,false,false,null); //創建一個綁定關係 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱,是否自動ACK,Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
不處理路由鍵,只須要簡單的將隊列綁定到交換機上
發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上
因此Fanout交換機轉發消息是最快的
public class ProducerFanoutExchange { public static void main(String[] args) throws Exception { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.建立Connection Connection connection = connectionFactory.newConnection(); //3.建立Channel Channel channel = connection.createChannel(); //4.聲明 String exchangeName = "test_fanout_exchange"; //5.發送 for(int i = 0; i < 10 ; i++){ String msg = "Hello World RabbitMQ4 Direct Exchange Message"; channel.basicPublish(exchangeName, "", null, msg.getBytes()); } channel.close(); connection.close(); } }
public class ConsumerFanoutExchange { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_topic_queue"; //無需指定路由key String routingKey = ""; //表示聲明瞭一個交換機 channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null); //表示聲明瞭一個隊列 channel.queueDeclare(queueName,false,false,false,null); //創建一個綁定關係 channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數:隊列名稱,是否自動ACK,Consumer channel.basicConsume(queueName, true, consumer); //循環獲取消息 while(true){ //獲取消息,若是沒有消息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
服務器與應用程序之間傳遞的數據,本質上就是一段數據,由Properties和Body組成
經常使用屬性:delivery mode、headers (自定義屬性)
其餘屬性:content_type、content_encoding、priority、expiration
消息的properties屬性用法示例:
public class Procuder { public static void main(String[] args) throws Exception { //1.建立一個ConnectionFactory 並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3.經過Connection 建立一個 Channel Channel channel = connection.createChannel(); Map<String,Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); //10秒不消費 消息過時移除消息隊列 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("utf-8") .expiration("10000") .headers(headers) .build(); //4.經過Channel發送數據 for(int i = 0; i < 5; i++){ System.out.println("生產消息:" + i); String msg = "Hello RabbitMQ" + i; channel.basicPublish("", "test", properties, msg.getBytes()); } //5.記得關閉相關的鏈接 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args) throws Exception{ //1.建立一個ConnectionFactory 並進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2.經過鏈接工廠建立鏈接 Connection connection = connectionFactory.newConnection(); //3.經過Connection 建立一個 Channel Channel channel = connection.createChannel(); //4. 聲明建立一個隊列 String queueName = "test"; channel.queueDeclare(queueName,true,false,false,null); //5.建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6.設置channel channel.basicConsume(queueName, true, queueingConsumer); //7.獲取消息 while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端:" + msg); Map<String, Object> headers = delivery.getProperties().getHeaders(); System.err.println("headers value:" + headers.get("my1")); } } }