SpringCloud Stream消息驅動

SpringCloud Stream消息驅動

圖片在這裏插入圖片描述html

簡介

什麼是SpringCloudStream
  • 官方定義 SpringCloud Stream 是一個構建消息驅動微服務對框架。
  • 應用程序經過  inputs 或者  outputs 來 與Springcloud Stream 中 binder 對象交互 經過咱們配置來 binding(綁定),而  SpringCloud Stream 的 binder 對象負責與消息中間件交互,因此咱們只須要搞清楚如何與 Spring Coud Stream 交互就能夠方便使用消息驅動的方式。
  • 經過使用SpringIntegration 來鏈接消息代理中間件實現消息事件驅動。
  • Spring Cloud Stream 爲一些供應商的消息中間件產品提供來個性化的自動化配置實現,引用來發布-訂閱、消費組、分區的三個核心概念。
  • ==目前值支持 RabbitMQ、Kafka==圖片
  • 官網  https://spring.io/projects/spring-cloud-stream#overview

Spring Cloud Stream 是用於構建與消息傳遞系統的高度可伸縮的事件驅動微服務架構,該框架提供來一個靈活的編程模型,它簡歷在已經創建和熟悉的Spring 熟語和最佳實踐上,包括支持持久化的發佈訂閱、消費組以及消息分區這三個核心概念web

圖片API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/spring

中文指導手冊:https://m.wang1314.com/doc/webapp/topic/20971999.html編程

設計思想

標準MQjson

圖片圖片圖片

爲何用Cloud Stream
  • 比方說咱們用到了RabbitMQ 和 Kafka ,因爲這兩個消息中間件的架構上的不一樣,像RabbitMQ 有 exchange,Kafka有 Topic 和 Partitions 分區,
  • 圖片這些中間件的差別性致使咱們實際項目開發給咱們形成必定困擾,咱們若是用了兩個消息隊列的其中一種,後面業務需求,咱們想往另外一個消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都須要推到從新作,由於它跟咱們系統耦合度很高,這時候SpringCloud Stream 給咱們提供了一種解耦合的方式
stream憑什麼能夠統一底層差別

Biinder服務器

  • INPUT 對應於消費者架構

  • OUTPUT 對應於生產者app

  • 在沒有綁定器這個概念的狀況下,咱們springBoot 應用要直接與消息中間件進行交互的時候,因爲消息中間件構造不一樣,他們的實現細節上會有較大的差別性,經過定義綁定器做爲中間層,完美地實現了應用與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel 通道,使得應用程序不須要再考慮各類不一樣的消息中間件實現框架

  • 經過綁定器Binder做爲中間層,實現了應用程序與消息中間件的隔離dom


圖片圖片

  • 經過定義綁定器Binder 做爲中間層,實現了應用程序與消息中間件之間細節的隔離。
  • Stream 中的消息通訊方式遵循了發佈訂閱  Topic主題進行廣播

Spring Cloud Stream標準流程套路圖片圖片

編碼API和經常使用註解

圖片在這裏插入圖片描述

案例:
消息驅動生產者
  1. 新建模塊 cloud-stream-rabbitmq-provider8801
  2. pom

<dependencies>    <!--stream rabbit -->    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>    </dependency>    <!--eureka client-->    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <!--監控-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-actuator</artifactId>    </dependency>    <!--熱部署-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-devtools</artifactId>        <scope>runtime</scope>        <optional>true</optional>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <optional>true</optional>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <scope>test</scope>    </dependency></dependencies>
  1. yml

server:  port: 8801
spring:  application:    name: cloud-stream-provider  cloud:    stream:      binders: #在此處配置要綁定的rabbitmq的服務信息        defaultRabbit: #表示定義的名稱,用於binding整合          type: rabbit #消息組件類型          environment: #設置rabbitmq的相關環境配置            spring:              rabbitmq:                host: ip端口號  #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址                port: 5672                username: guest                password: guest      bindings: #服務的整合處理        output: #這個名字是一個通道的名稱          destination: studyExchange #表示要使用的Exchange名稱定義          content-type: application/json #設置消息類型,本次爲json          binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka:  client:    service-url:      defaultZone: http://localhost:7001/eureka  instance:    lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)    lease-expiration-duration-in-seconds: 5 #若是超過5S間隔就註銷節點 默認是90s    instance-id: send-8801.com #在信息列表時顯示主機名稱    prefer-ip-address: true #訪問的路徑變爲IP地址

啓動類

@SpringBootApplicationpublic class StreamMQMain8801 {
   public static void main(String[] args) {        SpringApplication.run(StreamMQMain8801.class, args);    }
}
  1. 新建業務類

新建service包     --- IMessageProvider接口 與實現類

public interface IMessageProvider {    public String send();}
@EnableBinding(Source.class)    //定義消息的推送管道(Source是spring的)public class IMessageProviderImpl implements IMessageProvider {
   @Resource    private MessageChannel output;  //消息發送管道
   @Override    public String send() {        String serial = UUID.randomUUID().toString();        output.send(MessageBuilder.withPayload(serial).build());     //MessageBuilder是spring的integration.support.MessageBuilder        System.out.println("*******serial: " + serial);        return null;    }}
@RestControllerpublic class SendMessageController {
   @Resource    private IMessageProvider iMessageProvider;
   @GetMapping("/sendMessage")    public String sendMessage(){        return iMessageProvider.send();    }
}
消費者
  1. 新建模塊cloud-stream-rabbitmq-consumer8802
  2. pom

<dependencies>    <!--stream rabbit -->    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>    </dependency>    <!--eureka client-->    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <!--監控-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-actuator</artifactId>    </dependency>    <!--熱部署-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-devtools</artifactId>        <scope>runtime</scope>        <optional>true</optional>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <optional>true</optional>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <scope>test</scope>    </dependency></dependencies>
  1. yml

server:  port: 8802
spring:  application:    name: cloud-stream-provider  cloud:    stream:      binders: #在此處配置要綁定的rabbitmq的服務信息        defaultRabbit: #表示定義的名稱,用於binding整合          type: rabbit #消息組件類型          environment: #設置rabbitmq的相關環境配置            spring:              rabbitmq:                host: ip端口  #RabbitMQ在本機的用localhost,在服務器的用服務器的ip地址                port: 5672                username: 用戶名                password: 密碼      bindings: #服務的整合處理        input: #這個名字是一個通道的名稱          destination: studyExchange #表示要使用的Exchange名稱定義          content-type: application/json #設置消息類型,本次爲json,本文要設置爲「text/plain」          binder: defaultRabbit #設置要綁定的消息服務的具體設置(爆紅不影響使用,位置沒錯)
eureka:  client:    service-url:      defaultZone: http://localhost:7001/eureka  instance:    lease-renewal-interval-in-seconds: 2 #設置心跳的時間間隔(默認是30S)    lease-expiration-duration-in-seconds: 5 #若是超過5S間隔就註銷節點 默認是90s    instance-id: receive-8802.com #在信息列表時顯示主機名稱    prefer-ip-address: true #訪問的路徑變爲IP地址
  1. 主啓動類

@SpringBootApplicationpublic class StreamMQMain8802 {
   public static void main(String[] args) {        SpringApplication.run(StreamMQMain8802.class, args);    }
}
  1. 新建 Controller

@EnableBinding(Sink.class)@Controllerpublic class ReceiveMessageListenerController {
   @Value("${server.port}")    private String serverPort;
   @StreamListener(Sink.INPUT) //監聽    public void input(Message<String> message){        System.out.println("消費者1號------>收到的消息:" + message.getPayload() + "\t port:" + serverPort);    }
}

啓動Eureka   消息驅動生產者,消費者

圖片在這裏插入圖片描述

  • http://localhost:8801/sendMessage(8801發送消息)圖片
  • 8802接收到消息:圖片

圖片在這裏插入圖片描述

  • 我的博客: http://blog.yanxiaolong.cn/.
相關文章
相關標籤/搜索