本文內容翻譯自官方文檔,spring-cloud-stream docs,對 Spring Cloud Stream的應用入門介紹。spring
官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。編程
Spring Cloud Stream構建在SpringBoot之上,提供了Kafka,RabbitMQ等消息中間件的個性化配置,引入了發佈訂閱、消費組和分區的語義概念,有效的簡化了上層研發人員對MQ使用的複雜度,讓開發人員更多的精力投入到核心業務的處理。架構
在實際開發過程當中,服務與服務之間通訊常常會使用到消息中間件,而以往使用了哪一個中間件好比RabbitMQ,那麼該中間件和系統的耦合性就會很是高,若是咱們要替換爲Kafka那麼變更會比較大,使用Spring Cloud Stream來整合咱們的消息中間件,能夠下降系統和中間件的耦合性。框架
Stream解決了開發人員無感知的使用消息中間件的問題,由於Stream對消息中間件的進一步封裝,能夠作到代碼層面對中間件的無感知。微服務
Spring Cloud Stream進行了配置隔離,只須要調整配置,開發中能夠動態的切換中間件(如rabbitmq切換爲kafka),使得微服務開發的高度解耦,服務能夠關注更多本身的業務流程。性能
關注公衆號:架構進化論,得到第一手的技術資訊和原創文章翻譯
Spring Cloud Stream 爲各大消息中間件產品提供了個性化的自動化配置實現,引用了發佈-訂閱、消費組、分區的三個核心概念。設計
Spring Cloud Stream提供了不少抽象和基礎組件來簡化消息驅動型微服務應用。包含如下內容:代理
Spring Cloud Stream由一箇中立的中間件內核組成。Spring Cloud Stream會注入輸入和輸出的channels,應用程序經過這些channels與外界通訊,而channels則是經過一個明確的中間件Binder與外部brokers鏈接。code
Spring Cloud Stream提供對Kafka,Rabbit MQ,Redis,和Gemfire的Binder實現。Spring Cloud Stream還包括了一個TestSupportBinder,TestSupportBinder預留一個未更改的channel以便於直接地、可靠地和channels通訊。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
Spring Cloud Stream支持在一個應用程序的多個實例之間數據分區,在分區的狀況下,物理通訊介質(例如,topic代理)被視爲多分區結構。一個或多個生產者應用程序實例將數據發送給多個消費應用實例,並保證共同的特性的數據由相同的消費者實例處理。
Spring Cloud Stream提供了一個通用的抽象,用於統一方式進行分區處理,所以分區能夠用於自帶分區的代理(如kafka)或者不帶分區的代理(如rabbiemq)
分區在有狀態處理中是一個很重要的概念,其重要性體如今性能和一致性上,要確保全部相關數據被一併處理,例如,在時間窗平均計算的例子中,給定傳感器測量結果應該都由同一應用實例進行計算。
Spring Cloud Stream提供了一些預約義的註解,用於綁定輸入和輸出channels,以及如何監聽channels。
將@EnableBinding註解添加到應用的配置類,就能夠把一個spring應用轉換成Spring Cloud Stream應用,@EnableBinding註解自己就包含@Configuration註解,會觸發Spring Cloud Stream 基本配置。
@Import(...) @Configuration @EnableIntegration public @interface EnableBinding { ... Class<?>[] value() default {}; }
一個Spring Cloud Stream應用能夠有任意數目的input和output通道,後者經過@Input和@Output註解在接口中定義。
定義在方法中,被修飾的方法註冊爲消息中間件上數據流的事件監聽器,註解中屬性值對應了監聽的消息通道名。
Spring Cloud Stream提供了三個開箱即用的預約義接口。
Source用於有單個輸出(outbound)通道的應用。
public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); }
Sink用於有單個輸入(inbound)通道的應用。
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
Processor用於單個應用同時包含輸入和輸出通道的狀況。
public interface Processor extends Source, Sink { }
下面是一個很是簡單的 SpringBootApplication應用,經過依賴Spring Cloud Stream,從Input通道監聽消息而後返回應答到Output通道,只要添加配置文件就能夠應用。
@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }
下面解釋下這個示例中相關注解的應用:
@EnableBinding聲明瞭這個應用程序綁定了2個通道:INPUT和OUTPUT。這2個通道是在接口Processor中定義的(Spring Cloud Stream默認設置)。全部通道都是配置在一個具體的消息中間件或綁定器中。
@StreamListener(Processor.INPUT)代表這裏在input中提取消息,而且處理。
@SendTo(Processor.OUTPUT)代表在output中返回消息。
這篇文章根據Spring Cloud Stream的官方文檔,對Stream作了一個總體的介紹,包括設計目標,應用場景,業務模型以及對外開放的註解,後面我會經過一個實例,演示 Spring Cloud Stream 的應用。