Spring cloud stream【消息分組】

  上篇文章咱們簡單的介紹了stream的使用,發現使用仍是蠻方便的,可是在上個案例中,若是有多個消息接收者,那麼消息生產者發送的消息會被多個消費者都接收到,這種狀況在某些實際場景下是有很大問題的,好比在以下場景中,訂單系統咱們作集羣部署,都會從RabbitMQ中獲取訂單信息,那若是一個訂單同時被兩個服務獲取到,那麼就會形成數據錯誤,咱們得避免這種狀況。這時咱們就可使用Stream中的消息分組來解決了!java

在這裏插入圖片描述

Stream消息分組

  消息分組的做用咱們已經介紹了。注意在Stream中處於同一個group中的多個消費者是競爭關係。就可以保證消息只會被其中一個應用消費一次。不一樣的組是能夠消費的,同一個組內會發生競爭關係,只有其中一個能夠消費。經過案例咱們來演示看看,這裏咱們會建立3個服務,分別以下
|服務|介紹 |
|--|:--|
| stream-group-sender |消息發送者服務 |
| stream-group-receiverA|消息接收者服務 |
|stream-group-receiverB | 消息接收者服務 |git

1.建立stream-group-sender 服務

1.1 建立項目

在這裏插入圖片描述

1.2 pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-group-sender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

1.3 配置文件

  配置中的「outputProduct」能夠自定義,可是咱們等會在消息接口中要使用到。github

spring.application.name=stream-sender
server.port=9060
#設置服務註冊中心地址,指向另外一個註冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  outputProduct自定義的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct

1.4 發送接口

/**
 * 發送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {
    
    String OUTPUT="outputProduct";
    

    /**
     * 指定輸出的交換器名稱
     * @return
     */
    @Output(OUTPUT)
    SubscribableChannel send();
}

1.5 啓動類

@SpringBootApplication
@EnableEurekaClient
// 綁定咱們剛剛建立的發送消息的接口類型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamSenderStart.class, args);
    }
}

1.6 建立pojo

  在本案例中咱們發送的消息是自定義的對象web

package com.bobo.stream.pojo;

import java.io.Serializable;

public class Product implements Serializable{

    private Integer id;
    
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public Product() {
        super();
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
    
}

2.建立stream-group-receiverA服務

2.1 建立項目

在這裏插入圖片描述

2.2 pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-group-receiverA</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2.3 配置文件

  配置文件中配置分組「groupProduct」spring

spring.application.name=stream-group-receiverA
server.port=9070
#設置服務註冊中心地址,指向另外一個註冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列  inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct

2.4 接收消息的接口

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {

    String INPUT = "inputProduct";
    /**
     * 指定接收的交換器名稱
     * @return
     */
    @Input(INPUT)
    SubscribableChannel receiver();
}

2.5 消息的具體處理類

/**
 * 具體接收消息的處理類
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

    @StreamListener(IReceiverService.INPUT)
    public void onReceiver(Product p){
        System.out.println("消費者A:"+p);
    }
}

注意一樣須要添加Product類apache

package com.bobo.stream.pojo;

import java.io.Serializable;

public class Product implements Serializable{

    private Integer id;
    
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public Product() {
        super();
    }

    @Override
    public String toString() {
        return "Product [id=" + id + ", name=" + name + "]";
    }
    
}

2.6 啓動類

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {

    public static void main(String[] args) {
        SpringApplication.run(StreamReceiverStart.class, args);
    }
}

3.建立stream-group-receiverB服務

  此服務和stream-group-receiverA同樣,複製一份只需修改application.properties中的服務名稱,端口。咱們先將group設置不同,咱們測試來看看app

spring.application.name=stream-group-receiverB
server.port=9071
#設置服務註冊中心地址,指向另外一個註冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

# 對應 MQ 是 exchange  和消息發送者的 交換器是同一個
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具體分組 對應 MQ 是 隊列名稱 而且持久化隊列  inputProduct 自定義
spring.cloud.stream.bindings.inputProduct.group=groupProduct1

4.測試代碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
    
    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        Product p = new Product(666, "stream test ...");
        // 將須要發送的消息封裝爲Message對象
        Message message = MessageBuilder
                                .withPayload(p)
                                .build();
        sendService.send().send(message );
    }
}

在stream-group-receiverA和stream-group-receiverB服務的group不一致的狀況下maven

在這裏插入圖片描述

在這裏插入圖片描述

在這裏插入圖片描述

改成同組的狀況下ide

在這裏插入圖片描述

在這裏插入圖片描述

啓動服務,發送數據spring-boot

在這裏插入圖片描述

在這裏插入圖片描述

經過結果能夠看到只有其中一個受到消息。避免了消息重複消費

案例代碼github:https://github.com/q279583842q/springcloud-e-book

在這裏插入圖片描述

相關文章
相關標籤/搜索