上篇文章咱們簡單的介紹了stream的使用,發現使用仍是蠻方便的,可是在上個案例中,若是有多個消息接收者,那麼消息生產者發送的消息會被多個消費者都接收到,這種狀況在某些實際場景下是有很大問題的,好比在以下場景中,訂單系統咱們作集羣部署,都會從RabbitMQ中獲取訂單信息,那若是一個訂單同時被兩個服務獲取到,那麼就會形成數據錯誤,咱們得避免這種狀況。這時咱們就可使用Stream中的消息分組來解決了!java
消息分組的做用咱們已經介紹了。注意在Stream中處於同一個group中的多個消費者是競爭關係。就可以保證消息只會被其中一個應用消費一次。不一樣的組是能夠消費的,同一個組內會發生競爭關係,只有其中一個能夠消費。經過案例咱們來演示看看,這裏咱們會建立3個服務,分別以下
|服務|介紹 |
|--|:--|
| stream-group-sender |消息發送者服務 |
| stream-group-receiverA|消息接收者服務 |
|stream-group-receiverB | 消息接收者服務 |git
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-group-sender</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置中的「outputProduct」能夠自定義,可是咱們等會在消息接口中要使用到。github
spring.application.name=stream-sender server.port=9060 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange outputProduct自定義的信息 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
/** * 發送消息的接口 * @author dengp * */ public interface ISendeService { String OUTPUT="outputProduct"; /** * 指定輸出的交換器名稱 * @return */ @Output(OUTPUT) SubscribableChannel send(); }
@SpringBootApplication @EnableEurekaClient // 綁定咱們剛剛建立的發送消息的接口類型 @EnableBinding(value={ISendeService.class}) public class StreamSenderStart { public static void main(String[] args) { SpringApplication.run(StreamSenderStart.class, args); } }
在本案例中咱們發送的消息是自定義的對象web
package com.bobo.stream.pojo; import java.io.Serializable; public class Product implements Serializable{ private Integer id; private String name; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Product(Integer id, String name) { super(); this.id = id; this.name = name; } public Product() { super(); } @Override public String toString() { return "Product [id=" + id + ", name=" + name + "]"; } }
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-group-receiverA</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置文件中配置分組「groupProduct」spring
spring.application.name=stream-group-receiverA server.port=9070 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange 和消息發送者的 交換器是同一個 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列 inputProduct 自定義 spring.cloud.stream.bindings.inputProduct.group=groupProduct
/** * 接收消息的接口 * @author dengp * */ public interface IReceiverService { String INPUT = "inputProduct"; /** * 指定接收的交換器名稱 * @return */ @Input(INPUT) SubscribableChannel receiver(); }
/** * 具體接收消息的處理類 * @author dengp * */ @Service @EnableBinding(IReceiverService.class) public class ReceiverService { @StreamListener(IReceiverService.INPUT) public void onReceiver(Product p){ System.out.println("消費者A:"+p); } }
注意一樣須要添加Product類apache
package com.bobo.stream.pojo; import java.io.Serializable; public class Product implements Serializable{ private Integer id; private String name; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Product(Integer id, String name) { super(); this.id = id; this.name = name; } public Product() { super(); } @Override public String toString() { return "Product [id=" + id + ", name=" + name + "]"; } }
@SpringBootApplication @EnableEurekaClient @EnableBinding(value={IReceiverService.class}) public class StreamReceiverStart { public static void main(String[] args) { SpringApplication.run(StreamReceiverStart.class, args); } }
此服務和stream-group-receiverA同樣,複製一份只需修改application.properties中的服務名稱,端口。咱們先將group設置不同,咱們測試來看看app
spring.application.name=stream-group-receiverB server.port=9071 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 對應 MQ 是 exchange 和消息發送者的 交換器是同一個 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列 inputProduct 自定義 spring.cloud.stream.bindings.inputProduct.group=groupProduct1
@RunWith(SpringRunner.class) @SpringBootTest(classes=StreamSenderStart.class) public class StreamTest { @Autowired private ISendeService sendService; @Test public void testStream(){ Product p = new Product(666, "stream test ..."); // 將須要發送的消息封裝爲Message對象 Message message = MessageBuilder .withPayload(p) .build(); sendService.send().send(message ); } }
在stream-group-receiverA和stream-group-receiverB服務的group不一致的狀況下maven
改成同組的狀況下ide
啓動服務,發送數據spring-boot
經過結果能夠看到只有其中一個受到消息。避免了消息重複消費。
案例代碼github:https://github.com/q279583842q/springcloud-e-book