Spring Cloud 系列之 Stream 消息驅動(一)

  在實際開發過程當中,服務與服務之間通訊常常會使用到消息中間件,消息中間件解決了應用解耦、異步處理、流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。java

  不一樣中間件內部實現方式是不同的,這些中間件的差別性致使咱們實際項目開發給咱們形成了必定的困擾,好比項目中間件爲 Kafka,若是咱們要替換爲 RabbitMQ,這無疑就是一個災難性的工做,一大堆東西都要重作,由於它跟咱們系統的耦合性很是高。這時咱們可使用 Spring Cloud Stream 來整合咱們的消息中間件,下降系統和中間件的耦合性。spring

  

消息中間件的幾大應用場景

  

應用解耦

  

  假設公司有幾個不一樣的系統,各系統在某些業務有聯動關係,好比 A 系統完成了某些操做,須要觸發 B 系統及 C 系統,可是各個系統之間產生了耦合。針對這種場景,用消息中間件就能夠完成解耦,當 A 系統完成操做時將數據放進消息隊列,B 和 C 系統去訂閱消息就能夠了,這樣各系統只要約定好消息的格式就能夠了。shell

  

  傳統模式:apache

  中間件模式:編程

  

異步處理

  

  好比用戶在電商網站下單,下單完成後會給用戶推送短信或郵件,發短信和郵件的過程就能夠異步完成。由於下單付款纔是核心業務,發郵件和短信並不屬於核心功能,且可能耗時較長,因此針對這種業務場景能夠選擇先放到消息隊列中,由其餘服務來異步處理。api

  

  傳統模式:服務器

  中間件模式:架構

  

流量削峯

  

  好比秒殺活動,一會兒進來好多請求,有的服務可能承受不住瞬時高併發而崩潰,針對這種場景,在中間加一層消息隊列,把請求先入隊列,而後再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。併發

  

  傳統模式:app

  中間件模式:

  

日誌處理

  

  對於小型項目來講,咱們一般對日誌的處理沒有那麼多的要求,可是當用戶量,數據量達到必定的峯值以後,問題就會隨之而來。好比:

  • 用戶日誌怎麼存放
  • 用戶日誌存放後怎麼利用
  • 怎麼在存儲大量日誌而不對系統形成影響

  等不少其餘的問題,這樣咱們就須要藉助消息隊列進行業務的上解耦,數據上更好的傳輸。

  Kafka 最開始就是專門爲了處理日誌產生的。

  

總結

  

  消息隊列,是分佈式系統中重要的組件,其通用的使用場景能夠簡單地描述爲:當不須要當即得到結果,可是併發量又須要進行控制的時候,差很少就是須要使用消息隊列的時候。在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提升了系統的吞吐量。

  當遇到上面幾種狀況的時候,就要考慮用消息隊列了。若是你碰巧使用的是 RabbitMQ 或者 Kafka ,並且一樣也在使用 Spring Cloud,那你能夠考慮下用 Spring Cloud Stream。

  

什麼是 Spring Cloud Stream

  

  Spring Cloud Stream 是用於構建消息驅動微服務應用程序的框架。該框架提供了一個靈活的編程模型,該模型創建在已經熟悉 Spring 習慣用法的基礎上,它提供了來自多家供應商的中間件的合理配置,包括 publish-subscribe,消息分組和消息分區處理的支持。

  Spring Cloud Stream 解決了開發人員無感知的使用消息中間件的問題,由於 Stream 對消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件,使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。

  

核心概念

  

  

組成 說明
Middleware 中間件,支持 RabbitMQ 和 Kafka。
Binder 目標綁定器,目標指的是 Kafka 仍是 RabbitMQ。綁定器就是封裝了目標中間件的包。若是操做的是 Kafka 就使用 spring-cloud-stream-binder-kafka,若是操做的是 RabbitMQ 就使用 spring-cloud-stream-binder-rabbit
@Input 註解標識輸入通道,接收(消息消費者)的消息將經過該通道進入應用程序。
@Output 註解標識輸出通道,發佈(消息生產者)的消息將經過該通道離開應用程序。
@StreamListener 監聽隊列,消費者的隊列的消息接收。
@EnableBinding 註解標識綁定,將信道 channel 和交換機 exchange 綁定在一塊兒。

  

工做原理

  

  經過定義綁定器做爲中間層,實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的 Channel 通道,使得應用程序不須要再考慮各類不一樣的消息中間件的實現。當須要升級消息中間件,或者是更換其餘消息中間件產品時,咱們須要作的就是更換對應的 Binder 綁定器而不須要修改任何應用邏輯。

  

  

  該模型圖中有以下幾個核心概念:

  • Source:當須要發送消息時,咱們就須要經過 Source.java,它會把咱們所要發送的消息進行序列化(默認轉換成 JSON 格式字符串),而後將這些數據發送到 Channel 中;
  • Sink:當咱們須要監聽消息時就須要經過 Sink.java,它負責從消息通道中獲取消息,並將消息反序列化成消息對象,而後交給具體的消息監聽處理;
  • Channel:一般咱們向消息中間件發送消息或者監聽消息時須要指定主題(Topic)和消息隊列名稱,一旦咱們須要變動主題的時候就須要修改消息發送或消息監聽的代碼。經過 Channel 對象,咱們的業務代碼只須要對應 Channel 就能夠了,具體這個 Channel 對應的是哪一個主題,能夠在配置文件中來指定,這樣當主題變動的時候咱們就不用對代碼作任何修改,從而實現了與具體消息中間件的解耦;
  • Binder:經過不一樣的 Binder 能夠實現與不一樣的消息中間件整合,Binder 提供統一的消息收發接口,從而使得咱們能夠根據實際須要部署不一樣的消息中間件,或者根據實際生產中所部署的消息中間件來調整咱們的配置。

  

環境準備

  

  stream-demo 聚合工程。SpringBoot 2.2.4.RELEASESpring Cloud Hoxton.SR1

  • RabbitMQ:消息隊列
  • eureka-server:註冊中心
  • eureka-server02:註冊中心

  

  

入門案例

  

  點擊連接觀看:Stream 入門案例視頻(獲取更多請關注公衆號「哈嘍沃德先生」)

  

消息生產者

  

建立項目

  

  在 stream-demo 項目下建立 stream-producer 子項目。

  

添加依賴

  

  要使用 RabbitMQ 綁定器,能夠經過使用如下 Maven 座標將其添加到 Spring Cloud Stream 應用程序中:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

  或者使用 Spring Cloud Stream RabbitMQ Starter:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  

  完整依賴以下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>stream-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 繼承父依賴 -->
    <parent>
        <groupId>com.example</groupId>
        <artifactId>stream-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- 項目依賴 -->
    <dependencies>
        <!-- netflix eureka client 依賴 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- spring cloud stream binder rabbit 綁定器依賴 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <!-- spring boot test 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

</project>

  

配置文件

  

  配置 RabbitMQ 消息隊列和 Stream 消息發送與接收的通道。

server:
  port: 8001 # 端口

spring:
  application:
    name: stream-producer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址
  cloud:
    stream:
      bindings:
        # 消息發送通道
        # 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同
        output:
          destination: stream.message # 綁定的交換機名稱

# 配置 Eureka Server 註冊中心
eureka:
  instance:
    prefer-ip-address: true       # 是否使用 ip 地址註冊
    instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port
  client:
    service-url:                  # 設置服務註冊中心地址
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

  

發送消息

  

  MessageProducer.java

package com.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 */
@Component
@EnableBinding(Source.class)
public class MessageProducer {

    @Autowired
    private Source source;

    /**
     * 發送消息
     *
     * @param message
     */
    public void send(String message) {
        source.output().send(MessageBuilder.withPayload(message).build());
    }

}

  

啓動類

  

  StreamProducerApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamProducerApplication.class);
    }

}

  

消息消費者

  

建立項目

  

  在 stream-demo 項目下建立 stream-consumer 子項目。

  

添加依賴

  

  要使用 RabbitMQ 綁定器,能夠經過使用如下 Maven 座標將其添加到 Spring Cloud Stream 應用程序中:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

  或者使用 Spring Cloud Stream RabbitMQ Starter:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  

  完整依賴以下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>stream-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 繼承父依賴 -->
    <parent>
        <groupId>com.example</groupId>
        <artifactId>stream-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <!-- 項目依賴 -->
    <dependencies>
        <!-- netflix eureka client 依賴 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- spring cloud stream binder rabbit 綁定器依賴 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        
        <!-- spring boot test 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

</project>

  

配置文件

  

  配置 RabbitMQ 消息隊列和 Stream 消息發送與接收的通道。

server:
  port: 8002 # 端口

spring:
  application:
    name: stream-consumer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址
  cloud:
    stream:
      bindings:
        # 消息接收通道
        # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
        input:
          destination: stream.message # 綁定的交換機名稱

# 配置 Eureka Server 註冊中心
eureka:
  instance:
    prefer-ip-address: true       # 是否使用 ip 地址註冊
    instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port
  client:
    service-url:                  # 設置服務註冊中心地址
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

  

接收消息

  

  MessageConsumer.java

package com.example.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

/**
 * 消息消費者
 */
@Component
@EnableBinding(Sink.class)
public class MessageConsumer {

    /**
     * 接收消息
     *
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

啓動類

  

  StreamConsumerApplication.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class);
    }

}

  

測試

  

單元測試

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    public void testSend() {
        messageProducer.send("hello spring cloud stream");
    }

}

  

訪問

  

  啓動消息消費者,運行單元測試,消息消費者控制檯打印結果以下:

message = hello spring cloud stream

  RabbitMQ 界面以下:

  

自定義消息通道

  

建立消息通道

  

  參考源碼 Source.javaSink.java 建立自定義消息通道。

  自定義消息發送通道 MySource.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定義消息發送通道
 */
public interface MySource {

    String MY_OUTPUT = "my_output";

    @Output(MY_OUTPUT)
    MessageChannel myOutput();

}

  自定義消息接收通道 MySink.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消息接收通道
 */
public interface MySink {

    String MY_INPUT = "my_input";

    @Input(MY_INPUT)
    SubscribableChannel myInput();

}

  

配置文件

  

  消息生產者。

server:
  port: 8001 # 端口

spring:
  application:
    name: stream-producer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址
  cloud:
    stream:
      bindings:
        # 消息發送通道
        # 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同
        output:
          destination: stream.message # 綁定的交換機名稱
        my_output:
          destination: my.message # 綁定的交換機名稱

  

  消息消費者。

server:
  port: 8002 # 端口

spring:
  application:
    name: stream-consumer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址
  cloud:
    stream:
      bindings:
        # 消息接收通道
        # 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
        input:
          destination: stream.message # 綁定的交換機名稱
        my_input:
          destination: my.message # 綁定的交換機名稱

  

代碼重構

  

  消息生產者 MyMessageProducer.java

package com.example.producer;

import com.example.channel.MySource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 */
@Component
@EnableBinding(MySource.class)
public class MyMessageProducer {

    @Autowired
    private MySource mySource;

    /**
     * 發送消息
     *
     * @param message
     */
    public void send(String message) {
        mySource.myOutput().send(MessageBuilder.withPayload(message).build());
    }

}

  

  消息消費者 MyMessageConsumer.java

package com.example.consumer;

import com.example.channel.MySink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 消息消費者
 */
@Component
@EnableBinding(MySink.class)
public class MyMessageConsumer {

    /**
     * 接收消息
     *
     * @param message
     */
    @StreamListener(MySink.MY_INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

測試

  

單元測試

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MyMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MyMessageProducer myMessageProducer;

    @Test
    public void testMySend() {
        myMessageProducer.send("hello spring cloud stream");
    }

}

  

訪問

  

  啓動消息消費者,運行單元測試,消息消費者控制檯打印結果以下:

message = hello spring cloud stream

  RabbitMQ 界面以下:

  

配置優化

  

  Spring Cloud 微服務開發之因此簡單,除了官方作了許多完全的封裝以外還有一個優勢就是約定大於配置。開發人員僅需規定應用中不符約定的部分,在沒有規定配置的地方採用默認配置,以力求最簡配置爲核心思想。

簡單理解就是:Spring 遵循了推薦默認配置的思想,當存在特殊需求時候,自定義配置便可不然無需配置。

  

  在 Spring Cloud Stream 中,@Output("output")@Input("input") 註解的 value 默認即爲綁定的交換機名稱。因此自定義消息通道的案例咱們就能夠重構爲如下方式。

  

建立消息通道

  

  參考源碼 Source.javaSink.java 建立自定義消息通道。

  自定義消息發送通道 MySource02.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定義消息發送通道
 */
public interface MySource02 {

    String MY_OUTPUT = "default.message";

    @Output(MY_OUTPUT)
    MessageChannel myOutput();

}

  自定義消息接收通道 MySink02.java

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消息接收通道
 */
public interface MySink02 {

    String MY_INPUT = "default.message";

    @Input(MY_INPUT)
    SubscribableChannel myInput();

}

  

配置文件

  

  消息生產者。

server:
  port: 8001 # 端口

spring:
  application:
    name: stream-producer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址

  

  消息消費者。

server:
  port: 8002 # 端口

spring:
  application:
    name: stream-consumer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址

  

代碼重構

  

  消息生產者 MyMessageProducer02.java

package com.example.producer;

import com.example.channel.MySource02;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 */
@Component
@EnableBinding(MySource02.class)
public class MyMessageProducer02 {

    @Autowired
    private MySource02 mySource02;

    /**
     * 發送消息
     *
     * @param message
     */
    public void send(String message) {
        mySource02.myOutput().send(MessageBuilder.withPayload(message).build());
    }

}

  

  消息消費者 MyMessageConsumer02.java

package com.example.consumer;

import com.example.channel.MySink02;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 消息消費者
 */
@Component
@EnableBinding(MySink02.class)
public class MyMessageConsumer02 {

    /**
     * 接收消息
     *
     * @param message
     */
    @StreamListener(MySink02.MY_INPUT)
    public void receive(String message) {
        System.out.println("message = " + message);
    }

}

  

測試

  

單元測試

  

  MessageProducerTest.java

package com.example;

import com.example.producer.MyMessageProducer02;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MyMessageProducer02 myMessageProducer02;

    @Test
    public void testMySend02() {
        myMessageProducer02.send("約定大於配置");
    }

}

  

訪問

  

  啓動消息消費者,運行單元測試,消息消費者控制檯打印結果以下:

message = 約定大於配置

  RabbitMQ 界面以下:

  

短信郵件發送案例

  

  一個消息驅動微服務應用能夠既是消息生產者又是消息消費者。接下來模擬一個短信郵件發送的消息處理過程:

  • 原始消息發送至 source.message 交換機;
  • 消息驅動微服務應用經過 source.message 交換機接收原始消息,通過處理分別發送至 sms.messageemail.message 交換機;
  • 消息驅動微服務應用經過 sms.messageemail.message 交換機接收處理後的消息併發送短信和郵件。

  

建立消息通道

  

  發送原始消息,接收處理後的消息併發送短信和郵件的消息驅動微服務應用。

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消息通道
 */
public interface MyProcessor {

    String SOURCE_MESSAGE = "source.message";
    String SMS_MESSAGE = "sms.message";
    String EMAIL_MESSAGE = "email.message";

    @Output(SOURCE_MESSAGE)
    MessageChannel sourceOutput();

    @Input(SMS_MESSAGE)
    SubscribableChannel smsInput();

    @Input(EMAIL_MESSAGE)
    SubscribableChannel emailInput();

}

  

  接收原始消息,通過處理分別發送短信和郵箱的消息驅動微服務應用。

package com.example.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定義消息通道
 */
public interface MyProcessor {

    String SOURCE_MESSAGE = "source.message";
    String SMS_MESSAGE = "sms.message";
    String EMAIL_MESSAGE = "email.message";

    @Input(SOURCE_MESSAGE)
    MessageChannel sourceOutput();

    @Output(SMS_MESSAGE)
    SubscribableChannel smsOutput();

    @Output(EMAIL_MESSAGE)
    SubscribableChannel emailOutput();

}

  

配置文件

  

  約定大於配置,配置文件只修改端口和應用名稱便可,其餘配置一致。

spring:
  application:
    name: stream-producer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址

  

spring:
  application:
    name: stream-consumer # 應用名稱
  rabbitmq:
    host: 192.168.10.101  # 服務器 IP
    port: 5672            # 服務器端口
    username: guest       # 用戶名
    password: guest       # 密碼
    virtual-host: /       # 虛擬主機地址

  

消息驅動微服務 A

  

發送消息

  

  發送原始消息 10086|10086@email.comsource.message 交換機。

package com.example.producer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 */
@Component
@EnableBinding(MyProcessor.class)
public class SourceMessageProducer {

    private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class);

    @Autowired
    private MyProcessor myProcessor;

    /**
     * 發送原始消息
     *
     * @param sourceMessage
     */
    public void send(String sourceMessage) {
        logger.info("原始消息發送成功,原始消息爲:{}", sourceMessage);
        myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build());
    }

}

  

接收消息

  

  接收處理後的消息併發送短信和郵件。

package com.example.consumer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 消息消費者
 */
@Component
@EnableBinding(MyProcessor.class)
public class SmsAndEmailMessageConsumer {

    private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class);

    /**
     * 接收消息 電話號碼
     *
     * @param phoneNum
     */
    @StreamListener(MyProcessor.SMS_MESSAGE)
    public void receiveSms(String phoneNum) {
        logger.info("電話號碼爲:{},調用短信發送服務,發送短信...", phoneNum);
    }

    /**
     * 接收消息 郵箱地址
     *
     * @param emailAddress
     */
    @StreamListener(MyProcessor.EMAIL_MESSAGE)
    public void receiveEmail(String emailAddress) {
        logger.info("郵箱地址爲:{},調用郵件發送服務,發送郵件...", emailAddress);
    }

}

  

消息驅動微服務 B

  

接收消息

  

  接收原始消息 10086|10086@email.com 處理後併發送至 sms.messageemail.message 交換機。

package com.example.consumer;

import com.example.channel.MyProcessor;
import com.example.producer.SmsAndEmailMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * 消息消費者
 */
@Component
@EnableBinding(MyProcessor.class)
public class SourceMessageConsumer {

    private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class);

    @Autowired
    private SmsAndEmailMessageProducer smsAndEmailMessageProducer;

    /**
     * 接收原始消息,處理後併發送
     *
     * @param sourceMessage
     */
    @StreamListener(MyProcessor.SOURCE_MESSAGE)
    public void receive(String sourceMessage) {
        logger.info("原始消息接收成功,原始消息爲:{}", sourceMessage);
        // 發送消息 電話號碼
        smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]);
        // 發送消息 郵箱地址
        smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]);
    }

}

  

發送消息

  

  發送電話號碼 10086 和郵箱地址 10086@email.comsms.messageemail.message 交換機。

package com.example.producer;

import com.example.channel.MyProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 */
@Component
@EnableBinding(MyProcessor.class)
public class SmsAndEmailMessageProducer {

    private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class);

    @Autowired
    private MyProcessor myProcessor;

    /**
     * 發送消息 電話號碼
     *
     * @param smsMessage
     */
    public void sendSms(String smsMessage) {
        logger.info("電話號碼消息發送成功,消息爲:{}", smsMessage);
        myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build());
    }

    /**
     * 發送消息 郵箱地址
     *
     * @param emailMessage
     */
    public void sendEmail(String emailMessage) {
        logger.info("郵箱地址消息發送成功,消息爲:{}", emailMessage);
        myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build());
    }

}

  

測試

  

單元測試

  

  MessageProducerTest.java

package com.example;

import com.example.producer.SourceMessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private SourceMessageProducer sourceMessageProducer;

    @Test
    public void testSendSource() {
        sourceMessageProducer.send("10086|10086@email.com");
    }

}

  

訪問

  

  消息驅動微服務 A 控制檯打印結果以下:

電話號碼爲:10086,調用短信發送服務,發送短信...
郵箱地址爲:10086@email.com,調用郵件發送服務,發送郵件...

  

  消息驅動微服務 B 控制檯打印結果以下:

原始消息接收成功,原始消息爲:10086|10086@email.com
電話號碼消息發送成功,消息爲:10086
郵箱地址消息發送成功,消息爲:10086@email.com

  

  RabbitMQ 界面以下:

下一篇咱們講解 Stream 如何實現消息分組和消息分區,記得關注噢~

  本文采用 知識共享「署名-非商業性使用-禁止演繹 4.0 國際」許可協議

  你們能夠經過 分類 查看更多關於 Spring Cloud 的文章。

  

  🤗 您的點贊轉發是對我最大的支持。

  📢 掃碼關注 哈嘍沃德先生「文檔 + 視頻」每篇文章都配有專門視頻講解,學習更輕鬆噢 ~

相關文章
相關標籤/搜索