首先介紹一下rabbitmq三種模式java
-
Direct–路由模式
任何發送到Direct Exchange的消息都會被轉發到RouteKey指定的Queue。
這種模式下不須要將Exchange進行任何綁定(binding)操做。
消息傳遞時須要一個「RouteKey」,能夠簡單的理解爲要發送到的隊列名字。
若是vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
web -
Fanout–發佈/訂閱模式
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上。
這種模式不須要RouteKey。
這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。
若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
spring -
Topic–匹配訂閱模式
任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上。
就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(RouteKey),Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。
這種模式須要RouteKey,也許要提早綁定Exchange與Queue。
在進行綁定時,要提供一個該隊列關心的主題。
.「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。
一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。
數據庫
springboot整合rabbitmq基於註解方式(最簡單方式)
pom文件配置apache
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
接下來就是配置文件springboot
spring.application.name=springboot-rabbitmq server.port=8080 //默認地址就是127.0.0.1:5672,若是是服務器的rabbitmq就改下 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest //消息確認模式,還有一種事務模式,這裏不講解,有興趣本身去查資料 spring.rabbitmq.publisher-confirms=true //這裏我把他看做是虛擬主機目錄,至關於數據庫的庫名 spring.rabbitmq.virtual-host=/
第一步配置生產者服務器
package com.example.annotion.demo.sender; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Sender { //rabbitTemplate.convertAndSend(String exchange交換機名稱(可省略), String routingKey路由鍵, Object object傳遞的消息) @Autowired private AmqpTemplate rabbitTemplate; //direct方式交換機名字隨便填,可是不能填direct,會形成兩次消費 public void sendDirect() { String msg1 = "hello " + new Date(); System.out.println("helloSender : " + msg1); this.rabbitTemplate.convertAndSend("hello", msg1); // this.rabbitTemplate.convertAndSend("direct","hello", msg1); String msg2 = "user " + new Date(); System.out.println("userSender : " + msg2); this.rabbitTemplate.convertAndSend("user", msg2); // this.rabbitTemplate.convertAndSend("direct","user", msg2); } //topic方式 public void sendTopic() { String msg1 = "I am topic.mesaage msg======"; System.out.println("topic.mesaage sender : " + msg1); this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1); String msg2 = "I am topic.mesaages msg########"; System.out.println("topic.mesaages sender : " + msg2); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2); } //fanout方式routingKey隨便填 public void sendFanout() { String msg = "I am fanoutSender msg======"; System.out.println("fanoutSender : " + msg); this.rabbitTemplate.convertAndSend("fanoutExchange", "suibiantian",msg); } }
接下來就是消費者app
package com.example.annotion.demo.receiver; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @RabbitListener()rabbit監聽 * @QueueBinding()隊列綁定 value綁定@Queue,exchange綁定@Exchange,key爲路由鍵 * @Queue隊列 value:名稱;autoDelete:是否自動刪除,當最後一個消費者斷開鏈接以後隊列是否自動被刪除;durable: 是否持久化, 隊列的聲明默認是存放到內存中的,若是rabbitmq重啓會丟失,若是想重啓以後還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啓以後會讀取該數據庫 * @Exchange交換器,type有五種,其他參數同@Queue */ @Component public class Receiver { //===============如下是驗證direct Exchange的隊列========== // @RabbitListener(queues = "hello") //direct模式,exchange名字隨便填 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "hello",autoDelete = "false",declare = "true"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "user" )) @RabbitHandler public void processHello(String msg) { System.out.println("helloReceiver : " + msg); } // @RabbitListener(queues = "user") @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "user",autoDelete = "false"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "hello" )) @RabbitHandler public void processUser(String msg) { System.out.println("userReceiver : " + msg); } //===============以上是驗證direct Exchange的隊列========== //===============如下是驗證topic Exchange的隊列========== // @RabbitListener(queues = "topic.message") @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.message",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.message" )) @RabbitHandler public void processTopicMessage(String msg) { System.out.println("topicMessageReceiver : " + msg); } // @RabbitListener(queues = "topic.messages") @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.messages",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.#" )) @RabbitHandler public void processTopicMessages(String msg) { System.out.println("topicMessagesReceiver : " + msg); } //===============以上是驗證topic Exchange的隊列========== //===============如下是驗證fanout Exchange的隊列========== // @RabbitListener(queues = "fanout.A") //fanout方式key不用填 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout.A",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT) )) @RabbitHandler public void processFanoutA(String msg) { System.out.println("fanoutAReceiver : " + msg); } // @RabbitListener(queues = "fanout.B") @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout.B",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT) )) @RabbitHandler public void processFanoutB(String msg) { System.out.println("fanoutBReceiver : " + msg); } // @RabbitListener(queues = "fanout.C") @RabbitHandler @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout.C",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT) )) public void processFanoutC(String msg) { System.out.println("fanoutCReceiver : " + msg); } //===============以上是驗證fanout Exchange的隊列========== }
再來一個controller方便測試maven
package com.example.annotion.demo.controller; import com.example.annotion.demo.sender.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/rabbit") public class RabbitController { @Autowired private Sender sender; @GetMapping("/direct") public void direct() { sender.sendDirect(); } @GetMapping("/topic") public void topic() { sender.sendTopic(); } @GetMapping("/fanout") public void fanout() { sender.sendFanout(); } }
啓動項目,分別訪問127.0.0.1:8080/rabbit/direct,127.0.0.1:8080/rabbit/topic,127.0.0.1:8080/rabbit/fanout三個地址看看效果。
測試
springboot整合rabbitmq基於註解方式
這種註解方式其實原理和上面同樣,只是消費者的RabbitListener只要配置一個queue的名稱,其餘配置贊成提取到一個配置類中
消費者代碼
package com.example.annotion.demo.receiver; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component public class Receiver { //===============如下是驗證direct Exchange的隊列========== @RabbitListener(queues = "hello") @RabbitHandler public void processHello(String msg) { System.out.println("helloReceiver : " + msg); } @RabbitListener(queues = "user") @RabbitHandler public void processUser(String msg) { System.out.println("userReceiver : " + msg); } //===============以上是驗證direct Exchange的隊列========== //===============如下是驗證topic Exchange的隊列========== @RabbitListener(queues = "topic.message") @RabbitHandler public void processTopicMessage(String msg) { System.out.println("topicMessageReceiver : " + msg); } @RabbitListener(queues = "topic.messages") @RabbitHandler public void processTopicMessages(String msg) { System.out.println("topicMessagesReceiver : " + msg); } //===============以上是驗證topic Exchange的隊列========== //===============如下是驗證fanout Exchange的隊列========== @RabbitListener(queues = "fanout.A") @RabbitHandler public void processFanoutA(String msg) { System.out.println("fanoutAReceiver : " + msg); } @RabbitListener(queues = "fanout.B") @RabbitHandler public void processFanoutB(String msg) { System.out.println("fanoutBReceiver : " + msg); } @RabbitListener(queues = "fanout.C") @RabbitHandler public void processFanoutC(String msg) { System.out.println("fanoutCReceiver : " + msg); } //===============以上是驗證fanout Exchange的隊列========== }
配置類代碼
package com.example.annotion.demo.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; @Component public class QueueCofig { //===============如下是驗證direct Exchange的隊列========== @Bean public Queue helloQueue() { return new Queue("hello"); } @Bean public Queue userQueue() { return new Queue("user"); } /** *注入name爲'direct'的DirectExchange,默認名字就是空字符串,能夠不注入 */ @Bean DirectExchange directExchange() { return new DirectExchange("direct"); } /** * 將隊列hello與exchange綁定,binding_key爲hello,就是徹底匹配 */ @Bean Binding bindingHelloExchange(Queue helloQueue, DirectExchange exchange) { return BindingBuilder.bind(helloQueue).to(exchange).with("hello"); } /** * 將隊列user與exchange綁定,binding_key爲hello,就是徹底匹配 */ @Bean Binding bindingUserExchange(Queue userQueue, DirectExchange exchange) { return BindingBuilder.bind(userQueue).to(exchange).with("user"); } //===============以上是驗證direct Exchange的隊列========== //===============如下是驗證topic Exchange的隊列========== @Bean public Queue queueMessage() { return new Queue("topic.message"); } @Bean public Queue queueMessages() { return new Queue("topic.messages"); } /** *注入name爲exchange的TopicExchange */ @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } /** * 將隊列topic.message與exchange綁定,binding_key爲topic.message,就是徹底匹配 */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 將隊列topic.messages與exchange綁定,binding_key爲topic.#,模糊匹配 */ @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } //===============以上是驗證topic Exchange的隊列========== //===============如下是驗證Fanout Exchange的隊列========== @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } /** * 注入name爲fanoutExchange的FanoutExchange */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /** * 將隊列fanout.A與FanoutExchange綁定 */ @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } /** * 將隊列fanout.B與FanoutExchange綁定 */ @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } /** * 將隊列fanout.C與FanoutExchange綁定 */ @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } //===============以上是驗證Fanout Exchange的隊列========== }
結果以下
springboot整合rabbitmq基於xml方式
這種方式比較麻煩,可是呢,有些老項目多是這麼用的,因此在這裏也作一個demo。
pom文件和上面同樣,配置文件把mq的配置去掉,第一步設置配置類,加載xml文件
package com.example.xml.demo.config; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.ImportResource; /** * 實例化xml文件中定義的bean **/ @Configuration @EnableRabbit @ImportResource({ "classpath:config/applicationContext-*.xml" }) public class XmlConfig { }
第二步就是配置文件,這裏我分了兩個配置文件,把不改變的鏈接信息之類的放在單獨的配置文件,這部分文件配置便可發送mq消息
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 讀取配置文件, 多個properties文件能夠用英文逗號分隔 --> <context:property-placeholder ignore-resource-not-found="true" location="classpath*:/rabbitmq.properties" file-encoding="UTF-8"/> <!-- 公共部分 --> <!-- 建立鏈接類 鏈接安裝好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <!-- username,訪問RabbitMQ服務器的帳戶,默認是guest --> <property name="username" value="${rmq.manager.user}" /> <!-- username,訪問RabbitMQ服務器的密碼,默認是guest --> <property name="password" value="${rmq.manager.password}" /> <!-- host,RabbitMQ服務器地址,默認值"localhost" --> <property name="host" value="${rmq.ip}" /> <!-- port,RabbitMQ服務端口,默認值爲5672 --> <property name="port" value="${rmq.port}" /> </bean> <bean id="amqpAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> <constructor-arg ref="connectionFactory" /> </bean> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> </bean> <!-- 聲明消息轉換器爲SimpleMessageConverter --> <bean id="messageConverter" class="org.springframework.amqp.support.converter.SerializerMessageConverter"> </bean> </beans>
引入了rabbitmq.properties
rmq.ip=127.0.0.1 rmq.port=5672 rmq.manager.user=guest rmq.manager.password=guest
接下來就是消費者的一些監聽綁定的xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!-- name:隊列名稱;autoDelete:是否自動刪除,當最後一個消費者斷開鏈接以後隊列是否自動被刪除;durable: 是否持久化, 隊列的聲明默認是存放到內存中的,若是rabbitmq重啓會丟失,若是想重啓以後還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啓以後會讀取該數據庫 --> <!-- 聲明Queue並設定Queue的名稱 --> <rabbit:queue name="user" durable="true" auto-delete="false"/> <rabbit:queue name="hello" durable="true" auto-delete="false"/> <!-- direct 模式可不配置direct-exchange --> <!-- <rabbit:direct-exchange name="direct" xmlns="http://www.springframework.org/schema/rabbit" durable="true">--> <!-- <bindings>--> <!-- <binding queue="user" key="user" />--> <!-- <binding queue="hello" key="hello" />--> <!-- </bindings>--> <!-- </rabbit:direct-exchange>--> <rabbit:queue name="topic.message" durable="true" auto-delete="false"/> <rabbit:queue name="topic.messages" durable="true" auto-delete="false"/> <!-- topic主題 --> <rabbit:topic-exchange name="exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true"> <!-- 交換器綁定queue ,pattern就是routeKey路由鍵 --> <bindings> <binding queue="topic.message" pattern="topic.message" /> <binding queue="topic.messages" pattern="topic.#" /> </bindings> </rabbit:topic-exchange> <rabbit:queue name="fanout.A" durable="true" auto-delete="false"/> <rabbit:queue name="fanout.B" durable="true" auto-delete="false"/> <rabbit:queue name="fanout.C" durable="true" auto-delete="false"/> <!--fanout主題 沒有routeKey路由鍵--> <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true"> <rabbit:bindings> <rabbit:binding queue="fanout.A" /> <rabbit:binding queue="fanout.B" /> <rabbit:binding queue="fanout.C" /> </rabbit:bindings> </rabbit:fanout-exchange> <bean id="receiver" class="com.example.xml.demo.receiver.Receiver" /> <!-- 把監聽器配置進rabbit監聽容器,ref引用bean,method監聽方法,queues監聽隊列名字(上面配置的queue的name) --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="receiver" method="processUser" queues="user" /> <rabbit:listener ref="receiver" method="processHello" queues="hello" /> <rabbit:listener ref="receiver" method="processTopicMessage" queues="topic.message" /> <rabbit:listener ref="receiver" method="processTopicMessages" queues="topic.messages" /> <rabbit:listener ref="receiver" method="processFanoutA" queues="fanout.A" /> <rabbit:listener ref="receiver" method="processFanoutB" queues="fanout.B" /> <rabbit:listener ref="receiver" method="processFanoutC" queues="fanout.C" /> </rabbit:listener-container> </beans>
接下來咱們的接受類com.example.xml.demo.receiver.Receiver
package com.example.xml.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; public class Receiver { public void processHello(String msg) { System.out.println("helloReceiver : " + msg); } public void processUser(String msg) { System.out.println("userReceiver : " + msg); } public void processTopicMessage(String msg) { System.out.println("topicMessageReceiver : " + msg); } public void processTopicMessages(String msg) { System.out.println("topicMessagesReceiver : " + msg); } public void processFanoutA(String msg) { System.out.println("fanoutAReceiver : " + msg); } public void processFanoutB(String msg) { System.out.println("fanoutBReceiver : " + msg); } public void processFanoutC(String msg) { System.out.println("fanoutCReceiver : " + msg); } }
生產者類代碼
package com.example.xml.demo.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendQueue() { String msg1 = "hello " + new Date(); System.out.println("helloSender : " + msg1); this.rabbitTemplate.convertAndSend("hello", msg1); // this.rabbitTemplate.convertAndSend("direct","hello", msg1); String msg2 = "user " + new Date(); System.out.println("userSender : " + msg2); this.rabbitTemplate.convertAndSend("user", msg2); // this.rabbitTemplate.convertAndSend("direct","user", msg1); } public void sendTopic() { String msg1 = "I am topic.mesaage msg======"; System.out.println("topic.mesaage sender : " + msg1); this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1); String msg2 = "I am topic.mesaages msg########"; System.out.println("topic.mesaages sender : " + msg2); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2); } public void sendFanout() { String msg = "I am fanoutSender msg======"; System.out.println("fanoutSender : " + msg); this.rabbitTemplate.convertAndSend("fanoutExchange","keysuibiantian", msg); } }
接下來看結果