Spring Cloud Stream 核心概念

Spring Cloud Stream簡介

Spring cloud stream是一個構建與Spring Boot和Spring Integration之上的框架,方便開發人員快速構建基於Message-Driven的系統。html

Spring Integration & Enterprise Integration Patterns簡介

Enterprise Integration Patterns 是由Gregor Hohpe和Bobby Woolf在 Enterprise Integration Patterns 一書中總結的企業應用開發實踐中使用到的各系統間數據交換的方式。java

Spring Integration是Spring框架對Enterprise Integration Patterns的實現和適配。Spring Integration在基於Spring的應用程序中實現輕量級消息傳遞,並支持經過聲明適配器與外部系統集成。 與Spring對遠程處理,消息傳遞和調度的支持相比,這些適配器提供了更高級別的抽象。 Spring Integration的主要目標是提供一個簡單的模型來構建企業集成解決方案,同時保持關注點的分離,這對於生成可維護的可測試代碼相當重要。spring

常見的企業集成數據傳遞模式有如下幾種:數據庫

  • 文件傳輸:系統A採用FTP輪詢等方式獲取系統B生成的文件等。
  • 共享數據庫:系統A和系統B共用一個數據庫表,共用實體類。
  • RPC調用:系統A和B暴露互相之間能調用的服務,例如SOAP、REST。
  • 消息傳遞:系統A和系統B經過消息中間價交換數據。

Spring Cloud Stream 優勢

  1. 和MQ中間件解耦:相較一樣是針對MQ中間價集成的Spring Message項目,提供了更高層的面向不一樣MQ中間件代理(RabbitMQ、Kafka等)的Binder抽象,爲開發人員提供了統一的編程模型。例如RabbitMQ原生並不支持partition特性,若是想要從Kafaka遷移到RabbitMQ,就須要修改一堆代碼,可是若是是Spring Cloud Stream則有可能只須要修改幾個配置便可。
  2. 錯誤重試:集成Spring Retry提供了錯誤自動重試功能。
  3. Error Handler:提供application和system兩層的異常處理機制。

Spring Cloud Stream核心概念

spring-cloud-stream-application

Spring Cloud Stream官網的核心架構圖編程

Binder 層負責和MQ中間件的通訊,應用程序 Application Core 經過 inputs 接收 Binder 包裝後的 Message,至關因而消費者Consumer;經過 outputs 投遞 Message給 Binder,而後由 Binder 轉換後投遞給MQ中間件,至關因而生產者Producer。api

Channel

Channel描述的是消息從應用程序和Binder之間的流通的通道,也就是Application Model中的inputoutput架構

Binder

Binder是Spring Cloud Stream中一個很是重要的概念,它是應用程序和消息中間件的中間層,完美屏蔽了不一樣消息中間件的實現差別,能夠簡單的類比爲Adapterapp

Spring Cloud Stream官方提供了spring-cloud-stream-binder-kafkaspring-cloud-stream-binder-rabbit兩款主流消息中間件的Binder實現。而且還提供了專門用於測試的TestSupportBinder,開發者能夠直接使用它來對通道的接收內容進行斷言測試。框架

固然,Spring Cloud Stream也容許開發者經過它的SPI來實現其餘MQ的Binder。目前已有多款MQ產品提供了第三方Binder實現,參考官方文檔Binder Implementions。如要實現本身的Binder能夠參考官方文檔Binder SPI微服務

Bindings

Binding是用於描述MQ中間件到應用程序的橋樑模型,便是對於Binder加上inputsoutputs各個channel的綁定關係的描述。例如:RabbitMQ-Binder + channel-input1

Spring Cloud Stream經過spring.cloud.stream.bindings.<channelName>來肯定綁定關係。

Spring Cloud Stream已經包含了如下幾個Bindings接口:

  • Source-定義了應用程序做爲生產者將消息投遞到一個名爲outputchannel中去。
public interface Source {

        /**
         * Name of the output channel.
         */
        String OUTPUT = "output";
    
        /**
         * @return output channel
         */
        @Output(Source.OUTPUT)
        MessageChannel output();
    }
  • Sink-定義了應用程序做爲消費者消費名爲inputchannel中的消息。
public interface Sink {
    
        /**
         * Input channel name.
         */
        String INPUT = "input";
    
        /**
         * @return input channel.
         */
        @Input(Sink.INPUT)
        SubscribableChannel input();
    
    }
  • Processor-定義了應用程序同時做爲生產者和消費者,生產消息到名爲output的通道,消費來自名爲input通道的消息。
public interface Processor extends Source, Sink {

    }

固然,這幾個預約義的接口必然沒法知足複雜的業務邏輯,所以Spring Cloud Stream也支持開發人員自定義Bindings接口。

Pub-sub

spring cloud stream支持的是共享topicspublish-subscribe模型,並無採用point-to-pointqueues模型,由於pub-sub模型在微服務中更具備普適性。並且pub-sub模型也能經過只有一個消費者來變相支持p2p模型。

kafka是最典型的pub-sub主流MQ中間件,spring cloud stream在術語和特性支持上基本和kafka相似。

Consumer group

在普通的pub-sub關係中,多個consumer在訂閱了同一個topic時,這些consumer之間是競爭關係,即topic中的一條消息只會被其中一個consumer消費。但若是這些consumer不屬於同一個服務怎麼辦,例以下單topic的下游會有庫存服務、帳戶服務等多個服務的消費者同時存在,這些不一樣服務的消費者都須要獲取到下單topic中的消息,不然就沒法觸發相應的操做,難道須要給不一樣服務排個隊依次傳遞消息,那就變成了同步操做了。

在kafka中經過Consumer Group消費者分組來處理上述問題。一個topic中的每一條消息都會採起多副本的方式分發給全部訂閱的Consumer Group,每一個Consumer Group中的Consumer之間則競爭消費。即庫存服務和帳戶服務的消費組屬於不一樣的Consumer Group,兩個服務都會獲得下單topic的消息,可是同一個服務只會有一個Consumer實例會實際消費。

Spring Clous Stream也支持了kafka的這一特性,每一個Consumer能夠經過spring.cloud.stream.bindings.<channelName>.group屬性設置本身所屬的Consumer Group

默認狀況下,若是咱們沒有爲Consumer指定消費組的話,Spring Cloud Stream會爲其分配一個獨立的匿名消費組。因此若是某topic下的全部consumers都未指定消費組時,當有消息發佈後,全部的consumers都會對其進行消費,由於它們各自屬於獨立的組。所以,咱們建議在使用Spring Cloud Stream時最好都指定Consumer Group,以防止對消息的重複消費,除非該行爲是必要的(例如刷新全部consumer的配置等)。

Polled Consumer

spring cloud stream 2.0以後開始支持定時拉取的消費模式,開發人員能夠指定拉取頻率以及最大拉取消息數量來控制消費速率。

Partition

經過Consumer Group咱們已經能保障每條消息只會被組內的某個實例消費一次,可是咱們沒法控制消息會被哪個實例消費。即多條消息到達後,它們多是分別由不一樣的consumer實例消費。

可是對於一些業務場景,就須要針對某些具備相同特徵的消息每次均可以被同一個消費者實例消費,例如某些監控計數服務,須要針對相同uid的行爲在內存中計數。所以,MQ中間件引入了消息分區的概念,消息根據特徵寫入到不一樣的partition,不一樣的消費者實例指定消費不一樣分區的消息,因而保證相同特徵的消息會被同一個消費者實例消費。

Spring Cloud Stream針對patition提供了一個通用的抽象,用來在消息中間件的上層實現分區處理,因此它對於消息中間件自身是否實現了消息分區並不關心,這使得Spring Cloud Stream爲不具有分區功能的消息中間件也增長了分區功能擴展(例如RabbitMQ)。

相關文章
相關標籤/搜索