下圖是官方文檔中對於Spring Cloud Stream應用模型的結構圖。從中咱們能夠看到,Spring Cloud Stream構建的應用程序與消息中間件之間是經過綁定器Binder
相關聯的,綁定器對於應用程序而言起到了隔離做用,它使得不一樣消息中間件的實現細節對應用程序來講是透明的。因此對於每個Spring Cloud Stream的應用程序來講,它不須要知曉消息中間件的通訊細節,它只須要知道Binder
對應用程序提供的概念去實現便可,而這個概念就是在快速入門中咱們提到的消息通道:Channel
。以下圖案例,在應用程序和Binder之間定義了兩條輸入通道和三條輸出通道來傳遞消息,而綁定器則是做爲這些通道和消息中間件之間的橋樑進行通訊。html
Binder
綁定器是Spring Cloud Stream中一個很是重要的概念。在沒有綁定器這個概念的狀況下,咱們的Spring Boot應用要直接與消息中間件進行信息交互的時候,因爲各消息中間件構建的初衷不一樣,它們的實現細節上會有較大的差別性,這使得咱們實現的消息交互邏輯就會很是笨重,由於對具體的中間件實現細節有過重的依賴,當中間件有較大的變更升級、或是更換中間件的時候,咱們就須要付出很是大的代價來實施。spring
經過定義綁定器做爲中間層,完美地實現了應用程序與消息中間件細節之間的隔離。經過嚮應用程序暴露統一的Channel
通道,使得應用程序不須要再考慮各類不一樣的消息中間件實現。當咱們須要升級消息中間件,或是更換其餘消息中間件產品時,咱們要作的就是更換它們對應的Binder
綁定器而不須要修改任何Spring Boot的應用邏輯。這一點在上一章實現消息總線時,從RabbitMQ切換到Kafka的過程當中,已經可以讓咱們體驗到這一好處。架構
目前版本的Spring Cloud Stream爲主流的消息中間件產品RabbitMQ和Kafka提供了默認的Binder
實現,在快速入門的例子中,咱們就使用了RabbitMQ的Binder
。另外,Spring Cloud Stream還實現了一個專門用於測試的TestSupportBinder
,開發者能夠直接使用它來對通道的接收內容進行可靠的測試斷言。若是要使用除了RabbitMQ和Kafka之外的消息中間件的話,咱們也能夠經過使用它所提供的擴展API來實現其餘中間件的Binder
。app
仔細的讀者可能已經發現,咱們在快速入門示例中,並無使用application.properties
或是application.yml
來作任何屬性設置。那是由於它也秉承了Spring Boot的設計理念,提供了對RabbitMQ默認的自動化配置。固然,咱們也能夠經過Spring Boot應用支持的任何方式來修改這些配置,好比:經過應用程序參數、環境變量、application.properties
或是application.yml
配置文件等。好比,下面就是經過配置文件來對RabbitMQ的鏈接信息以及input通道的主題進行配置的示例:負載均衡
spring.cloud.stream.bindings.input.destination=raw-sensor-data |
在Spring Cloud Stream中的消息通訊方式遵循了發佈-訂閱模式,當一條消息被投遞到消息中間件以後,它會經過共享的Topic
主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。這裏所提到的Topic
主題是Spring Cloud Stream中的一個抽象概念,用來表明發佈共享消息給消費者的地方。在不一樣的消息中間件中,Topic
可能對應着不一樣的概念,好比:在RabbitMQ中的它對應了Exchange、而在Kakfa中則對應了Kafka中的Topic。微服務
在快速入門的示例中,咱們經過RabbitMQ的Channel
進行發佈消息給咱們編寫的應用程序消費,而實際上Spring Cloud Stream應用啓動的時候,在RabbitMQ的Exchange中也建立了一個名爲input
的Exchange交換器,因爲Binder
的隔離做用,應用程序並沒有法感知它的存在,應用程序只知道本身指向Binder
的輸入或是輸出通道。爲了直觀的感覺發布-訂閱模式中,消息是如何被分發到多個訂閱者的,咱們可使用快速入門的例子,經過命令行的方式啓動兩個不一樣端口的進程。此時,咱們在RabbitMQ控制頁面的Channels標籤頁中看到以下圖所示的兩個消息通道,它們分別綁定了啓動的兩個應用程序。測試
而在Exchanges標籤頁中,咱們還能找到名爲input
的交換器,點擊進入能夠看到以下圖所示的詳情頁面,其中在Bindings中的內容就是兩個應用程序綁定通道中的消息隊列,咱們能夠經過Exchange頁面的Publish Message來發布消息,此時能夠發現兩個啓動的應用程序都輸出了消息內容。spa
下圖總結了咱們上面所作嘗試的基礎結構,咱們啓動的兩個應用程序分別是「訂閱者-1」和「訂閱者-2」,他們都創建了一條輸入通道綁定到同一個Topic
(RabbitMQ的Exchange)上。當該Topic
中有消息發佈進來後,鏈接到該Topic
上的全部訂閱者能夠收到該消息並根據自身的需求進行消費操做。命令行
相對於點對點隊列實現的消息通訊來講,Spring Cloud Stream採用的發佈-訂閱模式能夠有效的下降消息生產者與消費者之間的耦合,當咱們須要對同一類消息增長一種處理方式時,只須要增長一個應用程序並將輸入通道綁定到既有的Topic
中就能夠實現功能的擴展,而不須要改變原來已經實現的任何內容。設計
雖然Spring Cloud Stream經過發佈-訂閱模式將消息生產者與消費者作了很好的解耦,基於相同主題的消費者能夠輕鬆的進行擴展,可是這些擴展都是針對不一樣的應用實例而言的,在現實的微服務架構中,咱們每個微服務應用爲了實現高可用和負載均衡,實際上都會部署多個實例。不少狀況下,消息生產者發送消息給某個具體微服務時,只但願被消費一次,按照上面咱們啓動兩個應用的例子,雖然它們同屬一個應用,可是這個消息出現了被重複消費兩次的狀況。爲了解決這個問題,在Spring Cloud Stream中提供了消費組的概念。
若是在同一個主題上的應用須要啓動多個實例的時候,咱們能夠經過spring.cloud.stream.bindings.input.group
屬性爲應用指定一個組名,這樣這個應用的多個實例在接收到消息的時候,只會有一個成員真正的收到消息並進行處理。以下圖所示,咱們爲Service-A和Service-B分別啓動了兩個實例,而且根據服務名進行了分組,這樣當消息進入主題以後,Group-A和Group-B都會收到消息的副本,可是在兩個組中都只會有一個實例對其進行消費。
默認狀況下,當咱們沒有爲應用指定消費組的時候,Spring Cloud Stream會爲其分配一個獨立的匿名消費組。因此,若是同一主題下全部的應用都沒有指定消費組的時候,當有消息被髮布以後,全部的應用都會對其進行消費,由於它們各自都屬於一個獨立的組中。大部分狀況下,咱們在建立Spring Cloud Stream應用的時候,建議最好爲其指定一個消費組,以防止對消息的重複處理,除非該行爲須要這樣作(好比:刷新全部實例的配置等)。
經過引入消費組的概念,咱們已經可以在多實例的狀況下,保障每一個消息只被組內一個實例進行消費。經過上面對消費組參數設置後的實驗,咱們能夠觀察到,消費組並沒有法控制消息具體被哪一個實例消費。也就是說,對於同一條消息,它屢次到達以後多是由不一樣的實例進行消費的。可是對於一些業務場景,就須要對於一些具備相同特徵的消息每次均可以被同一個消費實例處理,好比:一些用於監控服務,爲了統計某段時間內消息生產者發送的報告內容,監控服務須要在自身內容聚合這些數據,那麼消息生產者能夠爲消息增長一個固有的特徵ID來進行分區,使得擁有這些ID的消息每次都能被髮送到一個特定的實例上實現累計統計的效果,不然這些數據就會分散到各個不一樣的節點致使監控結果不一致的狀況。而分區概念的引入就是爲了解決這樣的問題:當生產者將消息數據發送給多個消費者實例時,保證擁有共同特徵的消息數據始終是由同一個消費者實例接收和處理。
Spring Cloud Stream爲分區提供了通用的抽象實現,用來在消息中間件的上層實現分區處理,因此它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream爲不具有分區功能的消息中間件也增長了分區功能擴展。源碼來源