public interface SchemaRegistryClient { SchemaRegistrationResponse register(String subject, String format, String schema); String fetch(SchemaReference schemaReference); String fetch(Integer id); }
與模式註冊表服務器交互的客戶端抽象是SchemaRegistryClient
接口,具備如下結構:html
public interface SchemaRegistryClient { SchemaRegistrationResponse register(String subject, String format, String schema); String fetch(SchemaReference schemaReference); String fetch(Integer id); }
Spring Cloud Stream提供了開箱即用的實現,用於與其本身的模式服務器交互,以及與Confluent Schema註冊表進行交互。java
可使用@EnableSchemaRegistryClient
配置Spring Cloud Stream模式註冊表的客戶端,以下所示:spring
@EnableBinding(Sink.class) @SpringBootApplication @EnableSchemaRegistryClient public static class AvroSinkApplication { ... }
注意
|
優化了默認轉換器,以緩存來自遠程服務器的模式,並且還會很是昂貴的 |
Schema註冊表客戶端支持如下屬性:緩存
模式服務器的位置。在設置時使用完整的URL,包括協議(http
或https
),端口和上下文路徑。服務器
客戶端是否應緩存模式服務器響應。一般設置爲false
,由於緩存發生在消息轉換器中。使用模式註冊表客戶端的客戶端應將其設置爲true
。app
true
fetch
對於在應用程序上下文中註冊了SchemaRegistryClient
bean的Spring Boot應用程序,Spring Cloud Stream將自動配置使用模式註冊表客戶端進行模式管理的Apache Avro消息轉換器。這簡化了模式演進,由於接收消息的應用程序能夠輕鬆訪問可與本身的讀取器模式進行協調的寫入器模式。優化
對於出站郵件,若是頻道的內容類型設置爲application/*+avro
,MessageConverter
將被激活,例如:
spring.cloud.stream.bindings.output.contentType=application/*+avro
在出站轉換期間,消息轉換器將嘗試基於其類型推斷出站消息的模式,並使用SchemaRegistryClient
根據有效載荷類型將其註冊到主題。若是已經找到相同的模式,那麼將會檢索對它的引用。若是沒有,則將註冊模式並提供新的版本號。該消息將使用application/[prefix].[subject].v[version]+avro
的方案contentType
頭髮送,其中prefix
是可配置的,而且從有效載荷類型推導出subject
。
例如,類型爲User
的消息能夠做爲內容類型爲application/vnd.user.v2+avro
的二進制有效載荷發送,其中user
是主題,2
是版本號。
當接收到消息時,轉換器將從傳入消息的頭部推斷出模式引用,並嘗試檢索它。該模式將在反序列化過程當中用做寫入器模式。
若是您已經過設置spring.cloud.stream.bindings.output.contentType=application/*+avro
啓用基於Avro的模式註冊表客戶端,則可使用如下屬性自定義註冊的行爲。
若是您但願轉換器使用反射從POJO推斷Schema,則啓用。
false
Avro經過查看編寫器模式(源有效載荷)和讀取器模式(應用程序有效負載)來比較模式版本,查看Avro文檔以獲取更多信息。若是設置,這將覆蓋模式服務器上的任何查找,並將本地模式用做讀取器模式。
null
使用Schema服務器註冊此屬性中列出的任何.avsc
文件。
empty
要在Content-Type頭上使用的前綴。
vnd
爲了更好地瞭解Spring Cloud Stream註冊和解決新模式以及其使用Avro模式比較功能,咱們將提供兩個單獨的子部分:一個用於註冊,一個用於解析模式。
註冊過程的第一部分是從經過信道發送的有效載荷中提取模式。Avro類型,如SpecificRecord
或GenericRecord
已經包含一個模式,能夠從實例中當即檢索。在POJO的狀況下,若是屬性spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
設置爲true
(默認),則會推斷出一個模式。
一旦得到了架構,轉換器就會從遠程服務器加載其元數據(版本)。首先,它查詢本地緩存,若是沒有找到它,則將數據提交到將使用版本控制信息回覆的服務器。轉換器將始終緩存結果,以免爲每一個須要序列化的新消息查詢Schema服務器的開銷。
使用模式版本信息,轉換器設置消息的contentType
頭,以攜帶版本信息,如application/vnd.user.v1+avro
當讀取包含版本信息的消息(即,具備上述方案的contentType
標頭)時,轉換器將查詢Schema服務器以獲取消息的寫入器架構。一旦找到傳入消息的正確架構,它就會檢索讀取器架構,並使用Avro的架構解析支持將其讀入讀取器定義(設置默認值和缺乏的屬性)。
注意
|
瞭解編寫器架構(寫入消息的應用程序)和讀取器架構(接收應用程序)之間的區別很重要。請花點時間閱讀Avro術語並瞭解此過程。Spring Cloud Stream將始終提取writer模式以肯定如何讀取消息。若是您想要Avro的架構演進支持工做,您須要確保爲您的應用程序正確設置了readerSchema。 |