SpringCloud之Spring Cloud Stream:消息驅動

Spring Cloud Stream 是一個構建消息驅動微服務的框架,該框架在Spring Boot的基礎上整合了Spring Integrationg來鏈接消息代理中間件(RabbitMQ, Kafka等),提供了個性化的自動化配置實現,並引入了發佈-訂閱、消費組、分區這三個核心概念。
應用程序經過input通道或者output通道來與Spring Cloud Stream中binder(綁定器)交互,經過配置來binding. 而Spring Cloud Stream的binder負責與中間件交互。java

開發工具:IntelliJ IDEA 2019.2.3web

1、服務器端spring

一、建立項目apache

IDEA中建立一個新的SpringBoot項目,名稱爲「spring-server」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Spring Cloud Discovery -> Eureka Server。
pom.xml完整內容以下:瀏覽器

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

二、修改配置application.yml服務器

修改端口號爲8761;取消將本身信息註冊到Eureka服務器,不從Eureka服務器抓取註冊信息。app

server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

三、修改啓動類代碼框架

增長註解@EnableEurekaServermaven

package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }

}
View Code

2、消息生產者ide

一、建立項目
IDEA中建立一個新的SpringBoot項目,名稱爲「spring-producer」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴spring-cloud-starter-stream-rabbit,會自動引入spring-cloud-stream和spring-cloud-stream-binder。
pom.xml完整內容以下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

二、修改配置application.yml

pom.xml使用RabbitMQ,默認狀況下,鏈接本地的5672端口。下面這段rabbitmq也可省略。

server:
  port: 8081
spring:
  application:
    name: spring-producer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

三、編寫發送服務

方法sendOrder使用@Output("myInput")註解表示建立myInput的消息通道。調用該方法後,會向myInput通道投遞消息。
若是不使用參數myInput,則使用方法名做爲通道名稱。

package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {
    @Output("myInput")
    SubscribableChannel sendOrder();
}

四、修改啓動類代碼

加入註解@EnableBinding以開啓Spring容器的綁定功能,以SendService.class爲參數,Spring容器啓動時,會自動綁定SendService接口中定義的通道。

package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

}

五、添加一個控制器類

調用SendService的發送方法,往服務器發送消息。

package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value="/send",method= RequestMethod.GET)
    public String sendRequest(){
        //建立消息
        Message msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        //發送消息
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}

3、消息消費者

一、建立項目

IDEA中建立一個新的SpringBoot項目,名稱爲「spring-consumer」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

二、修改配置application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

三、縮寫接受消息的通道接口

package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {
    @Input("myInput")
    SubscribableChannel myInput();
}

四、修改啓動類代碼

一樣綁定消息通道

package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    //訂閱myInput通道的消息
    @StreamListener("myInput")
    public void receive(byte[] msg){
        System.out.println("接收到的消息:" + new String(msg));
    }
}

五、測試

(1)檢查服務裏面的RabbitMQ是否有啓動(默認啓動);

(2)啓動spring-server(8761端口);

(3)啓動spring-producer(8081端口);

(4)啓動spring-consumer(8080端口);

(5)瀏覽器訪問http://localhost:8081/send,spring-consumer項目的控制檯輸出:

接收到的消息:hello world

說明消費者已經能夠從消息代理中獲取到消息。

4、更換綁定器

上面使用了RabbitMQ做爲消息代理,若是使用Kafka,能夠更換Maven依賴實現。在生產者和消費者的pom.xml中,將spring-cloud-starter-stream-rabbit修改成spring-cloud-starter-stream-kafka。

相關文章
相關標籤/搜索