案例代碼:https://github.com/q279583842q/springcloud-e-bookhtml
在實際開發過程當中,服務與服務之間通訊常常會使用到消息中間件,而以往使用了哪一個中間件好比RabbitMQ,那麼該中間件和系統的耦合性就會很是高,若是咱們要替換爲Kafka那麼變更會比較大,這時咱們可使用SpringCloudStream來整合咱們的消息中間件,來下降系統和中間件的耦合性。java
官方定義 Spring Cloud Stream 是一個構建<font color='red'>消息驅動</font>微服務的框架。 應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,經過咱們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件交互。因此,咱們只須要搞清楚如何與 Spring Cloud Stream 交互就能夠方便使用消息驅動的方式。 經過使用Spring Integration來鏈接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 爲一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQ、Kafka。git
Stream解決了開發人員無感知的使用消息中間件的問題,由於Stream對消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知,甚至於動態的切換中間件(rabbitmq切換爲kafka),使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程github
官網結構圖web
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,經過Binder能夠很方便的鏈接中間件,能夠動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些均可以經過配置文件來實現 |
@Input | 註解標識輸入通道,經過該輸入通道接收到的消息進入應用程序 |
@Output | 註解標識輸出通道,發佈的消息將經過該通道離開應用程序 |
@StreamListener | 監聽隊列,用於消費者的隊列的<font color='red'>消息接收</font> |
@EnableBinding | 指信道channel和exchange綁定在一塊兒 |
咱們經過一個入門案例來演示下經過stream來整合RabbitMQ來實現消息的異步通訊的效果,因此首先要開啓RabbitMQ服務,RabbitMQ不清楚的請參考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404spring
建立一個SpringCloud項目apache
pom文件中重點是要添加<font color='red'>spring-cloud-starter-stream-rabbit</font>這個依賴app
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-sender</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置文件中除了必要的服務名稱,端口和Eureka的信息外咱們還要添加<font color='red'>RabbitMQ</font>的註冊信息框架
spring.application.name=stream-sender server.port=9060 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/
建立一個發送消息的接口。具體以下:方法名稱自定義,返回類型必須是<font color='red'>SubscribableChannel</font>,在Output註解中指定交換器名稱。異步
/** * 發送消息的接口 * @author dengp * */ public interface ISendeService { /** * 指定輸出的交換器名稱 * @return */ @Output("dpb-exchange") SubscribableChannel send(); }
在啓動類中經過@EnableBinding註解綁定咱們建立的接口類。
@SpringBootApplication @EnableEurekaClient // 綁定咱們剛剛建立的發送消息的接口類型 @EnableBinding(value={ISendeService.class}) public class StreamSenderStart { public static void main(String[] args) { SpringApplication.run(StreamSenderStart.class, args); } }
添加的依賴和發送消息的服務是一致的
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-receiver</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
注意修改服務名稱和端口
spring.application.name=stream-receiver server.port=9061 #設置服務註冊中心地址,指向另外一個註冊中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 連接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/
此接口和發送消息的接口類似,注意使用的是@Input註解。
/** * 接收消息的接口 * @author dengp * */ public interface IReceiverService { /** * 指定接收的交換器名稱 * @return */ @Input("dpb-exchange") SubscribableChannel receiver(); }
注意此類並非實現上面建立的接口。而是經過@EnableBinding來綁定咱們建立的接口,同時經過<font color='red'>@StreamListener</font>註解來監聽dpb-exchange對應的消息服務
/** * 具體接收消息的處理類 * @author dengp * */ @Service @EnableBinding(IReceiverService.class) public class ReceiverService { @StreamListener("dpb-exchange") public void onReceiver(byte[] msg){ System.out.println("消費者:"+new String(msg)); } }
一樣要添加@EnableBinding註解
@SpringBootApplication @EnableEurekaClient @EnableBinding(value={IReceiverService.class}) public class StreamReceiverStart { public static void main(String[] args) { SpringApplication.run(StreamReceiverStart.class, args); } }
經過單元測試來測試服務。
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.junit4.SpringRunner; import com.bobo.stream.StreamSenderStart; import com.bobo.stream.sender.ISendeService; @RunWith(SpringRunner.class) @SpringBootTest(classes=StreamSenderStart.class) public class StreamTest { @Autowired private ISendeService sendService; @Test public void testStream(){ String msg = "hello stream ..."; // 將須要發送的消息封裝爲Message對象 Message message = MessageBuilder .withPayload(msg.getBytes()) .build(); sendService.send().send(message ); } }
啓動消息消費者後,執行測試代碼。結果以下:
消息接收者獲取到了發送者發送的消息,同時咱們在RabbitMQ的web界面也能夠看到相關的信息
咱們同stream實現了消息中間件的使用,咱們發現只有在兩處地址和RabbitMQ有耦合,第一處是pom文件中的依賴,第二處是application.properties中的RabbitMQ的配置信息,而在具體的業務處理中並無出現任何RabbitMQ相關的代碼,這時若是咱們要替換爲Kafka的話咱們只須要將這兩處換掉便可,即<font color='red'>實現了中間件和服務的高度解耦</font>。
原文出處:https://www.cnblogs.com/dengpengbo/p/11103943.html