RabbitMQ的交換器Exchange之fanout交換器(廣播)

一、Fanout交換器(廣播),以廣播的模式進行消息的傳遞。廣播模式必定沒有路由鍵的存在,將消息從路由器發送到全部綁定的隊列中去(即消息會發送到全部和指定路由器綁定的隊列中去)。java

項目的結構以下所示:web

二、因爲使用的是SpringBoot項目結合Maven項目構建的,pom.xml的配置文件,以下所示,生產者和消費者的配置文件一致,這裏只貼一份了。spring

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.1.1.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.bie</groupId>
14     <artifactId>rabbitmq-fanout-provider</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>rabbitmq-fanout-provider</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter-web</artifactId>
31         </dependency>
32         <dependency>
33             <groupId>org.springframework.boot</groupId>
34             <artifactId>spring-boot-starter-test</artifactId>
35             <scope>test</scope>
36         </dependency>
37         <dependency>
38             <groupId>org.springframework.boot</groupId>
39             <artifactId>spring-boot-starter-amqp</artifactId>
40         </dependency>
41     </dependencies>
42 
43     <build>
44         <plugins>
45             <plugin>
46                 <groupId>org.springframework.boot</groupId>
47                 <artifactId>spring-boot-maven-plugin</artifactId>
48             </plugin>
49         </plugins>
50     </build>
51 
52 </project>

三、配置好pom.xml配置文件,就能夠進行開發了,這裏先約束一下配置文件,體現一下SpringBoot的魔力,約定大於配置。apache

 1 # 給當前項目起名稱.
 2 spring.application.name=rabbitmq-fanout-provider
 3 
 4 # 配置端口號
 5 server.port=8081
 6 
 7 # 配置rabbitmq的參數.
 8 # rabbitmq服務器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號.
11 spring.rabbitmq.port=5672
12 # rabbitmq的帳號.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密碼.
15 spring.rabbitmq.password=guest
16 
17 # 設置交換器的名稱,方便修改.
18 # 生產者和消費者的交換器的名稱是一致的,這樣生產者生產的消息發送到交換器,消費者能夠從這個交換器中消費.
19 rabbitmq.config.exchange=order.exchange.fanout

生產者模擬一個服務,訂單服務。使用廣播模式,將相同的消息發送到sms短信服務、push服務中去,注意點,生產者和消費者使用相同的路由器。消費者的路由器和隊列進行綁定,就能夠將訂單服務產生的消息發送到sms短信服務、push服務中去瀏覽器

 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生產者,生產消息一樣須要知道向那個交換器Exchange發送消息的.
13  * 
14  *         這裏使用的交換器類型使用的是topic主題模式,根據規則匹配。
15  *
16  */
17 @Component
18 public class RabbitMqUserLogProduce {
19 
20     @Autowired
21     private AmqpTemplate rabbitmqAmqpTemplate;
22 
23     // 交換器的名稱Exchange
24     @Value(value = "${rabbitmq.config.exchange}")
25     private String exchange;
26 
27     /**
28      * 發送消息的方法
29      * 
30      * @param msg
31      */
32     public void producer(String msg) {
33         // 向消息隊列發送消息
34         // 參數1,交換器的名稱
35         // 參數2,路由鍵,廣播模式沒有路由鍵,給定空串便可.
36         // 參數3,消息
37         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, "", "廣播模式: " + msg);
38     }
39 
40 }

這裏使用web工程,瀏覽器訪問調用,方便測試。你也可使用單元測試的方法。服務器

 1 package com.example.bie.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.example.bie.provider.RabbitMqUserLogProduce;
 9 
10 /**
11  * 
12  * @author biehl
13  *
14  */
15 @Controller
16 public class RabbitmqController {
17 
18     @Autowired
19     private RabbitMqUserLogProduce rabbitMqUserLogProduce;
20 
21     @RequestMapping(value = "/smsPush")
22     @ResponseBody
23     public String rabbitmqSendUserLogInfoMessage() {
24         String msg = "生產者===>生者的消息message: ";
25         for (int i = 0; i < 50000; i++) {
26             rabbitMqUserLogProduce.producer(msg + i);
27         }
28         return "生產===> 消息message  ===> success!!!";
29     }
30 
31 }

生產者的啓動類以下所示:app

 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqProducerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqProducerApplication.class, args);
11     }
12 
13 }

四、生產者開發完畢就能夠進行消費者的開發,也是先約束一下配置文件application.properties。maven

 1 # 給當前項目起名稱.
 2 spring.application.name=rabbitmq-fanout-consumer
 3 
 4 # 配置端口號
 5 server.port=8080
 6 
 7 # 配置rabbitmq的參數.
 8 # rabbitmq服務器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號.
11 spring.rabbitmq.port=5672
12 # rabbitmq的帳號.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密碼.
15 spring.rabbitmq.password=guest
16 
17 # 設置交換器的名稱,方便修改.
18 # 路由鍵是將交換器和隊列進行綁定的,隊列經過路由鍵綁定到交換器.
19 rabbitmq.config.exchange=order.exchange.fanout
20 
21 # sms短信服務的隊列名稱.
22 rabbitmq.config.queue.sms=order.sms.queue
23 
24 # push服務的隊列名稱.
25 rabbitmq.config.queue.push=order.push.queue

約束好配置文件就能夠進行消費者的開發了,這裏是將訂單服務產生的消息,以廣播模式發送給sms短信服務、push服務中去sms短信服務、push服務接收到的消息一致。廣播模式必定不用配置routing-key路由鍵。ide

 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         一、@RabbitListener bindings:綁定隊列
18  * 
19  *         二、@QueueBinding
20  *         value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器
21  * 
22  *         三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列
23  * 
24  *         四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.sms}", autoDelete = "true"),
32         // 廣播模式Fanout不須要路由鍵
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.FANOUT)
34 
35 ))
36 public class SmsConsumer {
37 
38     /**
39      * 接收消息的方法,採用消息隊列監聽機制.
40      * 
41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面
42      * 
43      * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("sms短信服務消費者===>消費: " + msg);
51     }
52 
53 }
 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         一、@RabbitListener bindings:綁定隊列
18  * 
19  *         二、@QueueBinding
20  *         value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器
21  * 
22  *         三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列
23  * 
24  *         四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.push}", autoDelete = "true"),
32         // 廣播模式Fanout不須要路由鍵
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.FANOUT)
34 
35 ))
36 public class PushConsumer {
37 
38     /**
39      * 接收消息的方法,採用消息隊列監聽機制.
40      * 
41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面
42      * 
43      * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("push短信服務消費者===>消費: " + msg);
51     }
52 
53 }

消費者的啓動類,以下所示:spring-boot

 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqConsumerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqConsumerApplication.class, args);
11     }
12 
13 }

五、運行效果以下所示:

 

做者:別先生

博客園:https://www.cnblogs.com/biehongli/

若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。

相關文章
相關標籤/搜索