在Spring Cloud Stream中的消息通訊方式遵循了發佈-訂閱模式,當一條消息被投遞到消息中間件以後,它會經過共享的Topic主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。這裏所提到的Topic主題是Spring Cloud Stream中的一個抽象概念,用來表明發佈共享消息給消費者的地方。在不一樣的消息中間件中,Topic可能對應着不一樣的概念,好比:在RabbitMQ中的它對應了Exchange、而在Kakfa中則對應了Kafka中的Topic。spring
經過RabbitMQ的 Channel 發佈消息給咱們編寫的應用程序消費,而實際上Spring Cloud Stream應用啓動的時候,在RabbitMQ的Exchange中也建立了一個名爲input的Exchange交換器,因爲Binder的隔離做用,應用程序並沒有法感知它的存在,應用程序只知道本身指向Binder的輸入或是輸出通道。爲了直觀的感覺發布-訂閱模式中,消息是如何被分發到多個訂閱者的,咱們可使用快速入門的例子,經過命令行的方式啓動兩個不一樣端口的進程。此時,咱們在RabbitMQ控制頁面的Channels標籤頁中看到以下圖所示的兩個消息通道,它們分別綁定了啓動的兩個應用程序。架構
咱們啓動的兩個應用程序分別是「訂閱者-1」和「訂閱者-2」,他們都創建了一條輸入通道綁定到同一個Topic(RabbitMQ的Exchange)上。當該Topic中有消息發佈進來後,鏈接到該Topic上的全部訂閱者能夠收到該消息並根據自身的需求進行消費操做。負載均衡
相對於點對點隊列實現的消息通訊來講,Spring Cloud Stream採用的發佈-訂閱模式能夠有效的下降消息生產者與消費者之間的耦合,當咱們須要對同一類消息增長一種處理方式時,只須要增長一個應用程序並將輸入通道綁定到既有的Topic中就能夠實現功能的擴展,而不須要改變原來已經實現的任何內容。微服務
雖然Spring Cloud Stream經過發佈-訂閱模式將消息生產者與消費者作了很好的解耦,基於相同主題的消費者能夠輕鬆的進行擴展,可是這些擴展都是針對不一樣的應用實例而言的,在現實的微服務架構中,咱們每個微服務應用爲了實現高可用和負載均衡,實際上都會部署多個實例。不少狀況下,消息生產者發送消息給某個具體微服務時,只但願被消費一次,按照上面咱們啓動兩個應用的例子,雖然它們同屬一個應用,可是這個消息出現了被重複消費兩次的狀況。爲了解決這個問題,在Spring Cloud Stream中提供了消費組的概念。spa
若是在同一個主題上的應用須要啓動多個實例的時候,咱們能夠經過spring.cloud.stream.bindings.input.group屬性爲應用指定一個組名,這樣這個應用的多個實例在接收到消息的時候,只會有一個成員真正的收到消息並進行處理。命令行
默認狀況下,當咱們沒有爲應用指定消費組的時候,Spring Cloud Stream會爲其分配一個獨立的匿名消費組。因此,若是同一主題下全部的應用都沒有指定消費組的時候,當有消息被髮布以後,全部的應用都會對其進行消費,由於它們各自都屬於一個獨立的組中。大部分狀況下,咱們在建立Spring Cloud Stream應用的時候,建議最好爲其指定一個消費組,以防止對消息的重複處理,除非該行爲須要這樣作(好比:刷新全部實例的配置等)。code
經過引入消費組的概念,咱們已經可以在多實例的狀況下,保障每一個消息只被組內一個實例進行消費。經過上面對消費組參數設置後的實驗,咱們能夠觀察到,消費組並沒有法控制消息具體被哪一個實例消費。也就是說,對於同一條消息,它屢次到達以後多是由不一樣的實例進行消費的。可是對於一些業務場景,就須要對於一些具備相同特徵的消息每次均可以被同一個消費實例處理,好比:一些用於監控服務,爲了統計某段時間內消息生產者發送的報告內容,監控服務須要在自身內容聚合這些數據,那麼消息生產者能夠爲消息增長一個固有的特徵ID來進行分區,使得擁有這些ID的消息每次都能被髮送到一個特定的實例上實現累計統計的效果,不然這些數據就會分散到各個不一樣的節點致使監控結果不一致的狀況。而分區概念的引入就是爲了解決這樣的問題:當生產者將消息數據發送給多個消費者實例時,保證擁有共同特徵的消息數據始終是由同一個消費者實例接收和處理。中間件
Spring Cloud Stream爲分區提供了通用的抽象實現,用來在消息中間件的上層實現分區處理,因此它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream爲不具有分區功能的消息中間件也增長了分區功能擴展。blog
以上出處:http://blog.didispace.com/spring-cloud-starter-dalston-7-2/rabbitmq
spring.cloud.stream.bindings.input.destination=raw-sensor-data
spring.cloud.stream.bindings.