Spring Cloud Steam內容簡介:java
@SpringBootApplication @EnableBinding(Sink.class) public class VoteRecordingSinkApplication { public static void main(String[] args) { SpringApplication.run(VoteRecordingSinkApplication.class, args); } @StreamListener(Sink.INPUT) public void processVote(Vote vote) { votingService.recordVote(vote); } }
接口聲明輸入和輸出通道。Spring Cloud Stream提供了Source、Sink和Processor接口。json
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
Spring Cloud Stream爲您建立了接口的實現。您能夠經過自動裝配來使用它,以下面的示例所示(來自測試用例):
@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class) @WebAppConfiguration @DirtiesContext public class StreamApplicationTests { @Autowired private Sink sink; @Test public void contextLoads() { assertNotNull(this.sink.input()); } }
Spring Cloud Stream經過一些術語和抽象來簡化了消息驅動程序的編寫。
Spring Cloud Stream的核心與中間件實現無關。
Spring Cloud Stream提供了kafka、RabbitMQ對應的Binder實現、也包含一個TestSupportBinder用於測試,也能夠編寫本身的Binder
Spring Cloud Stream使用Spring Boot配置以及Binder抽象,使之能夠靈活地配置如何鏈接到一個消息中間件。例如,能夠在部署時設置鏈接到哪一種類型,或者哪一個消息中間件。
這些配置能夠經過外部設置或者任何Spring Boot支持的配置方式(如程序參數、環境變量、yml文件、properties文件)進行
Spring Cloud Stream會根據classpath下的依賴自動選擇binder,你也能夠包含多個binder,在運行時決定使用哪一個,甚至根據不一樣的通道使用不一樣的binder
應用程序之間的通訊遵循發佈-訂閱模型,其中數據經過共享主題廣播。在下圖中能夠看到這一點,它顯示了一組交互的Spring Cloud Stream應用程序的典型部署。
Spring Cloud Stream發佈訂閱模式
從這個destination(目的地)開始,有2個微服務訂閱了raw-sensor-data這個主題,一個負責計算窗口平均值,一個將原始數據導入HDFS(Hadoop Distributed File System)。
雖然發佈-訂閱消息傳遞的概念並不新鮮,Spring Cloud Stream採起了額外的步驟,使其成爲其應用程序模型的一個opinionated choice。經過使用本地中間件支持,Spring Cloud Stream還簡化了跨不一樣平臺的發佈-訂閱模型的使用。
Spring Cloud Stream經過消費者組的概念對這種行爲進行建模。(Spring Cloud Stream消費者組與卡夫卡消費者組類似並受到其啓發。)
默認狀況下,當未指定組時,Spring Cloud Stream會把應用程序放到一個匿名的、獨立的、只有一個成員的消費者組中,而後和其它消費者組放在一塊兒維護。
2.0前,只支持異步消費者;A message is delivered as soon as it is available and a thread is available to process it.
與Spring Cloud Stream程序模型一致,消費者組是持久化的。
一般,在將應用程序綁定到給定目標時,最好始終使用消費者組。在擴展Spring Cloud Stream應用程序時,必須爲每一個輸入綁定指定一個使用者組。這樣作能夠防止應用程序的實例接收重複的消息(除非須要這種行爲,這是不尋常的)。
Spring Cloud Stream支持在給定應用程序的多個實例之間劃分數據。在分區場景中,物理通訊介質(例如the broker topic)被視爲被構形成多個分區。
Spring Cloud Stream爲統一實現分區處理提供了一個公共抽象。所以,不管代理自己是不是天然分區的(例如Kafka),均可以使用分區(例如RabbitMQ)。
Spring Cloud Stream分區
Destination Binders是Spring Cloud Stream的擴展組件,爲實現與外部消息系統集成,提供必要的配置和實現。這種集成負責消息與生產者和使用者之間的鏈接、委託和路由、數據類型轉換、用戶代碼的調用等等。
如前所述,Destination Bindings在鏈接外部消息隊列系統和提供信息生產者或消費者的應用程序的橋樑。
下面的示例顯示了一個徹底配置和功能良好的Spring Cloud Stream應用程序,該應用程序以字符串類型從輸入目標接收消息的有效負載(請參見第9章內容類型協商部分),將其記錄到控制檯,並在將其轉換爲大寫後發送到輸出目的地。
@SpringBootApplication @EnableBinding(Processor.class) public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public String handle(String value) { System.out.println("Received: " + value); return value.toUpperCase(); } }
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); } public interface Processor extends Source, Sink {}
public interface Barista { @Input SubscribableChannel orders(); @Output MessageChannel hotDrinks(); @Output MessageChannel coldDrinks(); }
@EnableBinding(value = { Orders.class, Payment.class })
在SpringCloudStream中,可用的通道綁定組件包括:Spring Messaging的Message Channel(用於出站)及其擴展,Subscribable Channel(用於入站)。
Pollable Destination Binding
從版本2.0開始,您如今能夠綁定pollable consumer:
下面的示例演示如何綁定pollable consumer:
public interface PolledBarista { @Input PollableMessageSource orders(); . . . }
在這種狀況下,一個PollableMessageSource實現綁定到訂單通道。See Section 6.3.5, 「Using Polled Consumers」 for more details.
public interface Barista { @Input("inboundOrders") SubscribableChannel orders(); }
Normally, you need not access individual channels or bindings directly (other then configuring them via @EnableBinding
annotation). However there may be times, such as testing or other corner cases, when you do.
Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface. That means you can have access to the interfaces representing the bindings or individual channels by auto-wiring either in your application, as shown in the following two examples:
除了爲每一個綁定生成通道並將它們註冊爲Spring Bean以外,Spring Cloud Stream爲每一個綁定接口生成一個實現該接口的bean。這意味着您能夠經過應用程序中的自動裝配表示綁定或單個通道的接口,如如下兩個示例所示:
@Autowire private Source source public void sayHello(String name) { source.output().send(MessageBuilder.withPayload(name).build()); }
@Autowire private MessageChannel output; public void sayHello(String name) { output.send(MessageBuilder.withPayload(name).build()); }
@Autowire @Qualifier("myChannel") private MessageChannel output;
Spring Cloud Stream的基礎:企業集成模式定義的概念和模式
Spring Cloud Stream的內部實現:依賴於Spring Integration框架
因此Stream支持Spring Integration已經創建的基礎、語義和配置選項
@EnableBinding(Source.class) public class TimerSource { @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> timerMessageSource() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); } }
相似:能夠經過@Transformer或@ServiceActivator,提供一個對來自Processor binding的消息的處理實現:
@EnableBinding(Processor.class) public class TransformProcessor { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); } }
使用Spring Cloud Stream中的@StreamListener註釋綁定同一個綁定時,使用的是發佈訂閱模型,因此每個使用了@StreamListener註釋的方法,都會接收到一份消息;
而使用Spring Integration中的註解 (such as @Aggregator, @Transformer, or @ServiceActivator)時,使用的是競爭模型,只會有一個消費者獲得消息;並且,不會爲每個消費者建立單獨的使用者組。
做爲對Spring Integration框架的補充,SpringCloudStream提供了本身的@StreamListener註解,該註釋借鑑了其餘Spring消息註解(@Messagemap、@JmsListener、@RabbitListener等),並提供了方便性,如基於內容的路由等。
@EnableBinding(Sink.class) public class VoteHandler { @Autowired VotingService votingService; @StreamListener(Sink.INPUT) public void handle(Vote vote) { votingService.record(vote); } }
@EnableBinding(Processor.class) public class TransformProcessor { @Autowired VotingService votingService; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public VoteResult handle(Vote vote) { return votingService.record(vote); } }
Spring Cloud Stream支持根據conditions向多個帶有@StreamListener註釋的處理程序方法分發消息。
在具備調度條件的@StreamListener的下面示例中,全部帶有值bogey的header type的消息都被分派到RecveBogey方法,而帶有值Bacall的header type的全部消息都被分派到ReceiveBacall方法。
@EnableBinding(Sink.class) @EnableAutoConfiguration public static class TestPojoWithAnnotatedArguments { @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'") public void receiveBogey(@Payload BogeyPojo bogeyPojo) { // handle the message } @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'") public void receiveBacall(@Payload BacallPojo bacallPojo) { // handle the message } }
Content Type Negotiation in the Context of condition
It is important to understand some of the mechanics behind content-based routing using the condition
argument of @StreamListener
, especially in the context of the type of the message as a whole. It may also help if you familiarize yourself with the Chapter 9, Content Type Negotiation before you proceed.
Consider the following scenario:
@EnableBinding(Sink.class) @EnableAutoConfiguration public static class CatsAndDogs { @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'") public void bark(Dog dog) { // handle the message } @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'") public void purr(Cat cat) { // handle the message } }
所以,除非您使用計算原始數據的Spel表達式(例如,字節數組中第一個字節的值),不然使用基於消息頭的表達式(例如,condition = "headers['type']=='dog'")。
At the moment, dispatching through @StreamListener
conditions is supported only for channel-based binders (not for reactive programming) support.
自SpringCloudStreamv2.1以來,定義stream handlers and sources的另外一個替代方法是使用對 Spring Cloud Function 的內置支持,其中能夠將它們表示爲beans of type java.util.function.[Supplier/Function/Consumer]
To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide spring.cloud.stream.function.definition
Here is the example of the Processor application exposing message handler as java.util.function.Function
@SpringBootApplication @EnableBinding(Processor.class) public class MyFunctionBootApp { public static void main(String[] args) { SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase"); } @Bean public Function<String, String> toUpperCase() { return s -> s.toUpperCase(); } }
In the above you we simply define a bean of type java.util.function.Function
called toUpperCase and identify it as a bean to be used as message handler whose 'input' and 'output' must be bound to the external destinations exposed by the Processor binding.
Below are the examples of simple functional applications to support Source, Processor and Sink.
Here is the example of a Source application defined as java.util.function.Supplier
在上面,咱們只定義一個java.util.Function.Function類型的bean,函數名稱爲toUpperCase,並將其標識爲用做消息處理器,其「輸入」和「輸出」必須綁定到Processor binding暴露的外部目的地。
@SpringBootApplication @EnableBinding(Source.class) public static class SourceFromSupplier { public static void main(String[] args) { SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date"); } @Bean public Supplier<Date> date() { return () -> new Date(12345L); } }
Here is the example of a Processor application defined as java.util.function.Function
@SpringBootApplication @EnableBinding(Processor.class) public static class ProcessorFromFunction { public static void main(String[] args) { SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase"); } @Bean public Function<String, String> toUpperCase() { return s -> s.toUpperCase(); } }
Here is the example of a Sink application defined as java.util.function.Consumer
@EnableAutoConfiguration @EnableBinding(Sink.class) public static class SinkFromConsumer { public static void main(String[] args) { SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink"); } @Bean public Consumer<String> sink() { return System.out::println; } }
@Bean public Function<String, String> wrapInQuotes() { return s -> "\"" + s + "\""; }
and modify the spring.cloud.stream.function.definition
property to reflect your intention to compose a new function from both ‘toUpperCase’ and ‘wrapInQuotes’. To do that Spring Cloud Function allows you to use |
(pipe) symbol. So to finish our example our property will now look like this:
A producer is any component that sends messages to a channel. The channel can be bound to an external message broker with a Binder
implementation for that broker. When invoking the bindProducer()
method, the first parameter is the name of the destination within the broker, the second parameter is the local channel instance to which the producer sends messages, and the third parameter contains properties (such as a partition key expression) to be used within the adapter that is created for that channel.
channel :使用一個對應消息代理的Binder實現綁定到一個外部消息代理。
當調用bindProducer()方法時,第一個參數是代理中destination 的名稱,第二個參數是生產者發送消息的local channel instance,第三個參數包含要在爲該通道建立的適配器中使用的屬性(例如分區鍵表達式)。
A consumer is any component that receives messages from a channel. As with a producer, the consumer’s channel can be bound to an external message broker. When invoking the bindConsumer()
method, the first parameter is the destination name, and a second parameter provides the name of a logical group of consumers. Each group that is represented by consumer bindings for a given destination receives a copy of each message that a producer sends to that destination (that is, it follows normal publish-subscribe semantics). If there are multiple consumer instances bound with the same group name, then messages are load-balanced across those consumer instances so that each message sent by a producer is consumed by only a single consumer instance within each group (that is, it follows normal queueing semantics).
當調用bindConsumer()方法時,第一個參數是destination 名稱,第二個參數提供使用者組的邏輯名稱。由給定目標的使用者綁定表示的每一個組接收生產者發送到該目標的每一個消息的副本(也就是說,它遵循正常的發佈-訂閱語義)。若是有多個以同一個組名綁定的使用者實例,那麼這些使用者實例之間的消息是負載均衡的,這樣由生產者發送的每條消息只被每一個組中的單個使用者實例所使用(也就是說,它遵循正常的排隊語義)。
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> { Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties); Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties); }
The interface is parameterized, offering a number of extension points:
is supported, but this is intended to be used as an extension point in the future.A typical binder implementation consists of the following:
class that creates a bean of type Binder
along with the middleware connection infrastructure.A META-INF/spring.binders
file found on the classpath containing one or more binder definitions, as shown in the following example:
SpringCloudStream依賴於BinderSPI的實現來執行將channels 鏈接到消息代理的任務。每一個Binder實現一般鏈接到一種類型的消息傳遞系統。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
Similar files exist for the other provided binder implementations (such as Kafka), and custom binder implementations are expected to provide them as well. The key represents an identifying name for the binder implementation, whereas the value is a comma-separated list of configuration classes that each contain one and only one bean definition of type org.springframework.cloud.stream.binder.Binder
Binder selection can either be performed globally, using the spring.cloud.stream.defaultBinder
property (for example, spring.cloud.stream.defaultBinder=rabbit
) or individually, by configuring the binder on each channel binding. For instance, a processor application (that has channels named input
and output
for read and write respectively) that reads from Kafka and writes to RabbitMQ can specify the following configuration:
The binder type. It typically references one of the binders found on the classpath — in particular, a key in a META-INF/spring.binders
By default, it has the same value as the configuration name.
Whether the configuration inherits the environment of the application itself.
Default: true
Root for a set of properties that can be used to customize the environment of the binder. When this property is set, the context in which the binder is being created is not a child of the application context. This setting allows for complete separation between the binder components and the application components.
Default: empty
Whether the binder configuration is a candidate for being considered a default binder or can be used only when explicitly referenced. This setting allows adding binder configurations without interfering with the default processing.
Default: true
一些binders有額外的binding 屬性支持特定中間件的特性。
These properties are exposed via org.springframework.cloud.stream.config.BindingServiceProperties
The number of deployed instances of an application. Must be set for partitioning on the producer side. Must be set on the consumer side when using RabbitMQ and with Kafka if autoRebalanceEnabled=false
Default: 1
instanceCount - 1
. Used for partitioning with RabbitMQ and with Kafka if
. Automatically set in Cloud Foundry to match the application’s instance index.
A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.
Default: empty (letting any destination be bound).
The default binder to use, if multiple binders are configured. See Multiple Binders on the Classpath.
Default: empty.
This property is only applicable when the cloud
profile is active and Spring Cloud Connectors are provided with the application. If the property is false
(the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true
, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.*
properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
Default: false
The interval (in seconds) between retrying binding creation when, for example, the binder does not support late binding and the broker (for example, Apache Kafka) is down. Set it to zero to treat such conditions as fatal, preventing the application from starting.
Default: 30
Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channelName>.<property>=<value>
. The <channelName>
represents the name of the channel being configured (for example, output
for a Source
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.default.<property>=<value>
When it comes to avoiding repetitions for extended binding properties, this format should be used - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>.
prefix and focus just on the property name, with the understanding that the prefix ise included at runtime.
These properties are exposed via org.springframework.cloud.stream.config.BindingProperties
The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>.
(for example, spring.cloud.stream.bindings.input.destination=ticktock
Default values can be set by using the spring.cloud.stream.default
prefix (for example`spring.cloud.stream.default.contentType=application/json`).
values. If not set, the channel name is used instead. The default value of this property cannot be overridden.
The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.
Default: null
(indicating an anonymous consumer).
The content type of the channel. See 「Chapter 9, Content Type Negotiation」.
Default: application/json
The binder used by this binding. See 「Section 7.4, 「Multiple Binders on the Classpath」」 for details.
Default: null
(the default binder is used, if it exists).
These properties are exposed via org.springframework.cloud.stream.binder.ConsumerProperties
The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer.
(for example, spring.cloud.stream.bindings.input.consumer.concurrency=3
Default values can be set by using the spring.cloud.stream.default.consumer
prefix (for example, spring.cloud.stream.default.consumer.headerMode=none
The concurrency of the inbound consumer.
Default: 1
Whether the consumer receives data from a partitioned producer.
Default: false
When set to none
, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when consuming data from non-Spring Cloud Stream applications when native headers are not supported. When set to headers
, it uses the middleware’s native header mechanism. When set to embeddedHeaders
, it embeds headers into the message payload.
Default: depends on the binder implementation.
If processing fails, the number of attempts to process the message (including the first). Set to 1
to disable retry.
Default: 3
The backoff initial interval on retry.
Default: 1000
The maximum backoff interval.
Default: 10000
The backoff multiplier.
Default: 2.0
Whether exceptions thrown by the listener that are not listed in the retryableExceptions
are retryable.
Default: true
When set to a value greater than equal to zero, it allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex
). When set to a negative value, it defaults to spring.cloud.stream.instanceIndex
. See 「Section 11.2, 「Instance Index and Instance Count」」 for more information.
Default: -1
When set to a value greater than equal to zero, it allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount
). When set to a negative value, it defaults to spring.cloud.stream.instanceCount
. See 「Section 11.2, 「Instance Index and Instance Count」」 for more information.
Default: -1
A map of Throwable class names in the key and a boolean in the value. Specify those exceptions (and subclasses) that will or won’t be retried. Also see defaultRetriable
. Example: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
Default: empty.
When set to true
, the inbound message is deserialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value deserializer). When this configuration is being used, the inbound message unmarshalling is not based on the contentType
of the binding. When native decoding is used, it is the responsibility of the producer to use an appropriate encoder (for example, the Kafka producer value serializer) to serialize the outbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders
property is ignored and headers are not embedded in the message. See the producer property useNativeEncoding
Default: false
These properties are exposed via org.springframework.cloud.stream.binder.ProducerProperties
The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer.
(for example, spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id
Default values can be set by using the prefix spring.cloud.stream.default.producer
(for example, spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
A SpEL expression that determines how to partition outbound data. If set, or if partitionKeyExtractorClass
is set, outbound data on this channel is partitioned. partitionCount
must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExtractorClass
. See 「Section 5.6, 「Partitioning Support」」.
Default: null.
A PartitionKeyExtractorStrategy
implementation. If set, or if partitionKeyExpression
is set, outbound data on this channel is partitioned. partitionCount
must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExpression
. See 「Section 5.6, 「Partitioning Support」」.
Default: null
A PartitionSelectorStrategy
implementation. Mutually exclusive with partitionSelectorExpression
. If neither is set, the partition is selected as the hashCode(key) % partitionCount
, where key
is computed through either partitionKeyExpression
or partitionKeyExtractorClass
Default: null
A SpEL expression for customizing partition selection. Mutually exclusive with partitionSelectorClass
. If neither is set, the partition is selected as the hashCode(key) % partitionCount
, where key
is computed through either partitionKeyExpression
or partitionKeyExtractorClass
Default: null
The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, it is interpreted as a hint. The larger of this and the partition count of the target topic is used instead.
Default: 1
When set to none
, it disables header embedding on output. It is effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when producing data for non-Spring Cloud Stream applications when native headers are not supported. When set to headers
, it uses the middleware’s native header mechanism. When set to embeddedHeaders
, it embeds headers into the message payload.
Default: Depends on the binder implementation.
When set to true
, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType
of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders
property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding
Default: false
When set to true
, if the binder supports asynchroous send results, send failures are sent to an error channel for the destination. See Section 6.4, 「Error Handling」
for more information.
Default: false
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
默認狀況下,RabbitMQ Binder實現將每一個destination映射到Topic交換器(Exchange)。
對於每一個consumer group,一個Queue
每一個consumer 實例都有相應的RabbitMQ consumer 實例鏈接到該consumer group對應的隊列(queue)。
爲了分區生產者和使用者,隊列以分區索引做爲後綴,並使用分區索引做爲路由鍵(routing key)。
對於匿名consumer (那些沒有組屬性的用戶),使用自動刪除隊列(具備隨機惟一名稱)。
此外,republishToDlq會致使binder 將失敗的消息發佈到DLQ(而不是拒絕它)。
若是流監聽器拋出一個ImmediateAcKnowamqpException,則會繞過DLQ並簡單地丟棄消息。從Version 2.1開始,無論rePublishToDlq的設置如何,都會這樣執行;之前,則只有在rePublishToDlq爲false時纔是如此。
當在SpringCloudStream應用程序中使用多個RabbitMQ Binder時,必定要禁用「RabbitAutoConfiguration」,以免將RabbitAutoConfiguration中相同的配置應用到這幾個Binder中。你可使用@SpringBootApplication註釋來排除掉這個類。
目前,只有消息驅動的消費 才支持多工使用者(一個偵聽多個隊列的使用者)polled consumers只能從單個隊列中檢索消息。
對於通用的設置,請參考Spring Cloud Stream Core的文檔
默認狀況下,RabbitMQ Binder使用SpringBoot的ConnectionFactory。所以,它支持全部SpringBoot配置選項(有關參考,請參閱SpringBoot文檔)。RabbitMQ配置選項使用Spring.rabbitmq前綴。
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Default: 1
A connection name prefix used to name the connection(s) created by this binder. The name is this prefix followed by #n
, where n
increments each time a new connection is opened.
Default: none (Spring AMQP default).
如下屬性僅適用於 Rabbit consumers,必須以spring.cloud.stream.rabbit.bindings.<channelName>.consumer..做爲前綴
Default: AUTO
Default: false
The routing key with which to bind the queue to the exchange (if bindQueue
is true
). For partitioned destinations, -<instanceIndex>
is appended.
將queue 綁定到Exchange使用的路由鍵(若是bindQueue爲true)。爲了給destinations分區,附加-<instanceIndex>。
Default: #
是否聲明queue 並將其綁定到目標exchange。若是您已經設置了本身的基礎結構,而且已經建立並綁定了隊列,則將其設置爲false。
Default: true
用於建立consumer 標記;將由#n追加,每建立一個consumer 則自增1。
The name of the DLQ
Default: prefix+destination.dlq
A DLX to assign to the queue. Relevant only if autoBindDlq
is true
Default: 'prefix+DLX'
The type of the DLX to assign to the queue. Relevant only if autoBindDlq
is true
Default: 'direct'
A dead letter routing key to assign to the queue. Relevant only if autoBindDlq
is true
Default: destination
Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq
is true
. Set to false
if you have a pre-configured DLX.
Default: true
Whether to declare the exchange for the destination.
Default: true
Whether to declare the exchange as a Delayed Message Exchange
. Requires the delayed message exchange plugin on the broker. The x-delayed-type
argument is set to the exchangeType
是否將exchange 定義爲Delayed Message Exchange。Requires the delayed message exchange plugin on the broker. The x-delayed-type
argument is set to the exchangeType
Default: false
Default: none
Default: none
Default: no expiration
Declare the dead letter queue with the x-queue-mode=lazy
argument. See 「Lazy Queues」. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
使用 x-queue-mode=lazy
參數聲明死信隊列。See 「Lazy Queues」. 考慮使用策略而不是此設置,由於使用策略能夠在不刪除隊列的狀況下更改設置。
Default: false
Maximum number of messages in the dead letter queue.
Default: no limit
Maximum number of total bytes in the dead letter queue from all messages.
Default: no limit
Maximum priority of messages in the dead letter queue (0-255).
Default: none
Action to take when dlqMaxLength
or dlqMaxLengthBytes
is exceeded; currently drop-head
or reject-publish
but refer to the RabbitMQ documentation.
當超過dlqMaxLength或dlqMaxLengthBytes時要採起的操做;當前是 drop-head
or reject-publish
Default: none
Default time to live to apply to the dead letter queue when declared (in milliseconds).
Default: no limit
Whether the subscription should be durable. Only effective if group
is also set.
Default: true
If declareExchange
is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).
若是declareExchange爲true,這裏設置是否應自動刪除exchange (最後一個隊列被刪除後,這個exchange會別刪除)。
Default: true
If declareExchange
is true, whether the exchange should be durable (that is, it survives broker restart).
若是declareExchange爲true,則該exchange 是否應是持久的(也就是說,幾時重啓代理,他仍然存在)。
Default: true
The exchange type: direct
, fanout
or topic
for non-partitioned destinations and direct
or topic
for partitioned destinations.
Default: topic
Whether to create an exclusive consumer. Concurrency should be 1 when this is true
. Often used when strict ordering is required but enabling a hot standby instance to take over after a failure. See recoveryInterval
, which controls how often a standby instance attempts to consume.
是否建立獨佔消費者。若是爲true,那麼併發性應該是1。 Often used when strict ordering is required but enabling a hot standby instance to take over after a failure. See recoveryInterval
, which controls how often a standby instance attempts to consume.
Default: false
How long before an unused queue is deleted (in milliseconds).
Default: no expiration
The interval (in milliseconds) between attempts to consume from a queue if it is missing.
Default: 5000
The number of bytes to reserve for other headers when adding the stack trace to a DLQ message header. All headers must fit within the frame_max
size configured on the broker. Stack traces can be large; if the size plus this property exceeds frame_max
then the stack trace will be truncated. A WARN log will be written; consider increasing the frame_max
or reducing the stack trace by catching the exception and throwing one with a smaller stack trace.
Default: 20000
Patterns for headers to be mapped from inbound messages.
Default: ['*']
(all headers).
Declare the queue with the x-queue-mode=lazy
argument. See 「Lazy Queues」. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.
使用x-queue-mode=lazy參數聲明隊列。 See 「Lazy Queues」. 」。考慮使用策略而不是此設置,由於使用策略能夠在不刪除隊列的狀況下更改設置。
Default: false
The maximum number of consumers.
Default: 1
The maximum number of messages in the queue.
Default: no limit
The maximum number of total bytes in the queue from all messages.
Default: no limit
The maximum priority of messages in the queue (0-255).
Default: none
When the queue cannot be found, whether to treat the condition as fatal and stop the listener container. Defaults to false
so that the container keeps trying to consume from the queue — for example, when using a cluster and the node hosting a non-HA queue is down.
Default: false
Action to take when maxLength
or maxLengthBytes
is exceeded; currently drop-head
or reject-publish
but refer to the RabbitMQ documentation.
or reject-publish
Default: none
Prefetch count.
Default: 1
A prefix to be added to the name of the destination
and queues.
Default: "".
The number of times to retry consuming from a queue if it is missing. Relevant only when missingQueuesFatal
is true
. Otherwise, the container keeps retrying indefinitely.
Default: 3
When true, consume from a queue with a name equal to the group
. Otherwise the queue name is destination.group
. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Default: false.
The interval between connection recovery attempts, in milliseconds.
Default: 5000
Whether delivery failures should be re-queued when retry is disabled or republishToDlq
is false
Default: false
When republishToDlq
is true
, specifies the delivery mode of the republished message.
Default: DeliveryMode.PERSISTENT
By default, messages that fail after retries are exhausted are rejected. If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ. If set to true
, the binder republishs failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure. Also see the frameMaxHeadroom property.
Default: false
Whether to use transacted channels.
Default: false
Default time to live to apply to the queue when declared (in milliseconds).
Default: no limit
The number of deliveries between acks.
Default: 1
To set listener container properties that are not exposed as binder or binding properties, add a single bean of type ListenerContainerCustomizer
to the application context. The binder and binding properties will be set and then the customizer will be called. The customizer (configure()
method) is provided with the queue name as well as the consumer group as arguments.
如下屬性僅適用於Rabbit producers,必須以spring.cloud.stream.rabbit.bindings.<channelName>.producer..做爲前綴
Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false
Whether to enable message batching by producers. Messages are batched into one message according to the following properties (described in the next three entries in this list): 'batchSize', batchBufferLimit
, and batchTimeout
. See Batching for more information.
Default: false
The number of messages to buffer when batching is enabled.
Default: 100
The maximum buffer size when batching is enabled.
Default: 10000
The batch timeout when batching is enabled.
Default: 5000
The routing key with which to bind the queue to the exchange (if bindQueue
is true
). Only applies to non-partitioned destinations. Only applies if requiredGroups
are provided and then only to those groups.
Default: #
Whether to declare the queue and bind it to the destination exchange. Set it to false
if you have set up your own infrastructure and have previously created and bound the queue. Only applies if requiredGroups
are provided and then only to those groups.
Default: true
Whether data should be compressed when sent.
Default: false
When errorChannelEnabled
is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms). If the channel does not exist, a DirectChannel
is registered with this name. The connection factory must be configured to enable publisher confirms.
Default: nullChannel
(acks are discarded).
The name of the DLQ Only applies if requiredGroups
are provided and then only to those groups.
Default: prefix+destination.dlq
A DLX to assign to the queue. Relevant only when autoBindDlq
is true
. Applies only when requiredGroups
are provided and then only to those groups.
Default: 'prefix+DLX'
The type of the DLX to assign to the queue. Relevant only if autoBindDlq
is true
. Applies only when requiredGroups
are provided and then only to those groups.
Default: 'direct'
A dead letter routing key to assign to the queue. Relevant only when autoBindDlq
is true
. Applies only when requiredGroups
are provided and then only to those groups.
Default: destination
Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq
is true
. Set to false
if you have a pre-configured DLX. Applies only when requiredGroups
are provided and then only to those groups.
Default: true
Whether to declare the exchange for the destination.
Default: true
A SpEL expression to evaluate the delay to apply to the message (x-delay
header). It has no effect if the exchange is not a delayed message exchange.
Default: No x-delay
header is set.
Whether to declare the exchange as a Delayed Message Exchange
. Requires the delayed message exchange plugin on the broker. The x-delayed-type
argument is set to the exchangeType
Default: false
The delivery mode.
When a DLQ is declared, a DLX to assign to that queue. Applies only if requiredGroups
are provided and then only to those groups.
Default: none
When a DLQ is declared, a dead letter routing key to assign to that queue. Applies only when requiredGroups
are provided and then only to those groups.
Default: none
How long (in milliseconds) before an unused dead letter queue is deleted. Applies only when requiredGroups
are provided and then only to those groups.
Default: no expiration
argument. See
「Lazy Queues」. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when
are provided and then only to those groups.
Maximum number of messages in the dead letter queue. Applies only if requiredGroups
are provided and then only to those groups.
Default: no limit
Maximum number of total bytes in the dead letter queue from all messages. Applies only when requiredGroups
are provided and then only to those groups.
Default: no limit
Maximum priority of messages in the dead letter queue (0-255) Applies only when requiredGroups
are provided and then only to those groups.
Default: none
Default time (in milliseconds) to live to apply to the dead letter queue when declared. Applies only when requiredGroups
are provided and then only to those groups.
Default: no limit
If declareExchange
is true
, whether the exchange should be auto-delete (it is removed after the last queue is removed).
Default: true
If declareExchange
is true
, whether the exchange should be durable (survives broker restart).
Default: true
The exchange type: direct
, fanout
or topic
for non-partitioned destinations and direct
or topic
for partitioned destinations.
Default: topic
How long (in milliseconds) before an unused queue is deleted. Applies only when requiredGroups
are provided and then only to those groups.
Default: no expiration
Patterns for headers to be mapped to outbound messages.
Default: ['*']
(all headers).
Declare the queue with the x-queue-mode=lazy
argument. See 「Lazy Queues」. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when requiredGroups
are provided and then only to those groups.
Default: false
Maximum number of messages in the queue. Applies only when requiredGroups
are provided and then only to those groups.
Default: no limit
Maximum number of total bytes in the queue from all messages. Only applies if requiredGroups
are provided and then only to those groups.
Default: no limit
Maximum priority of messages in the queue (0-255). Only applies if requiredGroups
are provided and then only to those groups.
Default: none
A prefix to be added to the name of the destination
Default: "".
When true
, consume from a queue with a name equal to the group
. Otherwise the queue name is destination.group
. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue. Applies only when requiredGroups
are provided and then only to those groups.
Default: false.
A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey'
in a properties file or routingKeyExpression: '''my.routingKey'''
in a YAML file.
Default: destination
or destination-<partition>
for partitioned destinations.
Whether to use transacted channels.
Default: false
Default time (in milliseconds) to live to apply to the queue when declared. Applies only when requiredGroups
are provided and then only to those groups.
Default: no limit
In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport — including transports, such as Kafka (prior to 0.11), that do not natively support headers. |