一、Topic交換器(主題,規則匹配),Topic交換器也稱爲主題交換器,特色是根據規則進行匹配,能夠根據模糊進行匹配(即根據路由key進行模糊匹配),決定將那個信息放入到指定的隊列裏面去。java
項目的結構以下所示:web
二、因爲使用的是SpringBoot項目結合Maven項目構建的,pom.xml的配置文件,以下所示,生產者和消費者的配置文件一致,這裏只貼一份了。spring
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.1.1.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.bie</groupId> 14 <artifactId>rabbitmq-topic-provider</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>rabbitmq-topic-provider</name> 17 <description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.springframework.boot</groupId> 34 <artifactId>spring-boot-starter-test</artifactId> 35 <scope>test</scope> 36 </dependency> 37 <dependency> 38 <groupId>org.springframework.boot</groupId> 39 <artifactId>spring-boot-starter-amqp</artifactId> 40 </dependency> 41 </dependencies> 42 43 <build> 44 <plugins> 45 <plugin> 46 <groupId>org.springframework.boot</groupId> 47 <artifactId>spring-boot-maven-plugin</artifactId> 48 </plugin> 49 </plugins> 50 </build> 51 52 </project>
三、配置好pom.xml配置文件,就能夠進行開發了,這裏先約束一下配置文件,體現一下SpringBoot的魔力,約定大於配置。apache
1 # 給當前項目起名稱. 2 spring.application.name=rabbitmq-topic-provider 3 4 # 配置端口號 5 server.port=8081 6 7 # 配置rabbitmq的參數. 8 # rabbitmq服務器的ip地址. 9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號. 11 spring.rabbitmq.port=5672 12 # rabbitmq的帳號. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密碼. 15 spring.rabbitmq.password=guest 16 17 # 設置交換器的名稱,方便修改. 18 # 生產者和消費者的交換器的名稱是一致的,這樣生產者生產的消息發送到交換器,消費者能夠從這個交換器中消費. 19 rabbitmq.config.exchange=log.exchange.topic
模擬三個服務,用戶服務、商品服務,訂單服務,產生的各類日誌信息,包含info、debug、trace、warn、error日誌信息。不一樣的日誌級別信息指定好路由鍵,將發送的消息綁定到交換器上面,發送消息。瀏覽器
1 package com.example.bie.provider; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * 10 * @author biehl 11 * 12 * 生產者,生產消息一樣須要知道向那個交換器Exchange發送消息的. 13 * 14 * 這裏使用的交換器類型使用的是topic主題模式,根據規則匹配。 15 * 16 */ 17 @Component 18 public class RabbitMqUserLogProduce { 19 20 @Autowired 21 private AmqpTemplate rabbitmqAmqpTemplate; 22 23 // 交換器的名稱Exchange 24 @Value(value = "${rabbitmq.config.exchange}") 25 private String exchange; 26 27 // 路由鍵routingkey 28 private String routingKeyInfo = "user.log.info"; 29 private String routingKeyDebug = "user.log.debug"; 30 private String routingKeyTrace = "user.log.trace"; 31 private String routingKeyWarn = "user.log.warn"; 32 private String routingKeyError = "user.log.error"; 33 34 /** 35 * 發送消息的方法 36 * 37 * @param msg 38 */ 39 public void producer(String msg) { 40 // 向消息隊列發送消息 41 // 參數1,交換器的名稱 42 // 參數2,路由鍵 43 // 參數3,消息 44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "user.log.info......" + msg); 45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "user.log.debug......" + msg); 46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "user.log.trace......" + msg); 47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "user.log.warn......" + msg); 48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "user.log.error......" + msg); 49 } 50 51 }
1 package com.example.bie.provider; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * 10 * @author biehl 11 * 12 * 生產者,生產消息一樣須要知道向那個交換器Exchange發送消息的. 13 * 14 * 這裏使用的交換器類型使用的是topic主題模式,根據規則匹配。 15 * 16 */ 17 @Component 18 public class RabbitMqProductLogProduce { 19 20 @Autowired 21 private AmqpTemplate rabbitmqAmqpTemplate; 22 23 // 交換器的名稱Exchange 24 @Value(value = "${rabbitmq.config.exchange}") 25 private String exchange; 26 27 // 路由鍵routingkey 28 private String routingKeyInfo = "product.log.info"; 29 private String routingKeyDebug = "product.log.debug"; 30 private String routingKeyTrace = "product.log.trace"; 31 private String routingKeyWarn = "product.log.warn"; 32 private String routingKeyError = "product.log.error"; 33 34 /** 35 * 發送消息的方法 36 * 37 * @param msg 38 */ 39 public void producer(String msg) { 40 // 向消息隊列發送消息 41 // 參數1,交換器的名稱 42 // 參數2,路由鍵 43 // 參數3,消息 44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "product.log.info......" + msg); 45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "product.log.debug......" + msg); 46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "product.log.trace......" + msg); 47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "product.log.warn......" + msg); 48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "product.log.error......" + msg); 49 } 50 51 }
1 package com.example.bie.provider; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * 10 * @author biehl 11 * 12 * 生產者,生產消息一樣須要知道向那個交換器Exchange發送消息的. 13 * 14 * 這裏使用的交換器類型使用的是topic主題模式,根據規則匹配。 15 * 16 */ 17 @Component 18 public class RabbitMqOrderLogProduce { 19 20 @Autowired 21 private AmqpTemplate rabbitmqAmqpTemplate; 22 23 // 交換器的名稱Exchange 24 @Value(value = "${rabbitmq.config.exchange}") 25 private String exchange; 26 27 // 路由鍵routingkey 28 private String routingKeyInfo = "order.log.info"; 29 private String routingKeyDebug = "order.log.debug"; 30 private String routingKeyTrace = "order.log.trace"; 31 private String routingKeyWarn = "order.log.warn"; 32 private String routingKeyError = "order.log.error"; 33 34 /** 35 * 發送消息的方法 36 * 37 * @param msg 38 */ 39 public void producer(String msg) { 40 // 向消息隊列發送消息 41 // 參數1,交換器的名稱 42 // 參數2,路由鍵 43 // 參數3,消息 44 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "order.log.info......" + msg); 45 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "order.log.debug......" + msg); 46 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "order.log.trace......" + msg); 47 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "order.log.warn......" + msg); 48 this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "order.log.error......" + msg); 49 } 50 51 }
這裏使用web工程,瀏覽器訪問調用,方便測試。你也可使用單元測試的方法。服務器
1 package com.example.bie.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.example.bie.provider.RabbitMqOrderLogProduce; 9 import com.example.bie.provider.RabbitMqProductLogProduce; 10 import com.example.bie.provider.RabbitMqUserLogProduce; 11 12 /** 13 * 14 * @author biehl 15 * 16 */ 17 @Controller 18 public class RabbitmqController { 19 20 @Autowired 21 private RabbitMqUserLogProduce rabbitMqUserLogProduce; 22 23 @Autowired 24 private RabbitMqProductLogProduce rabbitMqProductLogProduce; 25 26 @Autowired 27 private RabbitMqOrderLogProduce rabbitMqOrderLogProduce; 28 29 @RequestMapping(value = "/userLogInfo") 30 @ResponseBody 31 public String rabbitmqSendUserLogInfoMessage() { 32 String msg = "生產者===>生者的UserLogInfo消息message: "; 33 for (int i = 0; i < 50000; i++) { 34 rabbitMqUserLogProduce.producer(msg + i); 35 } 36 return "生產===> UserLogInfo消息message ===> success!!!"; 37 } 38 39 @RequestMapping(value = "/productLogInfo") 40 @ResponseBody 41 public String rabbitmqSendProductLogErrorMessage() { 42 String msg = "生產者===>生者的ProductLogInfo消息message: "; 43 for (int i = 0; i < 50000; i++) { 44 rabbitMqProductLogProduce.producer(msg + i); 45 } 46 return "生產===> ProductLogInfo消息message ===> success!!!"; 47 } 48 49 @RequestMapping(value = "/orderLogInfo") 50 @ResponseBody 51 public String rabbitmqSendOrderLogInfoMessage() { 52 String msg = "生產者===>生者的OrderLogInfo消息message: "; 53 for (int i = 0; i < 50000; i++) { 54 rabbitMqOrderLogProduce.producer(msg + i); 55 } 56 return "生產===> OrderLogInfo消息message ===> success!!!"; 57 } 58 59 }
生產者的啓動類以下所示:app
1 package com.example; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class RabbitmqProducerApplication { 8 9 public static void main(String[] args) { 10 SpringApplication.run(RabbitmqProducerApplication.class, args); 11 } 12 13 }
四、生產者開發完畢就能夠進行消費者的開發,也是先約束一下配置文件application.properties。maven
1 # 給當前項目起名稱. 2 spring.application.name=rabbitmq-topic-consumer 3 4 # 配置端口號 5 server.port=8080 6 7 # 配置rabbitmq的參數. 8 # rabbitmq服務器的ip地址. 9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的端口號5672,區別於瀏覽器訪問界面的15672端口號. 11 spring.rabbitmq.port=5672 12 # rabbitmq的帳號. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密碼. 15 spring.rabbitmq.password=guest 16 17 # 設置交換器的名稱,方便修改. 18 # 路由鍵是將交換器和隊列進行綁定的,隊列經過路由鍵綁定到交換器. 19 rabbitmq.config.exchange=log.exchange.topic 20 21 # info級別的隊列名稱. 22 rabbitmq.config.queue.info=log.info.queue 23 24 # error級別的隊列名稱. 25 rabbitmq.config.queue.error=log.error.queue 26 27 # 全日誌log級別的隊列名稱. 28 rabbitmq.config.queue.log=log.all.queue
約束好配置文件就能夠進行消費者的開發了,這裏是將用戶服務、商品服務、訂單服務產生的info、debug、trace、warn、error不一樣級別的日誌信息,使用rabbitmq的主題topic模式進行規則配置,即,消費者能夠專注消費info級別的消息,error級別的消息,或者所有級別的日誌消息。ide
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 一、@RabbitListener bindings:綁定隊列 18 * 19 * 二、@QueueBinding 20 * value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器 21 * 22 * 三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列 23 * 24 * 四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC), 34 35 key = "*.log.info")) 36 public class LogInfoConsumer { 37 38 /** 39 * 接收消息的方法,採用消息隊列監聽機制. 40 * 41 * @RabbitHandler意思是將註解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("All消費者===>消費: " + msg); 51 } 52 53 }
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 一、@RabbitListener bindings:綁定隊列 18 * 19 * 二、@QueueBinding 20 * value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器 21 * 22 * 三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列 23 * 24 * 四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC), 34 35 key = "*.log.error")) 36 public class LogErrorConsumer { 37 38 /** 39 * 接收消息的方法,採用消息隊列監聽機制. 40 * 41 * @RabbitHandler意思是將註解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("Error消費者===>消費: " + msg); 51 } 52 53 }
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 一、@RabbitListener bindings:綁定隊列 18 * 19 * 二、@QueueBinding 20 * value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器 21 * 22 * 三、@Queue value:配置隊列名稱、autoDelete:是不是一個可刪除的臨時隊列 23 * 24 * 四、@Exchange value:爲交換器起個名稱、type:指定具體的交換器類型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.log}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC), 34 35 key = "*.log.*")) 36 public class LogAllConsumer { 37 38 /** 39 * 接收消息的方法,採用消息隊列監聽機制. 40 * 41 * @RabbitHandler意思是將註解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法能夠進行消息的接收而且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("Info消費者===>消費: " + msg); 51 } 52 53 }
消費者的啓動類,以下所示:spring-boot
1 package com.example; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class RabbitmqConsumerApplication { 8 9 public static void main(String[] args) { 10 SpringApplication.run(RabbitmqConsumerApplication.class, args); 11 } 12 13 }
五、運行效果以下所示:
做者:別先生
博客園:https://www.cnblogs.com/biehongli/
若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。