Spring AMQP 項目是應用了spring的核心概念到AMQP協議消息解決方案中。咱們提供了一個「template」做爲更高級別的抽象去發送和接收消息。我也提供了消息驅動類的支持。使用依賴注入和聲明式編程能夠更好的管理AMQP源代碼。此項目中你能夠看到和SpringFramework中JMS一些類似的地方。web
1.消息隊列是應用程序和應用程序之間的一種通訊方法。spring
2.RabbitMQ : erlang語言開發、 基於AMQP協議。編程
3.同類產品:ActiveMQ、 ZeroMQ、 RabbitMQ、 RocketMQ、 Kafka。服務器
4.物理模型app
5.Broker 消息隊列服務進程、 Exchange消息隊列交換機,Queue 消息隊列、 Producer 消息生產者、 Consumer 消息消費者。maven
6.六種模式: 簡單模式、 工做模式、 發佈與訂閱模式、 路由模式、通配符模式、 遠程調用模式(基本不會用到)。spring-boot
7.關鍵詞:{Broker: 服務器實體、 Exchange :消息交換機、 Queue: 消息隊列載體、Binding: 綁定 、Routing Key: 路由關鍵字、 VHost: 虛擬主機、Producer: 消息生產者 、 Consumer: 消息消費者、Channel: 消息通道 }測試
8.關鍵概念:由Exchange、Queue、RoutingKey三個才能決定一個從Exchange到Queue的惟一的線路。ui
這次是基於SpringBoot開發的RabbitMQ應用程序,利用SpringBoot的自動配置和起步依賴會讓你更快更方便的構建項目。
讓咱們實戰開始。spa
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> 123456789101112131415161718192021222324252627282930313233343536373839404142434445
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest server: port: 8082
@Configuration public class RabbitSimpleConfig { @Bean public Queue simpleQueue(){ return new Queue("simpleQueue"); } }
@SpringBootTest(classes = RabbitmqProducerApplication.class) public class ProducerTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void simpleProduct(){ for (int num = 0; num < 20; num++) { rabbitTemplate.convertAndSend("simpleQueue", "簡單模式"+num); } } }
@Component public class MessageListener { @RabbitListener(queues = "simpleQueue") public void simpleListener(String message){ System.out.println("簡單模式監聽器:"+message); } }
@Bean public Queue workQueue(){ return new Queue("workQueue"); }
@Test public void workProduct(){ for (int num = 0; num < 20; num++) { rabbitTemplate.convertAndSend("workQueue", "工做模式"+num); } }
@RabbitListener(queues = "workQueue") public void workListener1(String message) { System.out.println("工做模式監聽器1:" + message); } @RabbitListener(queues = "workQueue") public void workListener2(String message) { System.out.println("工做模式監聽器2:" + message); }
//配置交換器 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //配置隊列 @Bean public Queue fanoutQueue1() { return new Queue("fanoutQueue1", true, false, false, null); } @Bean public Queue fanoutQueue2() { return new Queue("fanoutQueue2", true, false, false, null); } //配置綁定 @Bean public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }
@Test public void FanoutProduct(){ for (int num = 0; num < 10; num++) { rabbitTemplate.convertAndSend("fanoutExchange","","發佈訂閱模式"+num); } }
@RabbitListener(queues = "fanoutQueue1") public void fanoutListener1(String message) { System.out.println("發佈訂閱監聽器1:" + message); } @RabbitListener(queues = "fanoutQueue2") public void fanoutListener2(String message) { System.out.println("發佈訂閱監聽器2:" + message); }
//配置交換機 @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } //配置隊列 @Bean public Queue directQueue1() { return new Queue("directQueue1", true, false, false, null); } @Bean public Queue directQueue2() { return new Queue("directQueue2", true, false, false, null); } //配置綁定 @Bean public Binding directBinding1(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with("one"); } @Bean public Binding directBinding2(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with("two"); }
@Test public void directProduct1() { for (int num = 0; num < 5; num++) { rabbitTemplate.convertAndSend("directExchange","one", "發送到路由隊列1消息"+num); } } @Test public void directProduct2() { for (int num = 0; num < 5; num++) { rabbitTemplate.convertAndSend("directExchange","two", "發送到路由隊列2消息"+num); } }
@RabbitListener(queues = "directQueue1") public void fanoutListener1(String message) { System.out.println("路由模式監聽器1:" + message); } @RabbitListener(queues = "directQueue2") public void fanoutListener2(String message) { System.out.println("路由模式監聽器2:" + message); }
//配置隊列 @Bean public Queue topicQueue1() { return new Queue("topicQueue1"); } @Bean public Queue topicQueue2() { return new Queue("topicQueue2"); } //配置交換器 @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } //配置綁定 @Bean public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*"); } @Bean public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#"); }
/* * 通配符模式測試 * */ @Test public void topicProduct() { rabbitTemplate.convertAndSend("topicExchange","topic.one", "routkey爲topic.one的消息"); rabbitTemplate.convertAndSend("topicExchange","topic.one.two", "routkey爲topic.one.two的消息"); }
@RabbitListener(queues = "topicQueue1") public void fanoutListener1(String message) { System.out.println("通配符監聽器1:" + message); } @RabbitListener(queues = "topicQueue2") public void fanoutListener2(String message) { System.out.println("通配符監聽器2:" + message); }
以上就是SpringBoot+RabbitMQ五大模式的簡單使用實例,到目前爲止RabbitMQ也是Sping AMQP的惟一實現。下一節將會講解RabbitMQ可視化管理界面,可視化管理界面幫助咱們能夠直觀地看到RabbitMQ服務器的運行狀況。若是你以爲文章對你有幫助,歡迎關注,謝謝。