Spring Cloud Stream 在 Spring Cloud 體系內用於構建高度可擴展的基於事件驅動的微服務,其目的是爲了簡化消息在 Spring Cloud 應用程序中的開發。Spring Cloud Stream (後面以 SCS 代替 Spring Cloud Stream) 自己內容不少,並且它還有不少外部的依賴,想要熟悉 SCS,必需要先了解 Spring Messaging 和 Spring Integration 這兩個項目,接下來文章將從如下幾點跟你們進行介紹:html
Spring Messaging 是 Spring Framework 中的一個模塊,其做用就是統一消息的編程模型。git
Messaging
對應的模型就包括一個消息體 Payload 和消息頭 Header:package org.springframework.messaging; public interface Message<T> { T getPayload(); MessageHeaders getHeaders(); }
MessageChannel
用於接收消息,調用 send
方法能夠將消息發送至該消息通道中 :@FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1; default boolean send(Message<?> message) { return send(message, INDEFINITE_TIMEOUT); } boolean send(Message<?> message, long timeout); }
消息通道里的消息如何被消費呢?github
SubscribableChannel
實現,被 MessageHandler
消息處理器所訂閱:public interface SubscribableChannel extends MessageChannel { boolean subscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler); }
MessageHandler
真正地消費/處理消息:@FunctionalInterface public interface MessageHandler { void handleMessage(Message<?> message) throws MessagingException; }
Spring Messaging 內部在消息模型的基礎上衍生出了其它的一些功能,如:web
HandlerMethodArgumentResolver
配合 @Header
, @Payload
等註解使用;消息接收後的返回值處理器 HandlerMethodReturnValueHandler
配合 @SendTo
註解使用;MessageConverter
;AbstractMessageSendingTemplate
;ChannelInterceptor
;Spring Integration 提供了 Spring 編程模型的擴展用來支持企業集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。它提出了很多新的概念,包括消息的路由 MessageRoute
、消息的分發 MessageDispatcher
、消息的過濾 Filter
、消息的轉換 Transformer
、消息的聚合 Aggregator
、消息的分割 Splitter
等等。同時還提供了 MessageChannel
和MessageHandler
的實現,分別包括 DirectChannel
、ExecutorChannel
、PublishSubscribeChannel
和MessageFilter
、ServiceActivatingHandler
、MethodInvokingSplitter
等內容。spring
首先爲你們介紹幾種消息的處理方式:sql
接下來,咱們以一個最簡單的例子來嘗試一下 Spring Integration:編程
SubscribableChannel messageChannel = new DirectChannel(); // 1 messageChannel.subscribe(msg -> { // 2 System.out.println("receive: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
messageChannel
;MessageHandler
去消費這個消息通道里的消息;MessageHandler
所消費,最後控制檯打印出: receive: msg from alibaba
;DirectChannel
內部有個 UnicastingDispatcher
類型的消息分發器,會分發到對應的消息通道 MessageChannel
中,從名字也能夠看出來,UnicastingDispatcher
是個單播的分發器,只能選擇一個消息通道。那麼如何選擇呢? 內部提供了 LoadBalancingStrategy
負載均衡策略,默認只有輪詢的實現,能夠進行擴展。架構
咱們對上段代碼作一點修改,使用多個 MessageHandler
去處理消息:併發
SubscribableChannel messageChannel = new DirectChannel(); messageChannel.subscribe(msg -> { System.out.println("receive1: " + msg.getPayload()); }); messageChannel.subscribe(msg -> { System.out.println("receive2: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
因爲 DirectChannel
內部的消息分發器是 UnicastingDispatcher
單播的方式,而且採用輪詢的負載均衡策略,因此這裏兩次的消費分別對應這兩個 MessageHandler
。控制檯打印出:負載均衡
receive1: msg from alibaba receive2: msg from alibaba
既然存在單播的消息分發器 UnicastingDispatcher
,必然也會存在廣播的消息分發器,那就是 BroadcastingDispatcher
,它被 PublishSubscribeChannel
這個消息通道所使用。廣播消息分發器會把消息分發給全部的 MessageHandler
:
SubscribableChannel messageChannel = new PublishSubscribeChannel(); messageChannel.subscribe(msg -> { System.out.println("receive1: " + msg.getPayload()); }); messageChannel.subscribe(msg -> { System.out.println("receive2: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
發送兩個消息,都被全部的 MessageHandler
所消費。控制檯打印:
receive1: msg from alibaba receive2: msg from alibaba receive1: msg from alibaba receive2: msg from alibaba
SCS與各模塊之間的關係是:
Binder
, Binding
, @EnableBinding
, @StreamListener
等概念;/bindings
, /channels
endpoint;BindingProperties
, BinderProperties
等外部化配置類;Binder
是提供與外部消息中間件集成的組件,爲構造 Binding
提供了 2 個方法,分別是 bindConsumer
和 bindProducer
,它們分別用於構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了 RocketMQ Binder。
從圖中能夠看出,Binding
是鏈接應用程序跟消息中間件的橋樑,用於消息的消費和生產。咱們來看一個最簡單的使用 RocketMQ Binder 的例子,而後分析一下它的底層處理原理:
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) // 1 public class SendAndReceiveApplication { public static void main(String[] args) { SpringApplication.run(SendAndReceiveApplication.class, args); } @Bean // 2 public CustomRunner customRunner() { return new CustomRunner(); } public static class CustomRunner implements CommandLineRunner { @Autowired private Source source; @Override public void run(String... args) throws Exception { int count = 5; for (int index = 1; index <= count; index++) { source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3 } } } }
@Service public class StreamListenerReceiveService { @StreamListener(Sink.INPUT) // 4 public void receiveByStreamListener1(String receiveMsg) { System.out.println("receiveByStreamListener: " + receiveMsg); } }
這段代碼很簡單,沒有涉及到 RocketMQ 相關的代碼,消息的發送和接收都是基於 SCS 體系完成的。若是想切換成 RabbitMQ 或 kafka,只需修改配置文件便可,代碼無需修改。
咱們分析這段代碼的原理:
@EnableBinding
對應的兩個接口屬性 Source
和 Sink
是 SCS 內部提供的。SCS 內部會基於 Source
和 Sink
構造 BindableProxyFactory
,且對應的 output 和 input 方法返回的 MessageChannel 是 DirectChannel
。output 和 input 方法修飾的註解對應的 value 是配置文件中 binding 的 name。public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); } public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
配置文件裏 bindings 的 name 爲 output 和 input,對應 Source
和 Sink
接口的方法上的註解裏的 value:
spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group spring.cloud.stream.bindings.input.destination=test-topic spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.input.group=test-group1
CommandLineRunner
,程序啓動的時候會執行 CustomRunner
的 run
方法。Source
接口裏的 output 方法獲取 DirectChannel
,併發送消息到這個消息通道中。這裏跟以前 Spring Integration 章節裏的代碼一致。DirectChannel
消息通道以後會被 AbstractMessageChannelBinder#SendingHandler
這個 MessageHandler
處理,而後它會委託給 AbstractMessageChannelBinder#createProducerMessageHandler
建立的 MessageHandler 處理(該方法由不一樣的消息中間件實現);AbstractMessageChannelBinder#createProducerMessageHandler
方法返回的 MessageHandler 內部會把 Spring Message 轉換成對應中間件的 Message 模型併發送到對應中間件的 broker;@StreamListener
進行消息的訂閱。請注意,註解裏的 Sink.input
對應的值是 "input",會根據配置文件裏 binding 對應的 name 爲 input 的值進行配置:AbstractMessageChannelBinder#createConsumerEndpoint
方法會使用 Consumer 訂閱消息,訂閱到消息後內部會把中間件對應的 Message 模型轉換成 Spring Message;@StreamListener
對應的 StreamListenerMessageHandler
訂閱了 name 爲 input 的消息通道,進行了消息的消費;這個過程文字描述有點囉嗦,用一張圖總結一下(黃色部分涉及到各消息中間件的 Binder 實現以及 MQ 基本的訂閱發佈功能):
SCS 章節的最後,咱們來看一段 SCS 關於消息的處理方式的一段代碼:
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'") public void receiveByHeader(Message msg) { System.out.println("receive by headers['index']=='1': " + msg); } @StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'") public void receivePerson(@Payload Person person) { System.out.println("receive Person: " + person); } @StreamListener(value = Sink.INPUT) public void receiveAllMsg(String msg) { System.out.println("receive allMsg by StreamListener. content: " + msg); } @StreamListener(value = Sink.INPUT) public void receiveHeaderAndMsg(@Header("index") String index, Message msg) { System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg); }
有沒有發現這段代碼跟 Spring MVC Controller 中接收請求的代碼很像? 實際上他們的架構都是相似的,Spring MVC 對於 Controller 中參數和返回值的處理類分別是 org.springframework.web.method.support.HandlerMethodArgumentResolver
、 org.springframework.web.method.support.HandlerMethodReturnValueHandler
。
Spring Messaging 中對於參數和返回值的處理類以前也提到過,分別是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver
、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler
。
它們的類名如出一轍,甚至內部的方法名也同樣。
上圖是 SCS 體系相關類說明的總結,關於 SCS 以及 RocketMQ Binder 更多相關的示例,能夠參考 RocketMQ Binder Demos,包含了消息的聚合、分割、過濾;消息異常處理;消息標籤、sql過濾;同步、異步消費等等。
下一篇文章,咱們將分析消息總線(Spring Cloud Bus) 在 Spring Cloud 體系中的做用,並逐步展開,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何實現 Spring Cloud Stream 標準的。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。