Spring cloud stream【入門介紹】

案例代碼:https://github.com/q279583842q/springcloud-e-bookhtml

  在實際開發過程當中,服務與服務之間通訊常常會使用到消息中間件,而以往使用了哪一個中間件好比RabbitMQ,那麼該中間件和系統的耦合性就會很是高,若是咱們要替換爲Kafka那麼變更會比較大,這時咱們可使用SpringCloudStream來整合咱們的消息中間件,來下降系統和中間件的耦合性。java

1、什麼是SpringCloudStream

  官方定義 Spring Cloud Stream 是一個構建<font color='red'>消息驅動</font>微服務的框架。   應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,經過咱們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件交互。因此,咱們只須要搞清楚如何與 Spring Cloud Stream 交互就能夠方便使用消息驅動的方式。   經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQKafkagit

2、Stream 解決了什麼問題?

  Stream解決了開發人員無感知的使用消息中間件的問題,由於Stream對消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件(rabbitmq切換爲kafka),使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程github

官網結構圖web

在這裏插入圖片描述

組成 說明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,經過Binder能夠很方便的鏈接中間件,能夠動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些均可以經過配置文件來實現
@Input 註解標識輸入通道,經過該輸入通道接收到的消息進入應用程序
@Output 註解標識輸出通道,發佈的消息將經過該通道離開應用程序
@StreamListener 監聽隊列,用於消費者的隊列的<font color='red'>消息接收</font>
@EnableBinding 指信道channel和exchange綁定在一塊兒

3、消息驅動入門案例

  咱們經過一個入門案例來演示下經過stream來整合RabbitMQ來實現消息的異步通訊的效果,因此首先要開啓RabbitMQ服務,RabbitMQ不清楚的請參考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404spring

1.建立消息發送者服務

1.1 建立項目

  建立一個SpringCloud項目apache

在這裏插入圖片描述

1.2 pom文件

  pom文件中重點是要添加<font color='red'>spring-cloud-starter-stream-rabbit</font>這個依賴app

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
	</parent>
	<groupId>com.bobo</groupId>
	<artifactId>stream-sender</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

1.3 配置文件

  配置文件中除了必要的服務名稱端口Eureka的信息外咱們還要添加<font color='red'>RabbitMQ</font>的註冊信息框架

spring.application.name=stream-sender
server.port=9060
#設置服務註冊中心地址,指向另外一個註冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

1.4 建立消費發送者接口

  建立一個發送消息的接口。具體以下:方法名稱自定義,返回類型必須是<font color='red'>SubscribableChannel</font>,在Output註解中指定交換器名稱。異步

/**
 * 發送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {

	/**
	 * 指定輸出的交換器名稱
	 * @return
	 */
	@Output("dpb-exchange")
	SubscribableChannel send();
}

1.5 啓動類

  在啓動類中經過@EnableBinding註解綁定咱們建立的接口類。

@SpringBootApplication
@EnableEurekaClient
// 綁定咱們剛剛建立的發送消息的接口類型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {

	public static void main(String[] args) {
		SpringApplication.run(StreamSenderStart.class, args);
	}
}

2.建立消息消費者服務

2.1 建立項目

在這裏插入圖片描述

2.2 pom文件

  添加的依賴和發送消息的服務是一致的

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
	</parent>
	<groupId>com.bobo</groupId>
	<artifactId>stream-receiver</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

2.3 配置文件

  注意修改服務名稱和端口

spring.application.name=stream-receiver
server.port=9061
#設置服務註冊中心地址,指向另外一個註冊中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 連接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

2.4 建立接收消息的接口

  此接口和發送消息的接口類似,注意使用的是@Input註解。

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {

	/**
	 * 指定接收的交換器名稱
	 * @return
	 */
	@Input("dpb-exchange")
	SubscribableChannel receiver();
}

2.5 建立處理消息的處理類

  注意此類並非實現上面建立的接口。而是經過@EnableBinding來綁定咱們建立的接口,同時經過<font color='red'>@StreamListener</font>註解來監聽dpb-exchange對應的消息服務

/**
 * 具體接收消息的處理類
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

	@StreamListener("dpb-exchange")
	public void onReceiver(byte[] msg){
		System.out.println("消費者:"+new String(msg));
	}
}

2.6 啓動類

  一樣要添加@EnableBinding註解

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {

	public static void main(String[] args) {
		SpringApplication.run(StreamReceiverStart.class, args);
	}
}

3.編寫測試代碼

  經過單元測試來測試服務。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
	
	@Autowired
	private ISendeService sendService;

	@Test
	public void testStream(){
		String msg = "hello stream ...";
		// 將須要發送的消息封裝爲Message對象
		Message message = MessageBuilder
								.withPayload(msg.getBytes())
								.build();
		sendService.send().send(message );
	}
}

啓動消息消費者後,執行測試代碼。結果以下:

在這裏插入圖片描述

消息接收者獲取到了發送者發送的消息,同時咱們在RabbitMQ的web界面也能夠看到相關的信息

在這裏插入圖片描述

總結

  咱們同stream實現了消息中間件的使用,咱們發現只有在兩處地址和RabbitMQ有耦合,第一處是pom文件中的依賴,第二處是application.properties中的RabbitMQ的配置信息,而在具體的業務處理中並無出現任何RabbitMQ相關的代碼,這時若是咱們要替換爲Kafka的話咱們只須要將這兩處換掉便可,即<font color='red'>實現了中間件和服務的高度解耦</font>。

原文出處:https://www.cnblogs.com/dengpengbo/p/11103943.html

相關文章
相關標籤/搜索