乾貨|Spring Cloud Stream 體系及原理介紹

Spring Cloud Stream 在 Spring Cloud 體系內用於構建高度可擴展的基於事件驅動的微服務,其目的是爲了簡化消息在 Spring Cloud 應用程序中的開發。Spring Cloud Stream (後面以 SCS 代替 Spring Cloud Stream) 自己內容不少,並且它還有不少外部的依賴,想要熟悉 SCS,必需要先了解 Spring Messaging 和 Spring Integration 這兩個項目,接下來文章將從如下幾點跟你們進行介紹:html

  • 什麼是 Spring Messaging;
  • 什麼是 Spring Integration;
  • 什麼是 SCS及其功能;

Spring Messaging

Spring Messaging 是 Spring Framework 中的一個模塊,其做用就是統一消息的編程模型。git

  • 好比消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header:

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}
  • 消息通道 MessageChannel 用於接收消息,調用 send 方法能夠將消息發送至該消息通道中 :

@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;
    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }
    boolean send(Message<?> message, long timeout);
}

消息通道里的消息如何被消費呢?github

  • 由消息通道的子接口可訂閱的消息通道 SubscribableChannel 實現,被 MessageHandler 消息處理器所訂閱:
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}
  • MessageHandler 真正地消費/處理消息:
@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Messaging 內部在消息模型的基礎上衍生出了其它的一些功能,如:web

  1. 消息接收參數及返回值處理:消息接收參數處理器 HandlerMethodArgumentResolver 配合 @Header@Payload 等註解使用;消息接收後的返回值處理器 HandlerMethodReturnValueHandler 配合 @SendTo 註解使用;
  2. 消息體內容轉換器 MessageConverter
  3. 統一抽象的消息發送模板 AbstractMessageSendingTemplate
  4. 消息通道攔截器 ChannelInterceptor

Spring Integration

Spring Integration 提供了 Spring 編程模型的擴展用來支持企業集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。它提出了很多新的概念,包括消息的路由 MessageRoute、消息的分發 MessageDispatcher、消息的過濾 Filter、消息的轉換 Transformer、消息的聚合 Aggregator、消息的分割 Splitter 等等。同時還提供了 MessageChannel 和MessageHandler 的實現,分別包括 DirectChannelExecutorChannelPublishSubscribeChannel 和MessageFilterServiceActivatingHandlerMethodInvokingSplitter 等內容。spring

首先爲你們介紹幾種消息的處理方式:sql

  • 消息的分割:

  • 消息的聚合:

  • 消息的過濾:

  • 消息的分發:

接下來,咱們以一個最簡單的例子來嘗試一下 Spring Integration:編程

SubscribableChannel messageChannel = new DirectChannel(); // 1

messageChannel.subscribe(msg -> { // 2
  System.out.println("receive: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
  1. 構造一個可訂閱的消息通道 messageChannel
  2. 使用 MessageHandler 去消費這個消息通道里的消息;
  3. 發送一條消息到這個消息通道,消息最終被消息通道里的 MessageHandler 所消費,最後控制檯打印出: receive: msg from alibaba

DirectChannel 內部有個 UnicastingDispatcher 類型的消息分發器,會分發到對應的消息通道 MessageChannel 中,從名字也能夠看出來,UnicastingDispatcher 是個單播的分發器,只能選擇一個消息通道。那麼如何選擇呢? 內部提供了 LoadBalancingStrategy 負載均衡策略,默認只有輪詢的實現,能夠進行擴展。架構

咱們對上段代碼作一點修改,使用多個 MessageHandler 去處理消息:併發

SubscribableChannel messageChannel = new DirectChannel();

messageChannel.subscribe(msg -> {
  System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
  System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

因爲 DirectChannel 內部的消息分發器是 UnicastingDispatcher 單播的方式,而且採用輪詢的負載均衡策略,因此這裏兩次的消費分別對應這兩個 MessageHandler。控制檯打印出:負載均衡

receive1: msg from alibaba
receive2: msg from alibaba

既然存在單播的消息分發器 UnicastingDispatcher,必然也會存在廣播的消息分發器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 這個消息通道所使用。廣播消息分發器會把消息分發給全部的 MessageHandler

SubscribableChannel messageChannel = new PublishSubscribeChannel();

messageChannel.subscribe(msg -> {
  System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
  System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

發送兩個消息,都被全部的 MessageHandler 所消費。控制檯打印:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba

Spring Cloud Stream

SCS與各模塊之間的關係是:

  • SCS 在 Spring Integration 的基礎上進行了封裝,提出了 BinderBinding@EnableBinding@StreamListener 等概念;
  • SCS 與 Spring Boot Actuator 整合,提供了 /bindings/channels endpoint;
  • SCS 與 Spring Boot Externalized Configuration 整合,提供了 BindingPropertiesBinderProperties 等外部化配置類;
  • SCS 加強了消息發送失敗的和消費失敗狀況下的處理邏輯等功能。
  • SCS 是 Spring Integration 的增強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現細節,但願以統一的一套 API 來進行消息的發送/消費,底層消息中間件的實現細節由各消息中間件的 Binder 完成。

Binder 是提供與外部消息中間件集成的組件,爲構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用於構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了 RocketMQ Binder

從圖中能夠看出,Binding 是鏈接應用程序跟消息中間件的橋樑,用於消息的消費和生產。咱們來看一個最簡單的使用 RocketMQ Binder 的例子,而後分析一下它的底層處理原理:

  • 啓動類及消息的發送:
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {
  
    public static void main(String[] args) {
        SpringApplication.run(SendAndReceiveApplication.class, args);
    }
  
    @Bean // 2
    public CustomRunner customRunner() {
        return new CustomRunner();
    }

    public static class CustomRunner implements CommandLineRunner {

        @Autowired
        private Source source;

        @Override
        public void run(String... args) throws Exception {
            int count = 5;
            for (int index = 1; index <= count; index++) {
                source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
            }
        }
    }
}
  • 消息的接收:
@Service
public class StreamListenerReceiveService {

    @StreamListener(Sink.INPUT) // 4
    public void receiveByStreamListener1(String receiveMsg) {
        System.out.println("receiveByStreamListener: " + receiveMsg);
    }

}

這段代碼很簡單,沒有涉及到 RocketMQ 相關的代碼,消息的發送和接收都是基於 SCS 體系完成的。若是想切換成 RabbitMQ 或 kafka,只需修改配置文件便可,代碼無需修改。

咱們分析這段代碼的原理:

  1. @EnableBinding 對應的兩個接口屬性 Source 和 Sink 是 SCS 內部提供的。SCS 內部會基於 Source 和 Sink 構造 BindableProxyFactory,且對應的 output 和 input 方法返回的 MessageChannel 是 DirectChannel。output 和 input 方法修飾的註解對應的 value 是配置文件中 binding 的 name。
public interface Source {
    String OUTPUT = "output";
    @Output(Source.OUTPUT)
    MessageChannel output();
}
public interface Sink {
    String INPUT = "input";
    @Input(Sink.INPUT)
    SubscribableChannel input();
}

配置文件裏 bindings 的 name 爲 output 和 input,對應 Source 和 Sink 接口的方法上的註解裏的 value:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
  1. 構造 CommandLineRunner,程序啓動的時候會執行 CustomRunner 的 run 方法。
  2. 調用 Source 接口裏的 output 方法獲取 DirectChannel,併發送消息到這個消息通道中。這裏跟以前 Spring Integration 章節裏的代碼一致。
  • Source 裏的 output 發送消息到 DirectChannel 消息通道以後會被 AbstractMessageChannelBinder#SendingHandler 這個 MessageHandler 處理,而後它會委託給 AbstractMessageChannelBinder#createProducerMessageHandler 建立的 MessageHandler 處理(該方法由不一樣的消息中間件實現);
  • 不一樣的消息中間件對應的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 內部會把 Spring Message 轉換成對應中間件的 Message 模型併發送到對應中間件的 broker;
  1. 使用 @StreamListener 進行消息的訂閱。請注意,註解裏的 Sink.input 對應的值是 "input",會根據配置文件裏 binding 對應的 name 爲 input 的值進行配置:
  • 不一樣的消息中間件對應的 AbstractMessageChannelBinder#createConsumerEndpoint 方法會使用 Consumer 訂閱消息,訂閱到消息後內部會把中間件對應的 Message 模型轉換成 Spring Message;
  • 消息轉換以後會把 Spring Message 發送至 name 爲 input 的消息通道中;
  • @StreamListener 對應的 StreamListenerMessageHandler 訂閱了 name 爲 input 的消息通道,進行了消息的消費;

這個過程文字描述有點囉嗦,用一張圖總結一下(黃色部分涉及到各消息中間件的 Binder 實現以及 MQ 基本的訂閱發佈功能):

SCS 章節的最後,咱們來看一段 SCS 關於消息的處理方式的一段代碼:

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
  System.out.println("receive by headers['index']=='1': " + msg);
}

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
  System.out.println("receive Person: " + person);
}

@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
  System.out.println("receive allMsg by StreamListener. content: " + msg);
}

@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
  System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}

有沒有發現這段代碼跟 Spring MVC Controller 中接收請求的代碼很像? 實際上他們的架構都是相似的,Spring MVC 對於 Controller 中參數和返回值的處理類分別是 org.springframework.web.method.support.HandlerMethodArgumentResolver、 org.springframework.web.method.support.HandlerMethodReturnValueHandler

Spring Messaging 中對於參數和返回值的處理類以前也提到過,分別是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverorg.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler

它們的類名如出一轍,甚至內部的方法名也同樣。

總結

上圖是 SCS 體系相關類說明的總結,關於 SCS 以及 RocketMQ Binder 更多相關的示例,能夠參考 RocketMQ Binder Demos,包含了消息的聚合、分割、過濾;消息異常處理;消息標籤、sql過濾;同步、異步消費等等。

下一篇文章,咱們將分析消息總線(Spring Cloud Bus) 在 Spring Cloud 體系中的做用,並逐步展開,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何實現 Spring Cloud Stream 標準的。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索