「 從0到1學習微服務SpringCloud 」08 構建消息驅動微服務的框架 Spring Cloud Stream

系列文章(更新ing):

「 從0到1學習微服務SpringCloud 」01 一塊兒來學呀! 「 從0到1學習微服務SpringCloud 」02 Eureka服務註冊與發現
「 從0到1學習微服務SpringCloud 」03 Eureka的自我保護機制
「 從0到1學習微服務SpringCloud 」04服務消費者Ribbon+RestTemplate
「 從0到1學習微服務SpringCloud 」05消費者Fegin
「 從0到1學習微服務SpringCloud 」06 統一配置中心Spring Cloud Config
「 從0到1學習微服務SpringCloud 」07 RabbitMq的基本使用app

簡介

官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。框架

簡單來講,它就是用來與消息中間件進行交互的,咱們不須要直接對消息中間件進行操做,而是經過Spring Cloud Stream,從而簡化了對中間件的操做,並進行了解耦(想要更換消息中間件時,無需更改代碼)。 maven

image

應用程序經過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,而  Spring Cloud Stream 的 binder 負責與中間件交互。微服務

因此,咱們只須要搞清楚如何與  Spring Cloud Stream 交互就能夠方便使用消息驅動的方式。最大的好處莫過於對中間件的再次封裝,能夠作到代碼層面對消息中間件的無感知,甚至於動態的切換中間件。學習

目前Stream只提供了RabiitMq和Kafka的binder,若要使用其餘的消息中間件,須要本身自定義binder。ui

基本使用

消費者

1.新建一個項目, micro-service1用於接收消息,做爲eureka client,增長mq,stream-mq的maven,修改相關配置等再也不累述,與以前同樣spa

2.定義一個接口,將input綁定名爲"input"的消息通道3d

public interface Receiver {
    //消息通道名稱
    String INPUT = "input";

    //綁定可訂閱的通道
    @Input(INPUT)
    SubscribableChannel input();
}

3.定義Stream接收類代理

@Component
//@EnableBinding註解能夠接收一個或多個接口類做爲對象
// 聲明綁定的消息通到,實現與消息代理的鏈接
@EnableBinding(Receiver.class)
@Log4j2
public class StreamReceiver {

    //監聽binding的input
    @StreamListener(Receiver.INPUT)
    //message爲接收到信息消息
    public void input(Message<String> message){
        log.info("StreamReceiver: {}", message.getPayload());
    }
}

啓動,默認是會建立一個臨時隊列,臨時隊列綁定的exchange爲 「input」
全部發送 exchange 爲「input」 的MQ消息都會被投遞到這個臨時隊列,並經過上述方法接收。日誌

image

以上代碼就完成了最基本的消費者部分。

生產者

1.新建一個項目, micro-service2用於發送消息,具體步驟步驟累述

2.定義一個接口,,將output綁定名爲"input"的消息通道

public interface Sender {
    //消息通道名稱
    String OUTPUT = "input";

    @Output(OUTPUT)
    MessageChannel output();
}

3.定義Stream發送類Controller

@RestController
@EnableBinding(Sender.class)
@Log4j2
public class SendController {
    @Autowired
    @Qualifier(Sender.OUTPUT)
    MessageChannel output;

    @GetMapping("send")
    public void send(){
        String message = "Hello! I am Stream Message!";
        log.info("發送Stream消息: {}",message);
        output.send(MessageBuilder.withPayload(message).build());
    }
}

以上代碼就完成了最基本的消費者部分。

啓動後,調用/send接口,可看到收發消息成功的日誌

image

消息分組

當消費者集羣部署時,它們當中應當只有一個能接受到消息。但按照如今的配置,每一個消費者都能收到消息,咱們來看看。

1.啓動兩個micro-service1,設置不一樣接口

2.調用/send接口,兩個應用均能收到消息 image

image

顯然這是不合理,這裏就須要用到消息分組

3.在micro-service1應用中添加Stream分組配置

cloud:
    stream:
      bindings:
        #爲input消息通道添加分組
        input:
          group: testGroup

4.啓動兩個micro-service1,調用/send接口。如今,發送一條信息,只能在其中一個應用中接收到消息,兩個應用輪訓接收。

Spring Cloud Stream的簡單使用講解就到這裏了,下期再見啦~

若是以爲不錯,分享給你的朋友!

image

image

 THANDKS

  • End -

一個立志成大腿而天天努力奮鬥的年輕人

伴學習伴成長,成長之路你並不孤單!

掃描二維碼,關注公衆號

相關文章
相關標籤/搜索