[Advanced] Spring Cloud Stream in One Article

This article is summarized from the official documentation: http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.0.RC3/single/spring-cloud-stream.html

1. A Brief History of Spring Data Integration

2. A Minimal Example

3. New Features in 2.0

4. Introducing Spring Cloud Stream

A quick overview of Spring Cloud Stream:

  • A framework for building message-driven microservice applications;
  • Built on top of Spring Boot;
  • Uses Spring Integration to provide connectivity to message brokers;
  • Provides opinionated configuration of middleware from several vendors;
  • Introduces the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

Receiving messages:

The @EnableBinding annotation connects the application to a message broker.

Adding @StreamListener to a method lets it receive events for stream processing.

The following example shows a sink application that receives external messages:

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  // VotingService must be available as a bean for this example to work
  @Autowired
  private VotingService votingService;

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

Channel interfaces:

The @EnableBinding annotation takes one or more interfaces as parameters (in this case, the parameter is a single Sink interface).

An interface declares input and output channels. Spring Cloud Stream provides the Source, Sink, and Processor interfaces.

You can also define your own interfaces.

The following listing shows the definition of the Sink interface:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

The @Input annotation identifies an input channel, through which received messages enter the application.

The @Output annotation identifies an output channel, through which published messages leave the application.

The @Input and @Output annotations can take a channel name as a parameter. If no name is provided, the name of the annotated method is used instead.

Spring Cloud Stream creates an implementation of the interface for you. You can use it in the application by autowiring it, as shown in the following example (from a test case):

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {

  @Autowired
  private Sink sink;

  @Test
  public void contextLoads() {
    assertNotNull(this.sink.input());
  }
}

5. Main Concepts

Spring Cloud Stream simplifies writing message-driven applications through a few terms and abstractions.

5.1. Application Model

The core of Spring Cloud Stream is independent of any middleware implementation.

A Stream application interacts with the outside world through input and output channels.

Channels communicate with external middleware products through middleware-specific Binder implementations.

5.2. The Binder Abstraction

Spring Cloud Stream provides Binder implementations for Kafka and RabbitMQ, includes a TestSupportBinder for testing, and also lets you write your own Binder.

Spring Cloud Stream uses Spring Boot configuration together with the Binder abstraction, which makes it flexible to configure how to connect to a message middleware. For example, you can decide at deployment time which type of middleware, or which broker instance, to connect to.

These settings can be supplied externally through any configuration mechanism supported by Spring Boot (such as application arguments, environment variables, and YAML or .properties files).

Spring Cloud Stream automatically selects a binder based on the dependencies found on the classpath. You can also include multiple binders and decide at runtime which one to use, even using different binders for different channels.
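For example, with both the RabbitMQ and Kafka binders on the classpath, an application could pick a default binder and override it for a single binding. A minimal sketch (the binding name input is illustrative; the property names are described later in this article):

```properties
# Default all bindings to RabbitMQ, but read this one binding from Kafka
spring.cloud.stream.defaultBinder=rabbit
spring.cloud.stream.bindings.input.binder=kafka
```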

5.3. Persistent Publish-Subscribe Support

Communication between applications follows a publish-subscribe model, where data is broadcast through shared topics. This can be seen in the following figure, which shows a typical deployment for a set of interacting Spring Cloud Stream applications.

(Figure: the Spring Cloud Stream publish-subscribe model, showing sensor data flowing to interacting applications)

Sensor data is reported to an HTTP endpoint and is then sent to a common destination: raw-sensor-data.

From that destination, two microservices subscribe to the raw-sensor-data topic: one computes time-windowed averages, and the other ingests the raw data into HDFS (Hadoop Distributed File System).

The publish-subscribe communication model reduces the complexity of both the producer and the consumer and allows new applications to be added to the topology without disrupting existing flows. For example, downstream of the averaging application, you can add an application that computes the highest temperature values for display and monitoring. You can then add another application that interprets the same stream of averages for fault detection. Doing all communication through shared topics (rather than point-to-point queues) reduces coupling between microservices.

While the concept of publish-subscribe messaging is not new, Spring Cloud Stream takes the extra step of making it an opinionated choice for its application model. By using native middleware support, Spring Cloud Stream also simplifies use of the publish-subscribe model across different platforms.

5.4. Consumer Groups

To scale up processing capacity, we deploy multiple instances of an application. The instances then compete with each other for messages: each message only needs to be consumed by one of them.

Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to, and inspired by, Kafka consumer groups.)

Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name.

For the consumers shown in the figure below, this property would be set as spring.cloud.stream.bindings.<channelName>.group=hdfsWrite or spring.cloud.stream.bindings.<channelName>.group=average.

(Figure: Spring Cloud Stream consumer groups)

All groups that subscribe to a given destination receive a copy of the published data, but only one member of each group receives a given message from that destination.

By default, when no group is specified, Spring Cloud Stream assigns the application to an anonymous, independent, single-member consumer group that coexists with all the other consumer groups.
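For the averaging service in the example above, the group could be declared like this, assuming its input binding is named input (the binding name is illustrative):

```properties
# All instances that share this group compete for messages; each message goes to one of them
spring.cloud.stream.bindings.input.group=average
```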

5.5. Consumer Types

Two types of consumer are supported:

  • Message-driven (sometimes referred to as asynchronous)
  • Polled (sometimes referred to as synchronous)

Before version 2.0, only asynchronous consumers were supported: a message is delivered as soon as it is available and a thread is available to process it.

If you want to control the rate at which messages are processed, you can use a synchronous (polled) consumer.

Durability:

Consistent with the opinionated application model of Spring Cloud Stream, consumer group subscriptions are durable.

Binder implementations ensure that group subscriptions are persistent: even if messages are sent while all the consumers are stopped, once a consumer group has created at least one subscription, the group begins receiving data.

Note: anonymous subscriptions are non-durable by nature. For some binder implementations (such as RabbitMQ), it is also possible to have non-durable group subscriptions.

In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application's instances from receiving duplicate messages (unless that behavior is needed, which is unusual).

5.6. Partitioning Support

Spring Cloud Stream supports partitioning data between multiple instances of a given application. In a partitioned scenario, the physical communication medium (for example, the broker topic) is viewed as being structured into multiple partitions.

One or more producer application instances send data to multiple consumer application instances, while ensuring that data identified by a common characteristic is processed by the same consumer instance.

Spring Cloud Stream provides a common abstraction for implementing partitioned processing uniformly. Partitioning can therefore be used whether the broker itself is naturally partitioned (as with Kafka) or not (as with RabbitMQ).

(Figure: Spring Cloud Stream partitioning)

Partitioning is a key concept in stateful processing, where it is critical (for either performance or consistency reasons) to ensure that all related data is processed together.

For example, in the time-windowed average calculation example, it is important that all measurements from any given sensor are processed by the same application instance.

To set up a partitioned processing scenario, you must configure both the data-producing and the data-consuming ends.
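A minimal sketch of such a configuration, assuming a producer binding named output, a consumer binding named input, and two deployed consumer instances (all names and counts are illustrative; the individual properties are documented in Chapter 8):

```properties
# Producer side: partition by a payload field into 2 partitions
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.sensorId
spring.cloud.stream.bindings.output.producer.partitionCount=2

# Consumer side: declare this instance's position among the deployed instances
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
```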

6. Programming Model

Core concepts:

  • Destination Binders: components responsible for integrating with external messaging systems.
  • Destination Bindings: bridges, created by the binders, between the external messaging system and the application's message producers or consumers.
  • Message: the canonical data structure used by producers and consumers to communicate with destination binders (and thus, through the external messaging system, with other applications).

6.1. Destination Binders

Destination binders are extension components of Spring Cloud Stream that provide the configuration and implementation necessary to integrate with external messaging systems. This integration is responsible for connectivity, delegation and routing of messages to and from producers and consumers, data type conversion, invocation of user code, and more.

Binders handle many of these important concerns, but some of them require human input, usually in the form of configuration. The remainder of this article describes those options in detail.

6.2. Destination Bindings

As stated earlier, destination bindings are the bridge between the external messaging system and the application's message producers or consumers.

Applying the @EnableBinding annotation to one of the application's configuration classes defines a binding. The @EnableBinding annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure.

The following example shows a fully configured and functioning Spring Cloud Stream application that receives the payload of a message from its input destination as a String type (see Chapter 9, Content Type Negotiation), logs it to the console, and sends it to its output destination after converting it to upper case.

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String value) {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    }
}

As you can see, the @EnableBinding annotation can take one or more interface classes as parameters. The parameters are referred to as bindings, and they contain methods representing bindable components. These components are typically message channels for channel-based binders (such as Rabbit, Kafka, and others; see Spring Messaging). However, other types of bindings can provide support for the native features of the corresponding technology. For example, the Kafka Streams binder (formerly known as KStream) allows binding directly to Kafka Streams (see Kafka Streams for more details).

Spring Cloud Stream already provides binding interfaces for typical message exchange contracts, including:

  • Sink: identifies the destination from which messages are consumed; it is the message consumer.
  • Source: identifies the destination to which messages are published; it is the message producer.
  • Processor: both a consumer and a producer.
public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}
public interface Source {
  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}
public interface Processor extends Source, Sink {}

While the preceding examples satisfy most cases, you can also define your own contracts by defining your own binding interfaces, using the @Input and @Output annotations to identify the actual bindable components.

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

Using the interface shown in the preceding example as a parameter to @EnableBinding triggers the creation of three bound channels named orders, hotDrinks, and coldDrinks. You can provide as many binding interfaces as you need as parameters to the @EnableBinding annotation, as shown in the following example:

@EnableBinding(value = { Orders.class, Payment.class })

In Spring Cloud Stream, the available bindable message channel components are Spring Messaging's MessageChannel (for outbound) and its extension SubscribableChannel (for inbound).

Pollable Destination Binding

While the bindings described previously support event-based message consumption, sometimes you need more control, such as the rate of consumption.

Starting with version 2.0, you can bind a pollable consumer.

The following example shows how to bind a pollable consumer:

public interface PolledBarista {

    @Input
    PollableMessageSource orders();
    . . .
}

In this case, an implementation of PollableMessageSource is bound to the orders channel. See Section 6.3.5, "Using Polled Consumers" for more details.

Customizing Channel Names

By using the @Input and @Output annotations, you can specify a customized channel name for the channel, as shown in the following example:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}

In the preceding example, the created bound channel is named inboundOrders.

Normally, you need not access individual channels or bindings directly (other than configuring them via the @EnableBinding annotation). However, there may be times, such as testing or other corner cases, when you do.

Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface. That means you can have access to the interfaces representing the bindings, or to individual channels, by autowiring them in your application, as shown in the following two examples:

Autowiring the binding interface:

@Autowired
private Source source;

public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}

Autowiring an individual channel:

@Autowired
private MessageChannel output;

public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}

You can also use the standard Spring @Qualifier annotation for cases where channel names are customized or in multiple-channel scenarios that require a specifically named channel:

@Autowired
@Qualifier("myChannel")
private MessageChannel output;

6.3. Producing and Consuming Messages

There are two ways to write handler code:

  • Spring Integration annotations
  • Spring Cloud Stream annotations

6.3.1. Using Spring Integration Annotations

The foundation of Spring Cloud Stream is the concepts and patterns defined by Enterprise Integration Patterns.

Internally, Spring Cloud Stream is implemented on top of the Spring Integration framework.

Spring Cloud Stream therefore supports the foundation, semantics, and configuration options already established by Spring Integration.

For example, you can attach the output channel of a Source to a MessageSource by using the @InboundChannelAdapter annotation:

@EnableBinding(Source.class)
public class TimerSource {

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
  }
}

Similarly, you can use @Transformer or @ServiceActivator to provide a handler implementation for messages coming from a Processor binding:

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}

One thing to note:

When you bind to the same binding with Spring Cloud Stream's @StreamListener annotation, a publish-subscribe model is used: every method annotated with @StreamListener receives its own copy of each message.

When you use Spring Integration annotations (such as @Aggregator, @Transformer, or @ServiceActivator), however, a competing model is used: only one consumer receives each message, and no individual consumer group is created for each consumer.

6.3.2. Using the Spring Cloud Stream @StreamListener Annotation

As a complement to its Spring Integration support, Spring Cloud Stream provides its own @StreamListener annotation, modeled after other Spring messaging annotations (@MessageMapping, @JmsListener, @RabbitListener, and others) and providing conveniences such as content-based routing.

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

As with other Spring messaging methods, method arguments can be annotated with @Payload, @Headers, and @Header. For methods that return data, you must use the @SendTo annotation to specify the output binding destination for the data returned by the method, as the following example shows:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

6.3.3. Using @StreamListener for Content-based Routing

Spring Cloud Stream supports dispatching messages to multiple handler methods annotated with @StreamListener based on conditions.

In order to be eligible for conditional dispatch, a method must satisfy the following conditions:

    • It cannot return a value.
    • It must be an individual message handling method (reactive API methods are not supported).

The condition is specified by a SpEL expression in the condition argument of the annotation and is evaluated for each message. All the handlers that match the condition are invoked in the same thread, and no assumption should be made regarding the order in which the invocations take place.

In the following example of a @StreamListener with dispatching conditions, all messages bearing a header type with the value bogey are dispatched to the receiveBogey method, and all messages bearing a header type with the value bacall are dispatched to the receiveBacall method.

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

Content Type Negotiation in the Context of condition

It is important to understand some of the mechanics behind content-based routing using the condition argument of @StreamListener, especially in the context of the type of the message as a whole. It may also help if you familiarize yourself with Chapter 9, Content Type Negotiation, before you proceed.

Consider the following scenario:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

The preceding code is perfectly valid. It compiles and deploys without any issues, yet it never produces the result you expect.

That is because you are testing something that does not yet exist in the state you expect.

The payload of the message has not yet been converted from the wire format to the desired type. In other words, it has not yet gone through the type conversion process described in Chapter 9, Content Type Negotiation.

So, unless you use a SpEL expression that evaluates raw data (for example, the value of the first byte in the byte array), use message-header-based expressions (such as condition = "headers['type']=='dog'").

At the moment, dispatching through @StreamListener conditions is supported only for channel-based binders (not for reactive programming support).

6.3.4 Spring Cloud Function Support

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].

To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide the spring.cloud.stream.function.definition property.

Here is an example of a Processor application exposing its message handler as a java.util.function.Function:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyFunctionBootApp {

    public static void main(String[] args) {
        SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase");
    }

    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }
}

In the above, we simply define a bean of type java.util.function.Function called toUpperCase and identify it as the bean to be used as the message handler, whose 'input' and 'output' must be bound to the external destinations exposed by the Processor binding.

Below are examples of simple functional applications supporting Source, Processor, and Sink.

Here is an example of a Source application defined as a java.util.function.Supplier:

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}

Here is an example of a Processor application defined as a java.util.function.Function:

@SpringBootApplication
@EnableBinding(Processor.class)
public static class ProcessorFromFunction {
    public static void main(String[] args) {
        SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase");
    }
    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }
}

Here is an example of a Sink application defined as a java.util.function.Consumer:

@EnableAutoConfiguration
@EnableBinding(Sink.class)
public static class SinkFromConsumer {
    public static void main(String[] args) {
        SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink");
    }
    @Bean
    public Consumer<String> sink() {
        return System.out::println;
    }
}

 

Functional Composition

With this programming model, you can also benefit from functional composition, where you can dynamically compose complex handlers from a set of simple functions. As an example, let's add the following function bean to the application defined above:

@Bean
public Function<String, String> wrapInQuotes() {
    return s -> "\"" + s + "\"";
}

and modify the spring.cloud.stream.function.definition property to reflect your intention to compose a new function from both 'toUpperCase' and 'wrapInQuotes'. To do that, Spring Cloud Function allows you to use the | (pipe) symbol. So, to finish our example, our property now looks like this:

--spring.cloud.stream.function.definition=toUpperCase|wrapInQuotes
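Outside of Spring, the pipe composition above is equivalent to composing the two java.util.function.Function beans with andThen. This plain-Java sketch (class and field names are illustrative, not Spring API) shows what the composed handler computes:

```java
import java.util.function.Function;

public class ComposeDemo {
    // Same logic as the toUpperCase and wrapInQuotes beans above
    static final Function<String, String> TO_UPPER_CASE = s -> s.toUpperCase();
    static final Function<String, String> WRAP_IN_QUOTES = s -> "\"" + s + "\"";

    // toUpperCase|wrapInQuotes applies toUpperCase first, then wrapInQuotes
    static final Function<String, String> COMPOSED = TO_UPPER_CASE.andThen(WRAP_IN_QUOTES);

    public static void main(String[] args) {
        System.out.println(COMPOSED.apply("hello")); // prints "HELLO" (including the quotes)
    }
}
```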

6.3.5 Using Polled Consumers

6.4 Error Handling

6.5 Reactive Programming Support

7. Binders

SpringCloudStream提供了一個Binder抽象,用於鏈接外部中間件。

本節提供有關BinderSPI背後的主要概念、其主要組件和具體實現細節的信息。

7.1 Producers and Consumers

The general relationship between producers and consumers is as follows:

A producer is any component that sends messages to a channel. The channel can be bound to an external message broker with a Binder implementation for that broker. When invoking the bindProducer() method, the first parameter is the name of the destination within the broker, the second parameter is the local channel instance to which the producer sends messages, and the third parameter contains properties (such as a partition key expression) to be used within the adapter that is created for that channel.

A consumer is any component that receives messages from a channel. As with a producer, the consumer's channel can be bound to an external message broker. When invoking the bindConsumer() method, the first parameter is the destination name, and a second parameter provides the name of a logical group of consumers. Each group that is represented by consumer bindings for a given destination receives a copy of each message that a producer sends to that destination (that is, it follows normal publish-subscribe semantics). If there are multiple consumer instances bound with the same group name, then messages are load-balanced across those consumer instances so that each message sent by a producer is consumed by only a single consumer instance within each group (that is, it follows normal queueing semantics).

7.2 Binder SPI

The Binder SPI consists of a number of interfaces, out-of-the-box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware.

The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware.

The following listing shows the definition of the Binder interface:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

 

 

The interface is parameterized, offering a number of extension points:

  • Input and output bind targets. As of version 1.0, only MessageChannel is supported, but this is intended to be used as an extension point in the future.
  • Extended consumer and producer properties, allowing specific Binder implementations to add supplemental properties that can be supported in a type-safe manner.

A typical binder implementation consists of the following:

  • A class that implements the Binder interface;
  • A Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure.
  • META-INF/spring.binders file found on the classpath containing one or more binder definitions, as shown in the following example:

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

 

 

7.3 Binder Detection

Spring Cloud Stream relies on implementations of the Binder SPI to perform the task of connecting channels to message brokers. Each Binder implementation typically connects to one type of messaging system.

7.3.1 Classpath Detection

By default, Spring Cloud Stream relies on Spring Boot's auto-configuration to configure the binding process. If a single Binder implementation is found on the classpath, Spring Cloud Stream automatically uses it. For example, a Spring Cloud Stream project that aims to bind only to RabbitMQ can add the following dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

For specific Maven coordinates of other binder dependencies, see the documentation of that binder implementation.

7.4 Multiple Binders on the Classpath

When multiple binders are present on the classpath, the application must indicate which binder is to be used for each channel binding. Each binder configuration contains a META-INF/spring.binders file, which is a simple properties file, as shown in the following example:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

Similar files exist for the other provided binder implementations (such as Kafka), and custom binder implementations are expected to provide them as well. The key represents an identifying name for the binder implementation, whereas the value is a comma-separated list of configuration classes that each contain one and only one bean definition of type org.springframework.cloud.stream.binder.Binder.

Binder selection can either be performed globally, using the spring.cloud.stream.defaultBinder property (for example, spring.cloud.stream.defaultBinder=rabbit) or individually, by configuring the binder on each channel binding. For instance, a processor application (that has channels named input and output for read and write respectively) that reads from Kafka and writes to RabbitMQ can specify the following configuration:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

7.5 Connecting to Multiple Systems

7.6 Binding Visualization and Control

Bindings can be visualized and controlled through actuator endpoints, including viewing them and pausing or resuming channels.

7.7 Binder Configuration Properties

The following properties are available when customizing binder configurations. These properties, exposed via org.springframework.cloud.stream.config.BinderProperties, must be prefixed with spring.cloud.stream.binders.<configurationName>.

type

The binder type. It typically references one of the binders found on the classpath — in particular, a key in a META-INF/spring.binders file.

By default, it has the same value as the configuration name.

inheritEnvironment

Whether the configuration inherits the environment of the application itself.

Default: true.

environment

Root for a set of properties that can be used to customize the environment of the binder. When this property is set, the context in which the binder is being created is not a child of the application context. This setting allows for complete separation between the binder components and the application components.

Default: empty.

defaultCandidate

Whether the binder configuration is a candidate for being considered a default binder or can be used only when explicitly referenced. This setting allows adding binder configurations without interfering with the default processing.

Default: true.

 

8. Configuration Options

Some binders have additional binding properties that support middleware-specific features.

Configuration options can be provided to Spring Cloud Stream applications through any mechanism supported by Spring Boot.

This includes application arguments, environment variables, and YAML or .properties files.

8.1 Binding Service Properties

These properties are exposed via org.springframework.cloud.stream.config.BindingServiceProperties.

spring.cloud.stream.instanceCount

The number of deployed instances of an application. Must be set for partitioning on the producer side. Must be set on the consumer side when using RabbitMQ and with Kafka if autoRebalanceEnabled=false.

Default: 1.

spring.cloud.stream.instanceIndex

The instance index of the application: a number from 0 to instanceCount - 1. Used for partitioning with RabbitMQ and with Kafka if autoRebalanceEnabled=false. Automatically set in Cloud Foundry to match the application's instance index.
spring.cloud.stream.dynamicDestinations

A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.

Default: empty (letting any destination be bound).

spring.cloud.stream.defaultBinder

The default binder to use, if multiple binders are configured. See Multiple Binders on the Classpath.

Default: empty.

spring.cloud.stream.overrideCloudConnectors

This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.

Default: false.

spring.cloud.stream.bindingRetryInterval

The interval (in seconds) between retrying binding creation when, for example, the binder does not support late binding and the broker (for example, Apache Kafka) is down. Set it to zero to treat such conditions as fatal, preventing the application from starting.

Default: 30

8.2 Binding Properties

Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channelName>.<property>=<value>. The <channelName> represents the name of the channel being configured (for example, output for a Source).

To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.default.<property>=<value>.

When it comes to avoiding repetitions for extended binding properties, this format should be used - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>.

In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>. prefix and focus just on the property name, with the understanding that the prefix is included at runtime.

8.2.1 Common Binding Properties

These properties are exposed via org.springframework.cloud.stream.config.BindingProperties

The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>. (for example, spring.cloud.stream.bindings.input.destination=ticktock).

Default values can be set by using the spring.cloud.stream.default prefix (for example, spring.cloud.stream.default.contentType=application/json).

destination

The target destination of a channel on the bound middleware (for example, the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead. The default value of this property cannot be overridden.
group

The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.

Default: null (indicating an anonymous consumer).

contentType

The content type of the channel. See Chapter 9, "Content Type Negotiation".

Default: application/json.

binder

The binder used by this binding. See Section 7.4, "Multiple Binders on the Classpath" for details.

Default: null (the default binder is used, if it exists).

8.2.2 Consumer Properties

These properties are exposed via org.springframework.cloud.stream.binder.ConsumerProperties

The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer. (for example, spring.cloud.stream.bindings.input.consumer.concurrency=3).

Default values can be set by using the spring.cloud.stream.default.consumer prefix (for example, spring.cloud.stream.default.consumer.headerMode=none).

concurrency

The concurrency of the inbound consumer.

Default: 1.

partitioned

Whether the consumer receives data from a partitioned producer.

Default: false.

headerMode

When set to none, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when consuming data from non-Spring Cloud Stream applications when native headers are not supported. When set to headers, it uses the middleware’s native header mechanism. When set to embeddedHeaders, it embeds headers into the message payload.

Default: depends on the binder implementation.

maxAttempts

If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.

Default: 3.

backOffInitialInterval

The backoff initial interval on retry.

Default: 1000.

backOffMaxInterval

The maximum backoff interval.

Default: 10000.

backOffMultiplier

The backoff multiplier.

Default: 2.0.
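Taken together, the defaults above (maxAttempts=3, backOffInitialInterval=1000, backOffMultiplier=2.0, backOffMaxInterval=10000) give one initial attempt plus two retries, waiting 1000 ms and then 2000 ms. A plain-Java sketch of how such an exponential backoff sequence is derived (the helper name is illustrative, not Spring API):

```java
import java.util.Arrays;

public class BackoffSketch {
    // Returns the wait (ms) before each retry; maxAttempts includes the first attempt
    static long[] retryWaits(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        long[] waits = new long[Math.max(0, maxAttempts - 1)];
        double interval = initialMs;
        for (int i = 0; i < waits.length; i++) {
            waits[i] = (long) Math.min(interval, maxMs); // capped by backOffMaxInterval
            interval *= multiplier;                      // grows by backOffMultiplier
        }
        return waits;
    }

    public static void main(String[] args) {
        // Defaults from the consumer properties above
        System.out.println(Arrays.toString(retryWaits(3, 1000, 2.0, 10000))); // [1000, 2000]
    }
}
```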

defaultRetryable

Whether exceptions thrown by the listener that are not listed in the retryableExceptions are retryable.

Default: true.

instanceIndex

When set to a value greater than or equal to zero, it allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex). When set to a negative value, it defaults to spring.cloud.stream.instanceIndex. See Section 11.2, "Instance Index and Instance Count" for more information.

Default: -1.

instanceCount

When set to a value greater than or equal to zero, it allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount). When set to a negative value, it defaults to spring.cloud.stream.instanceCount. See Section 11.2, "Instance Index and Instance Count" for more information.

Default: -1.

retryableExceptions

A map of Throwable class names in the key and a boolean in the value. Specify those exceptions (and subclasses) that will or won't be retried. Also see defaultRetryable. Example: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false.

Default: empty.

useNativeDecoding

When set to true, the inbound message is deserialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value deserializer). When this configuration is being used, the inbound message unmarshalling is not based on the contentType of the binding. When native decoding is used, it is the responsibility of the producer to use an appropriate encoder (for example, the Kafka producer value serializer) to serialize the outbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the producer property useNativeEncoding.

Default: false.

8.2.3 Producer Properties

These properties are exposed via org.springframework.cloud.stream.binder.ProducerProperties

The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer. (for example, spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id).

Default values can be set by using the prefix spring.cloud.stream.default.producer (for example, spring.cloud.stream.default.producer.partitionKeyExpression=payload.id).

partitionKeyExpression

A SpEL expression that determines how to partition outbound data. If set, or if partitionKeyExtractorClass is set, outbound data on this channel is partitioned. partitionCount must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExtractorClass. See Section 5.6, "Partitioning Support".

Default: null.

partitionKeyExtractorClass

PartitionKeyExtractorStrategy implementation. If set, or if partitionKeyExpression is set, outbound data on this channel is partitioned. partitionCount must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExpression. See Section 5.6, "Partitioning Support".

Default: null.

partitionSelectorClass

PartitionSelectorStrategy implementation. Mutually exclusive with partitionSelectorExpression. If neither is set, the partition is selected as the hashCode(key) % partitionCount, where key is computed through either partitionKeyExpression or partitionKeyExtractorClass.

Default: null.

partitionSelectorExpression

A SpEL expression for customizing partition selection. Mutually exclusive with partitionSelectorClass. If neither is set, the partition is selected as the hashCode(key) % partitionCount, where key is computed through either partitionKeyExpression or partitionKeyExtractorClass.

Default: null.
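When neither selector is set, the default selection described above reduces to a simple modulo on the key's hash. A plain-Java sketch of that computation (not the actual Spring Cloud Stream implementation; the guard against negative hash codes is an assumption added here so the result stays in range):

```java
public class PartitionSelectionSketch {
    // Default strategy: hashCode(key) % partitionCount, kept non-negative
    static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition
        System.out.println(selectPartition("sensor-42", 4));
        System.out.println(selectPartition("sensor-42", 4));
    }
}
```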

partitionCount

The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, it is interpreted as a hint. The larger of this and the partition count of the target topic is used instead.

Default: 1.

requiredGroups

A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (for example, by pre-creating durable queues in RabbitMQ).
headerMode

When set to none, it disables header embedding on output. It is effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when producing data for non-Spring Cloud Stream applications when native headers are not supported. When set to headers, it uses the middleware’s native header mechanism. When set to embeddedHeaders, it embeds headers into the message payload.

Default: Depends on the binder implementation.

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding.

Default: false.

errorChannelEnabled

When set to true, if the binder supports asynchronous send results, send failures are sent to an error channel for the destination. See Section 6.4, "Error Handling" for more information.

Default: false.

 

8.3 Using Dynamically Bound Destinations

 

9. Content Type Negotiation

10. Schema Evolution Support

11. Inter-Application Communication

12. Testing

13. Health Indicator

 

14. Metrics Emitter

15. Samples

 

 

The RabbitMQ Binder Implementation in Spring Cloud Stream

1. Usage

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

 

or

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ Binder Overview

The following figure shows how the RabbitMQ Binder implementation operates:

 

 

By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange.

For each consumer group, a Queue is bound to that TopicExchange.

Each consumer instance has a corresponding RabbitMQ Consumer instance connected to the queue for its consumer group.

For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.

For anonymous consumers (those with no group property), an auto-delete queue with a randomized unique name is used.

 

With the optional autoBindDlq option, you can configure the binder to create and configure dead-letter queues (DLQs), along with a dead-letter exchange (DLX) and the routing infrastructure.

By default, the dead-letter queue is named destination.dlq.

If retry is enabled (maxAttempts > 1), failed messages are delivered to the DLQ after retries are exhausted.

If retry is disabled (maxAttempts = 1), you should leave requeueRejected set to false (the default) so that failed messages are routed to the DLQ instead of being re-queued.

In addition, republishToDlq causes the binder to publish failed messages to the DLQ (instead of rejecting them).

This feature lets additional information, such as the stack trace in the x-exception-stacktrace header, be added to the message headers.

See the frameMaxHeadroom property for information about truncated stack traces.

This option does not require retry to be enabled; a failed message can be republished after just one attempt. Starting with version 1.2, you can configure the delivery mode of republished messages. See the republishDeliveryMode property.
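As a hedged sketch, the DLQ behavior described above might be enabled with properties like the following (the binding name input and the specific values are assumptions):

```properties
# Try 3 times, then republish the failed message to input.dlq
# with exception details added to its headers
spring.cloud.stream.bindings.input.consumer.max-attempts=3
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
```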

 

If the stream listener throws an ImmediateAcknowledgeAmqpException, the DLQ is bypassed and the message is simply discarded. Starting with version 2.1, this is the case regardless of the republishToDlq setting; previously it was so only when republishToDlq was false.

 

Important:

Setting requeueRejected to true (with republishToDlq=false) causes the message to be re-queued and redelivered continually, which is usually not what you want unless the cause of the failure is transient.

In general, you should enable retry in the binder by setting maxAttempts to a value greater than 1, or by setting republishToDlq to true.

 

See Section 3.1, "RabbitMQ Binder Properties", for more information about these properties.

The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Chapter 6, "Dead-Letter Queue Processing".

 

When multiple RabbitMQ binders are used in a Spring Cloud Stream application, be sure to disable RabbitAutoConfiguration, to avoid the same configuration from RabbitAutoConfiguration being applied to the binders. You can exclude the class by using the @SpringBootApplication annotation.
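For example, the exclusion might look like the following (the application class name is illustrative):

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;

// Exclude RabbitAutoConfiguration so that each binder gets only its own
// explicitly configured connection settings.
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class MultiBinderApplication {

  public static void main(String[] args) {
    SpringApplication.run(MultiBinderApplication.class, args);
  }
}
```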

 

Starting with version 2.0, the RabbitMessageChannelBinder sets the RabbitTemplate.usePublisherConnection property to true so that non-transactional producers do not deadlock against consumers, which can happen if cached connections are blocked because of a memory alarm on the broker.

Currently, multiplexed consumers (a single consumer listening to multiple queues) are supported only for message-driven consumers; polled consumers can retrieve messages from a single queue only.

 

3. Configuration Options

 

This section contains settings specific to the RabbitMQ binder and bound channels.

For general settings, refer to the Spring Cloud Stream core documentation.

 

3.1 RabbitMQ Binder Properties

By default, the RabbitMQ binder uses Spring Boot's ConnectionFactory. Consequently, it supports all Spring Boot configuration options (for reference, see the Spring Boot documentation). RabbitMQ configuration options use the spring.rabbitmq prefix.

In addition to the Spring Boot options, the RabbitMQ binder supports the following properties:

spring.cloud.stream.rabbit.binder.adminAddresses

A comma-separated list of RabbitMQ management plugin URLs. Only used when nodes contains more than one entry. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.

Default: empty.

spring.cloud.stream.rabbit.binder.nodes

A comma-separated list of RabbitMQ node names. When more than one entry, used to locate the server address where a queue is located. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.

Default: empty.

spring.cloud.stream.rabbit.binder.compressionLevel

The compression level for compressed bindings. See java.util.zip.Deflater.

Default: 1 (BEST_LEVEL).

spring.cloud.stream.rabbit.binder.connection-name-prefix

A connection name prefix used to name the connection(s) created by this binder. The name is this prefix followed by #n, where n increments each time a new connection is opened.


Default: none (Spring AMQP default).

3.2 RabbitMQ Consumer Properties

The following properties are available for Rabbit consumers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
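For example (the binding name input and the values are illustrative assumptions):

```properties
# Manual acknowledgments with a prefetch window of 10 on the "input" binding
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input.consumer.prefetch=10
```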

acknowledgeMode

The acknowledge mode.

Default: AUTO.

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). For partitioned destinations, -<instanceIndex> is appended.


Default: #.

bindQueue

Whether to declare the queue and bind it to the destination exchange. Set it to false if you have set up your own infrastructure and have previously created and bound the queue.

Default: true.

consumerTagPrefix

A prefix used to create the consumer tag(s); #n is appended, where n increments for each consumer created.

Example: ${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.

Default: none - the broker generates random consumer tags.

deadLetterQueueName

The name of the DLQ

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue. Relevant only if autoBindDlq is true.

Default: 'prefix+DLX'

deadLetterExchangeType

The type of the DLX to assign to the queue. Relevant only if autoBindDlq is true.

Default: 'direct'

deadLetterRoutingKey

A dead letter routing key to assign to the queue. Relevant only if autoBindDlq is true.

Default: destination

declareDlx

Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq is true. Set to false if you have a pre-configured DLX.

是否爲destination建立死信exchange。只有當autoBindDlq爲true時才考慮設置。若是您有預先配置的DLX,則設置爲false。

Default: true.

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.


Default: false.

dlqDeadLetterExchange

When a DLQ is declared, a DLX to assign to that queue.

Default: none

dlqDeadLetterRoutingKey

When a DLQ is declared, a dead letter routing key to assign to that queue.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead letter queue is deleted.

Default: no expiration

dlqLazy

Declare the dead letter queue with the x-queue-mode=lazy argument. See "Lazy Queues". Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.


Default: false.

dlqMaxLength

Maximum number of messages in the dead letter queue.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead letter queue from all messages.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead letter queue (0-255).


Default: none

dlqOverflowBehavior

Action to take when dlqMaxLength or dlqMaxLengthBytes is exceeded; currently drop-head or reject-publish but refer to the RabbitMQ documentation.


Default: none

dlqTtl

Default time to live to apply to the dead letter queue when declared (in milliseconds).


Default: no limit

durableSubscription

Whether the subscription should be durable. Only effective if group is also set.


Default: true.

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).


Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (that is, it survives broker restart).


Default: true.

exchangeType

The exchange type: direct, fanout, or topic for non-partitioned destinations and direct or topic for partitioned destinations.

Default: topic.

exclusive

Whether to create an exclusive consumer. Concurrency should be 1 when this is true. Often used when strict ordering is required but enabling a hot standby instance to take over after a failure. See recoveryInterval, which controls how often a standby instance attempts to consume.


Default: false.

expires

How long before an unused queue is deleted (in milliseconds).


Default: no expiration

failedDeclarationRetryInterval

The interval (in milliseconds) between attempts to consume from a queue if it is missing.


Default: 5000

frameMaxHeadroom

The number of bytes to reserve for other headers when adding the stack trace to a DLQ message header. All headers must fit within the frame_max size configured on the broker. Stack traces can be large; if the size plus this property exceeds frame_max then the stack trace will be truncated. A WARN log will be written; consider increasing the frame_max or reducing the stack trace by catching the exception and throwing one with a smaller stack trace.


Default: 20000

headerPatterns

Patterns for headers to be mapped from inbound messages.


Default: ['*'] (all headers).

lazy

Declare the queue with the x-queue-mode=lazy argument. See "Lazy Queues". Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.


Default: false.

maxConcurrency

The maximum number of consumers.


Default: 1.

maxLength

The maximum number of messages in the queue.


Default: no limit

maxLengthBytes

The maximum number of total bytes in the queue from all messages.


Default: no limit

maxPriority

The maximum priority of messages in the queue (0-255).


Default: none

missingQueuesFatal

When the queue cannot be found, whether to treat the condition as fatal and stop the listener container. Defaults to false so that the container keeps trying to consume from the queue — for example, when using a cluster and the node hosting a non-HA queue is down.


Default: false

overflowBehavior

Action to take when maxLength or maxLengthBytes is exceeded; currently drop-head or reject-publish but refer to the RabbitMQ documentation.


Default: none

prefetch

Prefetch count.

Default: 1.

prefix

A prefix to be added to the name of the destination and queues.

Default: "".

queueDeclarationRetries

The number of times to retry consuming from a queue if it is missing. Relevant only when missingQueuesFatal is true. Otherwise, the container keeps retrying indefinitely.


Default: 3

queueNameGroupOnly

When true, consume from a queue with a name equal to the group. Otherwise the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.


Default: false.

recoveryInterval

The interval between connection recovery attempts, in milliseconds.


Default: 5000.

requeueRejected

Whether delivery failures should be re-queued when retry is disabled or republishToDlq is false.


Default: false.

republishDeliveryMode

When republishToDlq is true, specifies the delivery mode of the republished message.


Default: DeliveryMode.PERSISTENT

republishToDlq

By default, messages that fail after retries are exhausted are rejected. If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ. If set to true, the binder republishes failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure. Also see the frameMaxHeadroom property.


Default: false

transacted

Whether to use transacted channels.


Default: false.

ttl

Default time to live to apply to the queue when declared (in milliseconds).


Default: no limit

txSize

The number of deliveries between acks.


Default: 1.

 

3.3 Advanced Listener Container Configuration

 To set listener container properties that are not exposed as binder or binding properties, add a single bean of type ListenerContainerCustomizer to the application context. The binder and binding properties will be set and then the customizer will be called. The customizer (configure() method) is provided with the queue name as well as the consumer group as arguments.


 

3.4 Rabbit Producer Properties

The following properties are available for Rabbit producers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer.

 

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

batchingEnabled

Whether to enable message batching by producers. Messages are batched into one message according to the following properties (described in the next three entries in this list): batchSize, batchBufferLimit, and batchTimeout. See Batching for more information.

Default: false.

batchSize

The number of messages to buffer when batching is enabled.

Default: 100.

batchBufferLimit

The maximum buffer size when batching is enabled.

Default: 10000.

batchTimeout

The batch timeout when batching is enabled.

Default: 5000.
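The four batching properties above combine as in this sketch (the binding name output and the values are assumptions):

```properties
# Batch up to 50 messages (or 8192 bytes), flushing after 3 seconds
spring.cloud.stream.rabbit.bindings.output.producer.batching-enabled=true
spring.cloud.stream.rabbit.bindings.output.producer.batch-size=50
spring.cloud.stream.rabbit.bindings.output.producer.batch-buffer-limit=8192
spring.cloud.stream.rabbit.bindings.output.producer.batch-timeout=3000
```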

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). Only applies to non-partitioned destinations. Only applies if requiredGroups are provided and then only to those groups.

Default: #.

bindQueue

Whether to declare the queue and bind it to the destination exchange. Set it to false if you have set up your own infrastructure and have previously created and bound the queue. Only applies if requiredGroups are provided and then only to those groups.

Default: true.

compress

Whether data should be compressed when sent.

Default: false.

confirmAckChannel

When errorChannelEnabled is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms). If the channel does not exist, a DirectChannel is registered with this name. The connection factory must be configured to enable publisher confirms.

Default: nullChannel (acks are discarded).

deadLetterQueueName

The name of the DLQ Only applies if requiredGroups are provided and then only to those groups.

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue. Relevant only when autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: 'prefix+DLX'

deadLetterExchangeType

The type of the DLX to assign to the queue. Relevant only if autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: 'direct'

deadLetterRoutingKey

A dead letter routing key to assign to the queue. Relevant only when autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: destination

declareDlx

Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq is true. Set to false if you have a pre-configured DLX. Applies only when requiredGroups are provided and then only to those groups.

Default: true.

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delayExpression

A SpEL expression to evaluate the delay to apply to the message (x-delay header). It has no effect if the exchange is not a delayed message exchange.

Default: No x-delay header is set.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.

Default: false.

deliveryMode

The delivery mode.

Default: PERSISTENT.

dlqDeadLetterExchange

When a DLQ is declared, a DLX to assign to that queue. Applies only if requiredGroups are provided and then only to those groups.

Default: none

dlqDeadLetterRoutingKey

When a DLQ is declared, a dead letter routing key to assign to that queue. Applies only when requiredGroups are provided and then only to those groups.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead letter queue is deleted. Applies only when requiredGroups are provided and then only to those groups.

Default: no expiration

dlqLazy

Declare the dead letter queue with the x-queue-mode=lazy argument. See "Lazy Queues". Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.
dlqMaxLength

Maximum number of messages in the dead letter queue. Applies only if requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead letter queue from all messages. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead letter queue (0-255) Applies only when requiredGroups are provided and then only to those groups.

Default: none

dlqTtl

Default time (in milliseconds) to live to apply to the dead letter queue when declared. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-delete (it is removed after the last queue is removed).

Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (survives broker restart).

Default: true.

exchangeType

The exchange type: direct, fanout, or topic for non-partitioned destinations and direct or topic for partitioned destinations.

Default: topic.

expires

How long (in milliseconds) before an unused queue is deleted. Applies only when requiredGroups are provided and then only to those groups.

Default: no expiration

headerPatterns

Patterns for headers to be mapped to outbound messages.

Default: ['*'] (all headers).

lazy

Declare the queue with the x-queue-mode=lazy argument. See "Lazy Queues". Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.

maxLength

Maximum number of messages in the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

maxLengthBytes

Maximum number of total bytes in the queue from all messages. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

maxPriority

Maximum priority of messages in the queue (0-255). Only applies if requiredGroups are provided and then only to those groups.

Default: none

prefix

A prefix to be added to the name of the destination exchange.

Default: "".

queueNameGroupOnly

When true, consume from a queue with a name equal to the group. Otherwise the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.

routingKeyExpression

A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey' in a properties file or routingKeyExpression: '''my.routingKey''' in a YAML file.

Default: destination or destination-<partition> for partitioned destinations.

transacted

Whether to use transacted channels.

Default: false.

ttl

Default time (in milliseconds) to live to apply to the queue when declared. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

[Note]

In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport — including transports, such as Kafka (prior to 0.11), that do not natively support headers.

 

 

4. Retry With the RabbitMQ Binder

5. Error Channels

6. Dead-Letter Queue Processing

7. Partitioning with the RabbitMQ Binder
