《springcloud 五》springcloud stream

什麼是消息驅動?

SpringCloud Stream消息驅動能夠簡化開發人員對消息中間件的使用複雜度,讓系統開發人員更多盡力專一與核心業務邏輯的開發。SpringCloud Stream基於SpringBoot實現,自動配置化的功能能夠幫助咱們快速上手學習,相似與咱們以前學習的orm框架,能夠平滑的切換多種不一樣的數據庫。node

目前SpringCloud Stream 目前只支持 rabbitMQ和kafkaweb

 

消息驅動原理

綁定器

經過定義綁定器做爲中間層,實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel經過,是的應用程序不須要再考慮各類不一樣的消息中間件的實現。當須要升級消息中間件,或者是更換其餘消息中間件產品時,咱們須要作的就是更換對應的Binder綁定器而不須要修改任何應用邏輯 。spring

在該模型圖上有以下幾個核心概念:數據庫

  • Source: 當須要發送消息時,咱們就須要經過Source,Source將會把咱們所要發送的消息(POJO對象)進行序列化(默認轉換成JSON格式字符串),而後將這些數據發送到Channel中;
  • Sink: 當咱們須要監聽消息時就須要經過Sink來,Sink負責從消息通道中獲取消息,並將消息反序列化成消息對象(POJO對象),而後交給具體的消息監聽處理進行業務處理;
  • Channel: 消息通道是Stream的抽象之一。一般咱們向消息中間件發送消息或者監聽消息時須要指定主題(Topic)/消息隊列名稱,但這樣一旦咱們須要變動主題名稱的時候須要修改消息發送或者消息監聽的代碼,可是經過Channel抽象,咱們的業務代碼只須要對Channel就能夠了,具體這個Channel對應的是那個主題,就能夠在配置文件中來指定,這樣當主題變動的時候咱們就不用對代碼作任何修改,從而實現了與具體消息中間件的解耦;
  • Binder: Stream中另一個抽象層。經過不一樣的Binder能夠實現與不一樣消息中間件的整合,好比上面的示例咱們所使用的就是針對Kafka的Binder,經過Binder提供統一的消息收發接口,從而使得咱們能夠根據實際須要部署不一樣的消息中間件,或者根據實際生產中所部署的消息中間件來調整咱們的配置。

消息驅動環境搭建

生產者環境

Maven依賴信息

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

application.yml信息

server: port: 9000 spring: application: name: spingcloud-stream-producer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest

建立管道app

 

// 建立管道接口
public interface SendMessageInterface { // 建立一個輸出管道,用於發送消息
    @Output("my_msg") SubscribableChannel sendMsg(); }

 

發送消息

@RestController public class SendMsgController { @Autowired private SendMessageInterface sendMessageInterface; @RequestMapping("/sendMsg") public String sendMsg() { String msg = UUID.randomUUID().toString(); System.out.println("生產者發送內容msg:" + msg); Message build = MessageBuilder.withPayload(msg.getBytes()).build(); sendMessageInterface.sendMsg().send(build); return "success"; } }

啓動服務

@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 開啓綁定
public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }

消費者環境

Maven

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

application.yml

server: port: 9000 spring: application: name: spingcloud-stream-consumer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest

管道中綁定消息

public interface RedMsgInterface { // 從管道中獲取消息
    @Input("my_msg") SubscribableChannel redMsg(); }

消費者獲取消息

@Component public class Consumer { @StreamListener("my_msg") public void listener(String msg) { System.out.println("消費者獲取生產消息:" + msg); } }

啓動消費者

@SpringBootApplication @EnableBinding(RedMsgInterface.class) public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } }

消費組

在現實的業務場景中,每個微服務應用爲了實現高可用和負載均衡,都會集羣部署,按照上面咱們啓動了兩個應用的實例,消息被重複消費了兩次。爲解決這個問題,Spring Cloud Stream 中提供了消費組,經過配置 spring.cloud.stream.bindings.myInput.group 屬性爲應用指定一個組名,下面修改下配置文件,負載均衡

server: port: 8001 spring: application: name: spring-cloud-stream # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest cloud: stream: bindings: mymsg: ###指定 管道名稱 #指定該應用實例屬於 stream 消費組 group: stream

修改消費者

@Component public class Consumer { @Value("${server.port}") private String serverPort; @StreamListener("my_msg") public void listener(String msg) { System.out.println("消費者獲取生產消息:" + msg + ",端口號:" + serverPort); } }

 

更改環境爲kafka

Maven依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

生產者配置

server: port: 9000 spring: cloud: stream: # 設置成使用kafka kafka: binder: # Kafka的服務端列表,默認localhost brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 # Kafka服務端鏈接的ZooKeeper節點列表,默認localhost zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 minPartitionCount: 1 autoCreateTopics: true autoAddPartitions: true

消費者配置

server: port: 8000 spring: application: name: springcloud_kafka_consumer cloud: instance-count: 1 instance-index: 0 stream: kafka: binder: brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 auto-add-partitions: true auto-create-topics: true min-partition-count: 1 bindings: input: destination: my_msg group: s1 consumer: autoCommitOffset: false concurrency: 1 partitioned: false
相關文章
相關標籤/搜索