Spring cloud stream是一個構建與Spring Boot和Spring Integration之上的框架,方便開發人員快速構建基於Message-Driven的系統。html
Enterprise Integration Patterns 是由Gregor Hohpe和Bobby Woolf在 Enterprise Integration Patterns 一書中總結的企業應用開發實踐中使用到的各系統間數據交換的方式。java
Spring Integration是Spring框架對Enterprise Integration Patterns的實現和適配。Spring Integration在基於Spring的應用程序中實現輕量級消息傳遞,並支持經過聲明適配器與外部系統集成。 與Spring對遠程處理,消息傳遞和調度的支持相比,這些適配器提供了更高級別的抽象。 Spring Integration的主要目標是提供一個簡單的模型來構建企業集成解決方案,同時保持關注點的分離,這對於生成可維護的可測試代碼相當重要。spring
常見的企業集成數據傳遞模式有如下幾種:數據庫
Spring Cloud Stream官網的核心架構圖編程
Binder 層負責和MQ中間件的通訊,應用程序 Application Core 經過 inputs 接收 Binder 包裝後的 Message,至關因而消費者Consumer;經過 outputs 投遞 Message給 Binder,而後由 Binder 轉換後投遞給MQ中間件,至關因而生產者Producer。api
Channel
描述的是消息從應用程序和Binder
之間的流通的通道,也就是Application Model
中的input
和output
。架構
Binder
是Spring Cloud Stream中一個很是重要的概念,它是應用程序和消息中間件的中間層,完美屏蔽了不一樣消息中間件的實現差別,能夠簡單的類比爲Adapter
。app
Spring Cloud Stream官方提供了spring-cloud-stream-binder-kafka
和spring-cloud-stream-binder-rabbit
兩款主流消息中間件的Binder
實現。而且還提供了專門用於測試的TestSupportBinder
,開發者能夠直接使用它來對通道的接收內容進行斷言測試。框架
固然,Spring Cloud Stream也容許開發者經過它的SPI來實現其餘MQ的Binder
。目前已有多款MQ產品提供了第三方Binder
實現,參考官方文檔Binder Implementions。如要實現本身的Binder
能夠參考官方文檔Binder SPI。微服務
Binding
是用於描述MQ中間件到應用程序的橋樑模型,便是對於Binder
加上inputs
和outputs
各個channel
的綁定關係的描述。例如:RabbitMQ-Binder
+ channel-input1
。
Spring Cloud Stream經過spring.cloud.stream.bindings.<channelName>
來肯定綁定關係。
Spring Cloud Stream已經包含了如下幾個Bindings
接口:
Source
-定義了應用程序做爲生產者將消息投遞到一個名爲output
的channel
中去。public interface Source { /** * Name of the output channel. */ String OUTPUT = "output"; /** * @return output channel */ @Output(Source.OUTPUT) MessageChannel output(); }
Sink
-定義了應用程序做爲消費者消費名爲input
的channel
中的消息。public interface Sink { /** * Input channel name. */ String INPUT = "input"; /** * @return input channel. */ @Input(Sink.INPUT) SubscribableChannel input(); }
Processor
-定義了應用程序同時做爲生產者和消費者,生產消息到名爲output
的通道,消費來自名爲input
通道的消息。public interface Processor extends Source, Sink { }
固然,這幾個預約義的接口必然沒法知足複雜的業務邏輯,所以Spring Cloud Stream也支持開發人員自定義Bindings
接口。
spring cloud stream支持的是共享topics
的publish-subscribe
模型,並無採用point-to-point
的queues
模型,由於pub-sub模型在微服務中更具備普適性。並且pub-sub模型也能經過只有一個消費者來變相支持p2p模型。
kafka是最典型的pub-sub主流MQ中間件,spring cloud stream在術語和特性支持上基本和kafka相似。
在普通的pub-sub關係中,多個consumer
在訂閱了同一個topic
時,這些consumer
之間是競爭關係,即topic
中的一條消息只會被其中一個consumer
消費。但若是這些consumer
不屬於同一個服務怎麼辦,例以下單topic
的下游會有庫存服務、帳戶服務等多個服務的消費者同時存在,這些不一樣服務的消費者都須要獲取到下單topic
中的消息,不然就沒法觸發相應的操做,難道須要給不一樣服務排個隊依次傳遞消息,那就變成了同步操做了。
在kafka中經過Consumer Group
消費者分組來處理上述問題。一個topic
中的每一條消息都會採起多副本的方式分發給全部訂閱的Consumer Group
,每一個Consumer Group
中的Consumer
之間則競爭消費。即庫存服務和帳戶服務的消費組屬於不一樣的Consumer Group
,兩個服務都會獲得下單topic
的消息,可是同一個服務只會有一個Consumer
實例會實際消費。
Spring Clous Stream也支持了kafka的這一特性,每一個Consumer
能夠經過spring.cloud.stream.bindings.<channelName>.group
屬性設置本身所屬的Consumer Group
。
默認狀況下,若是咱們沒有爲Consumer
指定消費組的話,Spring Cloud Stream會爲其分配一個獨立的匿名消費組。因此若是某topic
下的全部consumers
都未指定消費組時,當有消息發佈後,全部的consumers
都會對其進行消費,由於它們各自屬於獨立的組。所以,咱們建議在使用Spring Cloud Stream時最好都指定Consumer Group
,以防止對消息的重複消費,除非該行爲是必要的(例如刷新全部consumer
的配置等)。
spring cloud stream 2.0以後開始支持定時拉取的消費模式,開發人員能夠指定拉取頻率以及最大拉取消息數量來控制消費速率。
經過Consumer Group
咱們已經能保障每條消息只會被組內的某個實例消費一次,可是咱們沒法控制消息會被哪個實例消費。即多條消息到達後,它們多是分別由不一樣的consumer
實例消費。
可是對於一些業務場景,就須要針對某些具備相同特徵的消息每次均可以被同一個消費者實例消費,例如某些監控計數服務,須要針對相同uid的行爲在內存中計數。所以,MQ中間件引入了消息分區的概念,消息根據特徵寫入到不一樣的partition,不一樣的消費者實例指定消費不一樣分區的消息,因而保證相同特徵的消息會被同一個消費者實例消費。
Spring Cloud Stream針對patition提供了一個通用的抽象,用來在消息中間件的上層實現分區處理,因此它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream爲不具有分區功能的消息中間件也增長了分區功能擴展(例如RabbitMQ)。