![模式閱讀](http://static.javashuo.com/static/loading.gif)
注意
|
瞭解編寫器架構(寫入消息的應用程序)和讀取器架構(接收應用程序)之間的區別很重要。請花點時間閱讀Avro術語並瞭解此過程。Spring Cloud Stream將始終提取writer模式以肯定如何讀取消息。若是您想要Avro的架構演進支持工做,您須要確保爲您的應用程序正確設置了readerSchema。html |
雖然Spring Cloud Stream使我的Spring Boot應用程序輕鬆鏈接到消息傳遞系統,可是Spring Cloud Stream的典型場景是建立多應用程序管道,其中微服務應用程序將數據發送給彼此。您能夠經過將相鄰應用程序的輸入和輸出目標相關聯來實現此場景。java
假設設計要求時間源應用程序將數據發送到日誌接收應用程序,則能夠在兩個應用程序中使用名爲ticktock
的公共目標進行綁定。web
時間來源(具備頻道名稱output
)將設置如下屬性:spring
spring.cloud.stream.bindings.output.destination=ticktock
日誌接收器(通道名稱爲input
)將設置如下屬性:apache
spring.cloud.stream.bindings.input.destination=ticktock
當擴展Spring Cloud Stream應用程序時,每一個實例均可以接收有關同一個應用程序的其餘實例數量以及本身的實例索引的信息。Spring Cloud Stream經過spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
屬性執行此操做。例如,若是HDFS宿應用程序有三個實例,則全部三個實例將spring.cloud.stream.instanceCount
設置爲3
,而且各個應用程序將spring.cloud.stream.instanceIndex
設置爲0
,1
和2
。架構
當經過Spring Cloud數據流部署Spring Cloud Stream應用程序時,這些屬性將自動配置; 當Spring Cloud Stream應用程序獨立啓動時,必須正確設置這些屬性。默認狀況下,spring.cloud.stream.instanceCount
爲1
,spring.cloud.stream.instanceIndex
爲0
。app
在放大的狀況下,這兩個屬性的正確配置對於解決分區行爲(見下文)通常很重要,而且某些綁定器(例如,Kafka binder)老是須要這兩個屬性,以確保該數據在多個消費者實例之間正確分割。微服務
輸出綁定被配置爲經過設置其惟一的一個partitionKeyExpression
或partitionKeyExtractorClass
屬性以及其partitionCount
屬性來發送分區數據。例如,如下是一個有效和典型的配置:測試
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id spring.cloud.stream.bindings.output.producer.partitionCount=5
基於上述示例配置,使用如下邏輯將數據發送到目標分區。spa
基於partitionKeyExpression
,爲發送到分區輸出通道的每一個消息計算分區密鑰的值。partitionKeyExpression
是一個Spel表達式,它根據出站消息進行評估,以提取分區鍵。
若是SpEL表達式不足以知足您的須要,您能夠經過將屬性partitionKeyExtractorClass
設置爲實現org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
接口的類來計算分區鍵值。雖然Spel表達式一般足夠,但更復雜的狀況可能會使用自定義實現策略。在這種狀況下,屬性「partitionKeyExtractorClass」能夠設置以下:
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=com.example.MyKeyExtractor spring.cloud.stream.bindings.output.producer.partitionCount=5
一旦計算了消息密鑰,分區選擇過程將肯定目標分區爲0
和partitionCount - 1
之間的值。在大多數狀況下,默認計算基於公式key.hashCode() % partitionCount
。這能夠經過設置要針對'key'(經過partitionSelectorExpression
屬性)進行評估的Spel表達式或經過設置org.springframework.cloud.stream.binder.PartitionSelectorStrategy
實現(經過partitionSelectorClass
屬性))進行自定義。
「partitionSelectorExpression」和「partitionSelectorClass」的綁定級屬性能夠相似於上述示例中指定的「partitionKeyExpression」和「partitionKeyExtractorClass」屬性的類型。能夠爲更高級的場景配置其餘屬性,如如下部分所述。
PartitionKeyExtractorClass
實現在上面的示例中,MyKeyExtractor
之類的自定義策略由Spring Cloud Stream直接實例化。在某些狀況下,必須將這樣的自定義策略實現建立爲Spring bean,以便可以由Spring管理,以便它能夠執行依賴注入,屬性綁定等。能夠經過將其配置爲應用程序上下文中的@Bean,並使用徹底限定類名做爲bean的名稱,如如下示例所示。
@Bean(name="com.example.MyKeyExtractor") public MyKeyExtractor extractor() { return new MyKeyExtractor(); }
做爲Spring bean,自定義策略從Spring bean的完整生命週期中受益。例如,若是實現須要直接訪問應用程序上下文,則能夠實現「ApplicationContextAware」。
輸入綁定(通道名稱爲input
)被配置爲經過在應用程序自己設置其partitioned
屬性以及instanceIndex
和instanceCount
屬性來接收分區數據,如如下示例:
spring.cloud.stream.bindings.input.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
instanceCount
值表示數據須要分區的應用程序實例的總數,instanceIndex
必須是0
和instanceCount - 1
之間的多個實例的惟一值。實例索引幫助每一個應用程序實例識別從其接收數據的惟一分區(或者在Kafka的分區集合的狀況下)。重要的是正確設置兩個值,以確保全部數據都被使用,而且應用程序實例接收到互斥數據集。
雖然使用多個實例進行分區數據處理的場景可能會在獨立狀況下進行復雜化,可是經過將輸入和輸出值正確填充並依賴於運行時基礎架構,Spring Cloud數據流能夠顯着簡化流程。提供有關實例索引和實例計數的信息。
Spring Cloud Stream支持測試您的微服務應用程序,而無需鏈接到消息系統。您可使用spring-cloud-stream-test-support
庫提供的TestSupportBinder
,能夠將其做爲測試依賴項添加到應用程序中:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency>
注意
|
|
TestSupportBinder
容許用戶與綁定的頻道進行交互,並檢查應用程序發送和接收的消息
對於出站消息通道,TestSupportBinder
註冊單個訂戶,並將應用程序發送的消息保留在MessageCollector
中。它們能夠在測試過程當中被檢索,並對它們作出斷言。
用戶還能夠將消息發送到入站消息通道,以便消費者應用程序可使用消息。如下示例顯示瞭如何在處理器上測試輸入和輸出通道。
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT) public class ExampleTest { @Autowired private Processor processor; @Autowired private MessageCollector messageCollector; @Test @SuppressWarnings("unchecked") public void testWiring() { Message<String> message = new GenericMessage<>("hello"); processor.input().send(message); Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll(); assertThat(received.getPayload(), equalTo("hello world")); } @SpringBootApplication @EnableBinding(Processor.class) public static class MyProcessor { @Autowired private Processor channels; @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public String transform(String in) { return in + " world"; } } }
在上面的示例中,咱們正在建立一個具備輸入和輸出通道的應用程序,經過Processor
接口綁定。綁定的接口被注入測試,因此咱們能夠訪問這兩個通道。咱們正在輸入頻道發送消息,咱們使用Spring Cloud Stream測試支持提供的MessageCollector
來捕獲消息已經被髮送到輸出通道。收到消息後,咱們能夠驗證組件是否正常工做。
Spring Cloud Stream爲粘合劑提供健康指標。它以binders
的名義註冊,能夠經過設置management.health.binders.enabled
屬性啓用或禁用。