Spring Cloud Stream 是一個構建消息驅動微服務的框架,該框架在Spring Boot的基礎上整合了Spring Integrationg來鏈接消息代理中間件(RabbitMQ, Kafka等),提供了個性化的自動化配置實現,並引入了發佈-訂閱、消費組、分區這三個核心概念。
應用程序經過input通道或者output通道來與Spring Cloud Stream中binder(綁定器)交互,經過配置來binding. 而Spring Cloud Stream的binder負責與中間件交互。java
開發工具:IntelliJ IDEA 2019.2.3web
1、服務器端spring
一、建立項目apache
IDEA中建立一個新的SpringBoot項目,名稱爲「spring-server」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Spring Cloud Discovery -> Eureka Server。
pom.xml完整內容以下:瀏覽器
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-server</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-server</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR4</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
二、修改配置application.yml服務器
修改端口號爲8761;取消將本身信息註冊到Eureka服務器,不從Eureka服務器抓取註冊信息。app
server: port: 8761 eureka: client: register-with-eureka: false fetch-registry: false
三、修改啓動類代碼框架
增長註解@EnableEurekaServermaven
package com.example.springserver; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; @SpringBootApplication @EnableEurekaServer public class SpringServerApplication { public static void main(String[] args) { SpringApplication.run(SpringServerApplication.class, args); } }
2、消息生產者ide
一、建立項目
IDEA中建立一個新的SpringBoot項目,名稱爲「spring-producer」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴spring-cloud-starter-stream-rabbit,會自動引入spring-cloud-stream和spring-cloud-stream-binder。
pom.xml完整內容以下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-producer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR4</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
二、修改配置application.yml
pom.xml使用RabbitMQ,默認狀況下,鏈接本地的5672端口。下面這段rabbitmq也可省略。
server: port: 8081 spring: application: name: spring-producer eureka: instance: hostname: localhost client: serviceUrl: defaultZone: http://localhost:8761/eureka/ rabbitmq: host: localhost post: 5672 username: guest password: guest
三、編寫發送服務
方法sendOrder使用@Output("myInput")註解表示建立myInput的消息通道。調用該方法後,會向myInput通道投遞消息。
若是不使用參數myInput,則使用方法名做爲通道名稱。
package com.example.springproducer; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.SubscribableChannel; public interface SendService { @Output("myInput") SubscribableChannel sendOrder(); }
四、修改啓動類代碼
加入註解@EnableBinding以開啓Spring容器的綁定功能,以SendService.class爲參數,Spring容器啓動時,會自動綁定SendService接口中定義的通道。
package com.example.springproducer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication @EnableEurekaClient @EnableBinding(SendService.class) public class SpringProducerApplication { public static void main(String[] args) { SpringApplication.run(SpringProducerApplication.class, args); } }
五、添加一個控制器類
調用SendService的發送方法,往服務器發送消息。
package com.example.springproducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired SendService sendService; @RequestMapping(value="/send",method= RequestMethod.GET) public String sendRequest(){ //建立消息 Message msg = MessageBuilder.withPayload("hello world".getBytes()).build(); //發送消息 sendService.sendOrder().send(msg); return "SUCCESS"; } }
3、消息消費者
一、建立項目
IDEA中建立一個新的SpringBoot項目,名稱爲「spring-consumer」,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.10.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-consumer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR4</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
二、修改配置application.yml
server: port: 8080 spring: application: name: spring-consumer eureka: instance: hostname: localhost client: serviceUrl: defaultZone: http://localhost:8761/eureka/ rabbitmq: host: localhost post: 5672 username: guest password: guest
三、縮寫接受消息的通道接口
package com.example.springconsumer; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface ReceiveService { @Input("myInput") SubscribableChannel myInput(); }
四、修改啓動類代碼
一樣綁定消息通道
package com.example.springconsumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; @SpringBootApplication @EnableBinding(ReceiveService.class) public class SpringConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringConsumerApplication.class, args); } //訂閱myInput通道的消息 @StreamListener("myInput") public void receive(byte[] msg){ System.out.println("接收到的消息:" + new String(msg)); } }
五、測試
(1)檢查服務裏面的RabbitMQ是否有啓動(默認啓動);
(2)啓動spring-server(8761端口);
(3)啓動spring-producer(8081端口);
(4)啓動spring-consumer(8080端口);
(5)瀏覽器訪問http://localhost:8081/send,spring-consumer項目的控制檯輸出:
接收到的消息:hello world
說明消費者已經能夠從消息代理中獲取到消息。
4、更換綁定器
上面使用了RabbitMQ做爲消息代理,若是使用Kafka,能夠更換Maven依賴實現。在生產者和消費者的pom.xml中,將spring-cloud-starter-stream-rabbit修改成spring-cloud-starter-stream-kafka。