都在用 Kafka ! 消息隊列序列化怎麼處理?

生產者須要用序列化器(Serializer)把對象轉換成字節數組才能經過網絡發送給Kafka。而在對側,消費者須要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的對象。apache

 

 

先參考下面代碼實現一個簡單的客戶端。數組

image.png

爲了方便,消息的 key 和 value 都使用了字符串,對應程序中的序列化器也使用了客戶端自帶的 org.apache.kafka.common.serialization.StringSerializer,除了用於 String 類型的序列化器,還有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 這幾種類型,它們都實現了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3個方法:網絡

 

configure() 方法用來配置當前類,serialize() 方法用來執行序列化操做。而 close() 方法用來關閉當前的序列化器,通常狀況下 close() 是一個空方法,若是實現了此方法,則必須確保此方法的冪等性,由於這個方法極可能會被 KafkaProducer 調用屢次。工具

 

生產者使用的序列化器和消費者使用的反序列化器是須要一一對應的,若是生產者使用了某種序列化器,好比 StringSerializer,而消費者使用了另外一種序列化器,好比 IntegerSerializer,那麼是沒法解析出想要的數據的編碼

 

下面就以 StringSerializer 爲例來看看 Serializer 接口中的3個方法的使用方法,StringSerializer 類的具體實現如代碼spa

首先是 configure() 方法,這個方法是在建立 KafkaProducer 實例的時候調用的,主要用來肯定編碼類型,不過通常客戶端對於 key.serializer.encoding、value.serializer. encoding 和 serializer.encoding 這幾個參數都不會配置,在 KafkaProducer 的參數集合(ProducerConfig)裏也沒有這幾個參數(它們能夠看做用戶自定義的參數),因此通常狀況下 encoding 的值就爲默認的「UTF-8」。serialize() 方法很是直觀,就是將 String 類型轉爲 byte[] 類型。3d

 

若是 Kafka 客戶端提供的幾種序列化器都沒法知足應用需求,則能夠選擇使用如 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用的序列化工具來實現,或者使用自定義類型的序列化器來實現。下面就以一個簡單的例子來介紹自定義類型的使用方法對象

 

假設咱們要發送的消息都是 Company 對象,這個 Company 的定義很簡單,只有名稱 name 和地址 address,示例代碼參考以下blog

 

下面咱們再來看一下 Company 對應的序列化器 CompanySerializer,示例代碼如代碼接口

 

如何使用自定義的序列化器 CompanySerializer 呢?只需將 KafkaProducer 的 value.serializer 參數設置爲 CompanySerializer 類的全限定名便可。假如咱們要發送一個 Company 對象到 Kafka,關鍵代碼如代碼

 

注意,示例中消息的 key 對應的序列化器仍是 StringSerializer,這個並無改動。其實 key.serializer 和 value.serializer 並無太大的區別

原文地址

相關文章
相關標籤/搜索