@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter customMessageConverter() { return new MyCustomMessageConverter(); }
類型轉換Spring Cloud Stream提供的開箱即用以下表所示:「源有效載荷」是指轉換前的有效載荷,「目標有效載荷」是指轉換後的「有效載荷」。類型轉換能夠在「生產者」一側(輸出)或「消費者」一側(輸入)上進行。html
來源有效載荷 | 目標有效載荷 | content-type 標題(來源訊息) |
content-type 標題(轉換後) |
註釋 |
---|---|---|---|---|
POJOjava |
JSON Stringspring |
ignoredsql |
application/json數據庫 |
|
Tupleapache |
JSON Stringjson |
ignoredapi |
application/json服務器 |
JSON是爲Tuple量身定製的架構 |
POJO |
String (toString()) |
ignored |
text/plain, java.lang.String |
|
POJO |
byte[] (java.io serialized) |
ignored |
application/x-java-serialized-object |
|
JSON byte[] or String |
POJO |
application/json (or none) |
application/x-java-object |
|
byte[] or String |
Serializable |
application/x-java-serialized-object |
application/x-java-object |
|
JSON byte[] or String |
Tuple |
application/json (or none) |
application/x-spring-tuple |
|
byte[] |
String |
any |
text/plain, java.lang.String |
將應用在content-type頭中指定的任何Charset |
String |
byte[] |
any |
application/octet-stream |
將應用在content-type頭中指定的任何Charset |
注意
|
轉換適用於須要類型轉換的有效內容。例如,若是應用程序生成帶有outputType = application / json的XML字符串,則該有效載荷將不會從XML轉換爲JSON。這是由於發送到出站通道的有效載荷已是一個String,因此在運行時不會應用轉換。一樣重要的是要注意,當使用默認的序列化機制時,必須在發送和接收應用程序之間共享有效負載類,而且與二進制內容兼容。當應用程序代碼在兩個應用程序中獨立更改時,這可能會產生問題,由於二進制格式和代碼可能會變得不兼容。 |
提示
|
雖然入站和出站渠道都支持轉換,但特別推薦將其用於轉發出站郵件。對於入站郵件的轉換,特別是當目標是POJO時, |
除了支持開箱即用的轉換,Spring Cloud Stream還支持註冊您本身的郵件轉換實現。這容許您以各類自定義格式(包括二進制)發送和接收數據,並將其與特定的contentTypes
關聯。Spring Cloud Stream將全部類型爲org.springframework.messaging.converter.MessageConverter
的bean註冊爲自定義消息轉換器以及開箱即用消息轉換器。
若是您的消息轉換器須要使用特定的content-type
和目標類(用於輸入和輸出),則消息轉換器須要擴展org.springframework.messaging.converter.AbstractMessageConverter
。對於使用@StreamListener
的轉換,實現org.springframework.messaging.converter.MessageConverter
的消息轉換器就足夠了。
如下是在Spring Cloud Stream應用程序中建立消息轉換器bean(內容類型爲application/bar
)的示例:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter customMessageConverter() { return new MyCustomMessageConverter(); }
public class MyCustomMessageConverter extends AbstractMessageConverter { public MyCustomMessageConverter() { super(new MimeType("application", "bar")); } @Override protected boolean supports(Class<?> clazz) { return (Bar.class == clazz); } @Override protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) { Object payload = message.getPayload(); return (payload instanceof Bar ? payload : new Bar((byte[]) payload)); } }
Spring Cloud Stream還爲基於Avro的轉換器和模式演進提供支持。詳情請參閱具體章節。
@StreamListener
和訊息轉換@StreamListener
註釋提供了一種方便的方式來轉換傳入的消息,而不須要指定輸入通道的內容類型。在使用@StreamListener
註釋的方法的調度過程當中,若是參數須要轉換,將自動應用轉換。
例如,讓咱們考慮一個帶有{"greeting":"Hello, world"}
的String內容的消息,而且在輸入通道上收到application/json
的application/json
標題。讓咱們考慮接收它的如下應用程序:
public class GreetingMessage { String greeting; public String getGreeting() { return greeting; } public void setGreeting(String greeting) { this.greeting = greeting; } } @EnableBinding(Sink.class) @EnableAutoConfiguration public static class GreetingSink { @StreamListener(Sink.INPUT) public void receive(Greeting greeting) { // handle Greeting } }
該方法的參數將自動填充包含JSON字符串的未編組形式的POJO。
Spring Cloud Stream經過其spring-cloud-stream-schema
模塊爲基於模式的消息轉換器提供支持。目前,基於模式的消息轉換器開箱即用的惟一序列化格式是Apache Avro,在未來的版本中能夠添加更多的格式。
spring-cloud-stream-schema
模塊包含可用於Apache Avro序列化的兩種類型的消息轉換器:
使用序列化/反序列化對象的類信息的轉換器,或者啓動時已知位置的模式;
轉換器使用模式註冊表 - 他們在運行時定位模式,以及隨着域對象的發展動態註冊新模式。
AvroSchemaMessageConverter
支持使用預約義模式或使用類中可用的模式信息(反射或包含在SpecificRecord
)中的序列化和反序列化消息。若是轉換的目標類型是GenericRecord
,則必須設置模式。
對於使用它,您能夠簡單地將其添加到應用程序上下文中,可選地指定一個或多個MimeTypes
將其關聯。默認MimeType
爲application/avro
。
如下是在註冊Apache Avro MessageConverter
的宿應用程序中進行配置的示例,而不須要預約義的模式:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); } }
相反,這裏是一個應用程序,註冊一個具備預約義模式的轉換器,能夠在類路徑中找到:
@EnableBinding(Sink.class) @SpringBootApplication public static class SinkApplication { ... @Bean public MessageConverter userMessageConverter() { AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); return converter; } }
爲了瞭解模式註冊表客戶端轉換器,咱們將首先描述模式註冊表支持。
大多數序列化模型,特別是旨在跨不一樣平臺和語言進行可移植性的序列化模型,依賴於描述數據如何在二進制有效載荷中被序列化的模式。爲了序列化數據而後解釋它,發送方和接收方都必須訪問描述二進制格式的模式。在某些狀況下,能夠從序列化的有效載荷類型或從反序列化時的目標類型中推斷出模式,可是在許多狀況下,應用程序能夠從訪問描述二進制數據格式的顯式模式中受益。模式註冊表容許您以文本格式(一般爲JSON)存儲模式信息,並使該信息可訪問須要它的各類應用程序以二進制格式接收和發送數據。一個模式能夠做爲一個元組引用,它由
做爲模式的邏輯名稱的主題 ;
模式版本 ;
描述數據 的二進制格式的模式格式。
Spring Cloud Stream提供了模式註冊表服務器實現。爲了使用它,您能夠簡單地將spring-cloud-stream-schema-server
工件添加到項目中,並使用@EnableSchemaRegistryServer
註釋,將模式註冊表服務器REST控制器添加到應用程序中。此註釋旨在與Spring Boot Web應用程序一塊兒使用,服務器的監聽端口由server.port
設置控制。spring.cloud.stream.schema.server.path
設置可用於控制模式服務器的根路徑(特別是嵌入其餘應用程序時)。spring.cloud.stream.schema.server.allowSchemaDeletion
布爾設置能夠刪除模式。默認狀況下,這是禁用的。
模式註冊表服務器使用關係數據庫來存儲模式。默認狀況下,它使用一個嵌入式數據庫。您能夠使用Spring Boot SQL數據庫和JDBC配置選項自定義模式存儲。
啓用模式註冊表的Spring Boot應用程序以下所示:
@SpringBootApplication @EnableSchemaRegistryServer public class SchemaRegistryServerApplication { public static void main(String[] args) { SpringApplication.run(SchemaRegistryServerApplication.class, args); } }
Schema註冊服務器API由如下操做組成:
POST /
註冊一個新的架構
接受具備如下字段的JSON有效載荷:
subject
模式主題;
format
模式格式;
definition
模式定義。
響應是JSON格式的模式對象,包含如下字段:
id
模式標識;
subject
模式主題;
format
模式格式;
version
模式版本;
definition
模式定義。
GET /{subject}/{format}/{version}
根據其主題,格式和版本檢索現有模式。
響應是JSON格式的模式對象,包含如下字段:
id
模式標識;
subject
模式主題;
format
模式格式;
version
模式版本;
definition
模式定義。
GET /{subject}/{format}
根據其主題和格式檢索現有模式的列表。
響應是JSON格式的每一個模式對象的模式列表,包含如下字段:
id
模式標識;
subject
模式主題;
format
模式格式;
version
模式版本;
definition
模式定義。
GET /schemas/{id}
經過其id來檢索現有的模式。
響應是JSON格式的模式對象,包含如下字段:
id
模式標識;
subject
模式主題;
format
模式格式;
version
模式版本;
definition
模式定義。
DELETE /{subject}/{format}/{version}
按其主題,格式和版本刪除現有模式。
DELETE /schemas/{id}
按其ID刪除現有模式。
DELETE /{subject}
按其主題刪除現有模式。
注意
|
本說明僅適用於Spring Cloud Stream 1.1.0.RELEASE的用戶。Spring Cloud Stream 1.1.0.RELEASE使用表名 |