1.安裝好rabbitmqjava
2.新建一個springBoot項目:rabbitmq_demoweb
3.添加pom依賴:正則表達式
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
4.application.properties:spring
server.port=8080 spring.application.name=rabbitmq_demo spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
5.啓動類聲明一個Queue,用於測試:app
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { @Bean public Queue helloQueue() { return new Queue("helloQueue"); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
多場景實現:spring-boot
1.單生產者和單消費者測試
生產者1:ui
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * 生產者1 */ @Component public class Sender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello1 " + new Date(); System.out.println("Sender1:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } }
消費者1:this
package com.demo.receiver; import com.demo.model.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * helloQueue消費者1 */ @Component @RabbitListener(queues = "helloQueue") public class HelloReceiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1:" + hello); } }
測試controller:spa
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @RequestMapping("/hello") public String hello() { helloSender1.send(); return "ok"; } }
運行項目,訪問http:localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Receiver1:hello1 Thu May 11 17:23:31 CST 2017
2.單生產者-多消費者
生產者1不變
增長消費者2:
package com.demo.receiver; import com.demo.model.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * helloQueue消費者2 */ @Component @RabbitListener(queues = "helloQueue") public class HelloReceiver2 { @RabbitHandler public void process(String mesg) { System.out.println("Receiver2:" + mesg); } }
測試controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender1.send(); return "ok"; } }
運行項目,訪問http:localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Sender1:hello1 Thu May 11 17:23:31 CST 2017 Receiver1:hello1 Thu May 11 17:23:31 CST 2017 Receiver2:hello1 Thu May 11 17:23:31 CST 2017
消息會被多個消費者交替消費,每條消息只能被一個消費者所接收。
3.多生產者-多消費者
增長生產者2:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: 生產者2 */ @Component public class Sender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello2 " + new Date(); System.out.println("Sender2:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } }
消費者一、2不變
測試controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @Autowired private Sender2 helloSender2; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender2.send(); return "ok"; } }
運行項目,訪問http:localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Sender2:hello2 Thu May 11 17:23:31 CST 2017 Receiver1:hello2 Thu May 11 17:23:31 CST 2017 Receiver2:hello1 Thu May 11 17:23:31 CST 2017
多個生產者將消息放入helloQueue的隊列中,隊列中的消息會被多個消費者交替消費,每條消息只能被一個消費者所接收。
4.實體類傳輸
支持對象的發送和接收,實體類只須要支持序列化便可。
實體類
package com.demo.model; import java.io.Serializable; /** * @Description: */ public class User implements Serializable { private String userName; private String password; private String sex; private String level; public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public String getLevel() { return level; } public void setLevel(String level) { this.level = level; } }
生產者1:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * 生產者1 */ @Component public class Sender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello1 " + new Date(); System.out.println("Sender1:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user){ System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } }
生產者2:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: 生產者2 */ @Component public class Sender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello2 " + new Date(); System.out.println("Sender2:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user) { System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } }
消費者1:
package com.demo.receiver; import com.demo.model.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * helloQueue消費者1 */ @Component @RabbitListener(queues = "helloQueue") public class HelloReceiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1:" + hello); } @RabbitHandler public void processUser(User user) { System.out.println("user receive1:" + user.getUserName()+"/"+user.getPassword()); } }
消費者2:
package com.demo.receiver; import com.demo.model.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * helloQueue消費者2 */ @Component @RabbitListener(queues = "helloQueue") public class HelloReceiver2 { @RabbitHandler public void process(String mesg) { System.out.println("Receiver2:" + mesg); } @RabbitHandler public void processUser(User user) { System.out.println("user receive2:" + user.getUserName()+"/"+user.getPassword()); } }
測試的controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @Autowired private Sender2 helloSender2; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender2.send(); return "ok"; } @RequestMapping("/user") public String user() { User user=new User(); user.setUserName("a"); user.setPassword("1"); user.setSex("m"); user.setLevel("1"); helloSender1.sendUser(user); helloSender2.sendUser(user); return "ok"; } }
運行項目,訪問http:localhost:8080/user :
user Sender1:a/1 user Sender2:a/1 user receive1:a/1 user receive2:a/1
5.TopicExchange的使用
啓動類新增聲明兩個Queue,用於測試:
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { /***************************************隊列***********************************************/ @Bean public Queue helloQueue() { return new Queue("helloQueue"); } @Bean public Queue topicMessage() { return new Queue("topicMessage"); } @Bean public Queue topicMessages() { return new Queue("topicMessages"); } /***************************************exchange***********************************************/ @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } /***************************************將隊列和exchange綁定***********************************************/ /** * 將隊列topicMessage與topicExchange綁定, * 只有欄目名爲topic.Message才能匹配, * 獲得當前的Queue * @param topicMessage * @param topicExchange * @return */ @Bean Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message"); } /** * 將隊列topicMessages與topicExchange綁定, * 以topic開頭的欄目名均會模糊匹配, * 獲得當前的Queue * @param topicMessages * @param topicExchange * @return */ @Bean Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#"); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
生產者1:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * 生產者1 */ @Component public class Sender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello1 " + new Date(); System.out.println("Sender1:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user){ System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } public void testTopPicMessage() { String msg = "sendTopPicMessage"; System.out.println("sendTopPicMessage1:" + msg); //第一個參數:指定了exchange //第二個參數:指定了接受消息的欄目名 //第三個參數:消息內容 //到指定exchange找出第二個參數符合的正則表達式,獲得對應的Queue,監聽相應Queue的消費者接受到消息 rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合 msg = "sendTopPicMessages"; System.out.println("sendTopPicMessages1:" + msg); rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合 } }
生產者2:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: 生產者2 */ @Component public class Sender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello2 " + new Date(); System.out.println("Sender2:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user) { System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } public void testTopPicMessage() { String msg = "sendTopPicMessage"; System.out.println("sendTopPicMessage2:" + msg); //第一個參數:指定了exchange //第二個參數:指定了接受消息的欄目名 //第三個參數:消息內容 //到指定exchange找出第二個參數符合的正則表達式,獲得對應的Queue,監聽相應Queue的消費者接受到消息 rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合 msg = "sendTopPicMessages"; System.out.println("sendTopPicMessages2:" + msg); rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合 } }
topicMessage消費者:
package com.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * topicMessage消費者 */ @Component @RabbitListener(queues = "topicMessage") public class TopMessageReceiver { @RabbitHandler public void process(String msg) { System.out.println("topMessageReceiver:" +msg); } }
topicMessages消費者:
package com.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * topicMessages消費者 */ @Component @RabbitListener(queues = "topicMessages") public class TopMessagesReceiver { @RabbitHandler public void process(String msg) { System.out.println("topMessagesReceiver:" +msg); } }
測試controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @Autowired private Sender2 helloSender2; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender2.send(); return "ok"; } @RequestMapping("/user") public String user() { User user=new User(); user.setUserName("a"); user.setPassword("1"); user.setSex("m"); user.setLevel("1"); helloSender1.sendUser(user); helloSender2.sendUser(user); return "ok"; } @RequestMapping("/topMessage") public String topMessage() { helloSender1.testTopPicMessage(); helloSender2.testTopPicMessage(); return "ok"; } }
運行項目,訪問http:localhost:8080/topMessage :
sendTopPicMessage1:sendTopPicMessage sendTopPicMessages1:sendTopPicMessages sendTopPicMessage2:sendTopPicMessage sendTopPicMessages2:sendTopPicMessages topMessageReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessages topMessageReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessages
經過exchange發送的每條消息,全部的消費者都能收到。
須要注意:
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合,所以兩個消費者都收到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合,只有topMessages符合接受消息的條件
6.FanoutExchange的使用
啓動類新增聲明三個Queue,用於測試:
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { /***************************************隊列***********************************************/ @Bean public Queue helloQueue() { return new Queue("helloQueue"); } @Bean public Queue topicMessage() { return new Queue("topicMessage"); } @Bean public Queue topicMessages() { return new Queue("topicMessages"); } @Bean public Queue fanoutA() { return new Queue("fanoutA"); } @Bean public Queue fanoutB() { return new Queue("fanoutB"); } @Bean public Queue fanoutC() { return new Queue("fanoutC"); } /***************************************exchange***********************************************/ @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /***************************************將隊列和exchange綁定***********************************************/ /** * 將隊列topicMessage與topicExchange綁定, * 只有欄目名爲topic.Message才能匹配, * 獲得當前的Queue * @param topicMessage * @param topicExchange * @return */ @Bean Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message"); } /** * 將隊列topicMessages與topicExchange綁定, * 以topic開頭的欄目名均會模糊匹配, * 獲得當前的Queue * @param topicMessages * @param topicExchange * @return */ @Bean Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#"); } /** * 將隊列fanoutA與fanoutExchange綁定 * * @param fanoutA * @param fanoutExchange * @return */ @Bean Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutA).to(fanoutExchange); } /** * 將隊列fanoutA與fanoutExchange綁定 * * @param fanoutB * @param fanoutExchange * @return */ @Bean Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutB).to(fanoutExchange); } /** * 將隊列fanoutA與fanoutExchange綁定 * * @param fanoutC * @param fanoutExchange * @return */ @Bean Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutC).to(fanoutExchange); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
生產者1:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: * 生產者1 */ @Component public class Sender1 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello1 " + new Date(); System.out.println("Sender1:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user){ System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } public void testTopPicMessage() { String msg = "sendTopPicMessage"; System.out.println("sendTopPicMessage1:" + msg); //第一個參數:指定了exchange //第二個參數:指定了接受消息的欄目名 //第三個參數:消息內容 //到指定exchange找出第二個參數符合的正則表達式,獲得對應的Queue,監聽相應Queue的消費者接受到消息 rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合 msg = "sendTopPicMessages"; System.out.println("sendTopPicMessages1:" + msg); rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合 } public void testFanoutMessage(){ String sendMsg = "sendFanoutMessage"; System.out.println("fanout Sender1:" + sendMsg); //第二個參數不會進行正則表達式的過濾 //可是必需要填,才能根據exchange找到相關Queue rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg); } }
生產者2:
package com.demo.sender; import com.demo.model.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * @Description: 生產者2 */ @Component public class Sender2 { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String sendMsg = "hello2 " + new Date(); System.out.println("Sender2:" + sendMsg); rabbitTemplate.convertAndSend("helloQueue", sendMsg); } public void sendUser(User user) { System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword()); rabbitTemplate.convertAndSend("helloQueue", user); } public void testTopPicMessage() { String msg = "sendTopPicMessage"; System.out.println("sendTopPicMessage2:" + msg); //第一個參數:指定了exchange //第二個參數:指定了接受消息的欄目名 //第三個參數:消息內容 //到指定exchange找出第二個參數符合的正則表達式,獲得對應的Queue,監聽相應Queue的消費者接受到消息 rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#兩個都符合 msg = "sendTopPicMessages"; System.out.println("sendTopPicMessages2:" + msg); rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合 } public void testFanoutMessage(){ String sendMsg = "sendFanoutMessage"; System.out.println("fanout Sender2:" + sendMsg); //第二個參數不會進行正則表達式的過濾 //可是必需要填,才能根據exchange找到相關Queue rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg); } }
fanoutA消費者
package com.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * fanoutA消費者 */ @Component @RabbitListener(queues = "fanoutA") public class FanoutReceiverA { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverA:" + msg); } }
fanoutB消費者
package com.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * fanoutB消費者 */ @Component @RabbitListener(queues = "fanoutB") public class FanoutReceiverB { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverB:" + msg); } }
fanoutC消費者
package com.demo.receiver; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Description: * fanoutC消費者 */ @Component @RabbitListener(queues = "fanoutC") public class FanoutReceiverC { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverC:" + msg); } }
測試controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 測試類 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @Autowired private Sender2 helloSender2; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender2.send(); return "ok"; } @RequestMapping("/user") public String user() { User user=new User(); user.setUserName("a"); user.setPassword("1"); user.setSex("m"); user.setLevel("1"); helloSender1.sendUser(user); helloSender2.sendUser(user); return "ok"; } @RequestMapping("/topMessage") public String topMessage() { helloSender1.testTopPicMessage(); helloSender2.testTopPicMessage(); return "ok"; } @RequestMapping("/fanoutMessage") public String fanoutMessage() { helloSender1.testFanoutMessage(); helloSender2.testFanoutMessage(); return "ok"; } }
運行項目,訪問http:localhost:8080/fanoutMessage :
fanout Sender1:sendFanoutMessage fanout Sender2:sendFanoutMessage FanoutReceiverA:sendFanoutMessage FanoutReceiverB:sendFanoutMessage FanoutReceiverC:sendFanoutMessage FanoutReceiverA:sendFanoutMessage FanoutReceiverB:sendFanoutMessage FanoutReceiverC:sendFanoutMessage
經過exchange發送的每條消息,全部的消費者都能收到。