SpringCloud-Stream 消息驅動

1、概述

是什麼?

Spring Cloud Stream 是一個構建消息微服務驅動的框架。能夠屏蔽底層消息中間件的差別,下降版本切換成本,統一消息的編程模型,目前僅支持 RabbitMQ 和 Kafka。java

設計思想

標準 MQ 的設計思想

生產者 / 消費者之間靠消息媒介傳遞信息內容,Messagespring

消息必須走特定的通道,MessageChannel編程

消息通道里的消息如何被消費呢,誰負責收發處理?消息通道MessageChannel的子接口SubscribableChannel,由消息處理器MessageHandler所訂閱json

Spring Cloud Stream 的設計思想

若是咱們的項目中用到了 RabbitMQ 和 Kafka 兩種消息中間件,因爲它們的架構不一樣,對實際開發形成了必定困擾;或者用到了一種消息中間件,隨着後面的業務需求須要向另外一種消息隊列遷移,這無疑是災難性的,會形成一大堆的改動,由於它們與系統耦合了,這時候 Spring Cloud Stream 就能夠爲咱們提供一種解耦的方式。api

Spring Cloud Stream 提供的解決方案是:經過定義綁定器 Binder 做爲中間層,實現了應用程序與消息中間件細節之間的隔離。嚮應用程序暴露統一的 Channel 通道,使得應用程序不須要再考慮各類消息中間件的實現。架構

inputs 對應消費者,outputs 對應生產者app

Stream中的消息通訊方式遵循了發佈-訂閱模式,用 Topic 主題進行廣播(在RabbitMQ就是Exchange,在Kafka中就是Topic)框架

工做流程

Binder:綁定器,很方便的鏈接中間件,屏蔽差別ide

Channel:通道,是隊列 Queue 的一種抽象,在消息通信系統中就是實現存儲與轉發的媒介,經過 Channel 對隊列進行配置微服務

Source 和 Sink:簡單理解就是參照物是 Spring Cloud Stream 自己,從 Stream 發佈消息就是輸出,接收消息就是輸入

編碼 API 和經常使用註解

2、基本使用

生產者

配置:

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 配置綁定的rabbitmq的服務信息
        defaultRabbit: # 表示定義的名稱,用於與binding整合
          type: rabbit # 消息組件的類型
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名字
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,若是文本就是「text/plain」
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置

發送消息:

@EnableBinding(Source.class)
@Slf4j
public class MessageProviderImpl implements IMessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public void send() {
        String serial = IdUtil.simpleUUID();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("流水號:" + serial);
    }
}

消費者

配置與生產者一致,只須要把 output 改成 input

接收消息:

@Controller
@EnableBinding(Sink.class)
@Slf4j
public class MessageReceiveController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message){
      log.info("receive -> " + serverPort + " -> " +message.getPayload());
    }
}

3、解決消息重複消費的問題

場景

舉個栗子,咱們對訂單系統作了集羣部署,消費者從 RabbitMQ 中獲取訂單信息,若是同一個訂單被不一樣的服務都獲取到了,就會形成數據錯誤,爲了不這種狀況,咱們可使用 Stream 中的消息分組來解決。

原理

在 Stream 中,處於同一個組的多個消費者是競爭關係,就能夠保證消息只被一個服務消費一次,而不一樣組是能夠重複消費的。如今默認分組就是不一樣的,組流水號不同。

解決

將不想產生重複消費的服務分爲同一個組便可

配置方式

spring:
  cloud:
    stream:
      bindings:
        input:
          group: groupA

4、持久化

若是咱們的消費者由於種種緣由宕機了,生產者此時發送了消息,沒有配置 group 屬性的消費者從新上線後沒法接收到以前的消息,而配置了 group 的消費者仍會接收到消息,這就是持久化的效果

相關文章
相關標籤/搜索