Kafka消息序列化

Kafka消息序列化

閱讀文章,但願能解決如下問題:java

  • 序列化主要解決的問題
  • 不一樣的序列化對消息大小的影響
  • 能夠用序列化來解決消息太大的問題嗎

歸納

序列化主要是用來解決數據在網絡中傳輸的問題. 在網絡中傳輸的數據必須全是字節,也稱爲字節流. 而文本數據到字節數據的這一步就是序列化(將非字節數據 -> 字節數組).數組

Kafka中序列化

Kafka中的序列化主要是將發送的消息序列化成字節數組. 在Java中,有八大基本數據類型和引用類型. Kafka預先內置了一些相應的序列化和反序列化網絡

Java類型 序列化 反序列化
int IntegerSerializer IntegerDeserializer
long LongSerializer LongDeserializer
double DoubleSerializer DoubleDeserializer
byte BytesSerializer BytesDeserializer
byte ByteArraySerializer ByteArrayDeserializer
byte ByteBufferSerializer ByteBufferDeserializer
String StringSerializer StringDeserializer

經過上面表格能夠看出,Kafka並非爲全部的基本類型內置了對應的序列化器和反序列化器. 並且Kafka爲對byte提供方便,內置了三個不一樣的序列化器和反序列化器. 同時,Kafka爲一個引用類型-String,提供了序列化器和反序列化器,由於String太經常使用了.ide

// StringSerializer序列化代碼
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

從代碼能夠看出,默認狀況下會把字符串編碼成UTF-8格式,而後在網絡中傳輸.工具

自定義序列化器

Kafka自帶的序列化器並不能知足全部的需求,假如我有一個用戶對象,裏面包含用戶姓名,用戶年齡... 可是Kafka中沒有提供相對應的序列化器,須要本身實現一個. 實現一個序列化器很簡單,只須要實現一個接口.性能

public interface Serializer<T> extends Closeable {

    // 配置該類
    void configure(Map<String, ?> configs, boolean isKey);

    // 將數據轉變爲字節數組
    byte[] serialize(String topic, T data);

    // 默認方法
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    // 關閉序列化器
    @Override
    void close();
}

接下來,本身實現一個序列化器. 下面序列化器是商店顧客序列化器. 這裏採用硬編碼的方式,將該對象序列化成字節數組.編碼

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null) {
                return null;
            } else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes(StandardCharsets.UTF_8);
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }

            final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }
}

自定義序列化器的劣勢:code

  • 須要考慮向前兼容和向後兼容的問題,假如更新的反序列化可否對之前的消息進行支持.
  • 須要將序列化和反序列化成匹配的出現

用第三方jar包實現自定義序列化

用JSON,ProtoBuf,Protostuff,Thrift...實現經過的序列化工具.對象

public class JsonSerializer implements Serializer<Customer> {

    private final Logger log = LoggerFactory.getLogger(JsonSerializer.class);

    @Override
    public byte[] serialize(String topic, Customer data) {
        byte[] result = null;
        try {
            // 關鍵代碼,把對象序列化爲字節數組.
            result = JSON.toJSONBytes(data);
            log.info("{} is serialize after the size is {}", data, result.length);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

總結

序列化只是用來將非字節數據變爲字節數組,最終實現數據在網絡傳輸的目的. 然而想要經過序列化提高傳輸的性能(例如把序列化後的字節變少)是比較難實現的. 由於最終的字節數組要在消費端反序列化,所以消費者須要和生產者約定好(例如 1 表明 K, 2 表明 A ...).接口

相關文章
相關標籤/搜索