閱讀文章,但願能解決如下問題:java
序列化主要是用來解決數據在網絡中傳輸的問題. 在網絡中傳輸的數據必須全是字節,也稱爲字節流. 而文本數據到字節數據的這一步就是序列化(將非字節數據 -> 字節數組).數組
Kafka中的序列化主要是將發送的消息序列化成字節數組. 在Java中,有八大基本數據類型和引用類型. Kafka預先內置了一些相應的序列化和反序列化網絡
Java類型 | 序列化 | 反序列化 |
---|---|---|
int | IntegerSerializer | IntegerDeserializer |
long | LongSerializer | LongDeserializer |
double | DoubleSerializer | DoubleDeserializer |
byte | BytesSerializer | BytesDeserializer |
byte | ByteArraySerializer | ByteArrayDeserializer |
byte | ByteBufferSerializer | ByteBufferDeserializer |
String | StringSerializer | StringDeserializer |
經過上面表格能夠看出,Kafka並非爲全部的基本類型內置了對應的序列化器和反序列化器. 並且Kafka爲對byte提供方便,內置了三個不一樣的序列化器和反序列化器. 同時,Kafka爲一個引用類型-String,提供了序列化器和反序列化器,由於String太經常使用了.ide
// StringSerializer序列化代碼 public class StringSerializer implements Serializer<String> { private String encoding = "UTF8"; @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } }
從代碼能夠看出,默認狀況下會把字符串編碼成UTF-8格式,而後在網絡中傳輸.工具
Kafka自帶的序列化器並不能知足全部的需求,假如我有一個用戶對象,裏面包含用戶姓名,用戶年齡... 可是Kafka中沒有提供相對應的序列化器,須要本身實現一個. 實現一個序列化器很簡單,只須要實現一個接口.性能
public interface Serializer<T> extends Closeable { // 配置該類 void configure(Map<String, ?> configs, boolean isKey); // 將數據轉變爲字節數組 byte[] serialize(String topic, T data); // 默認方法 default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); } // 關閉序列化器 @Override void close(); }
接下來,本身實現一個序列化器. 下面序列化器是商店顧客序列化器. 這裏採用硬編碼的方式,將該對象序列化成字節數組.編碼
public class CustomerSerializer implements Serializer<Customer> { @Override public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) { return null; } else { if (data.getName() != null) { serializedName = data.getName().getBytes(StandardCharsets.UTF_8); stringSize = serializedName.length; } else { serializedName = new byte[0]; stringSize = 0; } } final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerId()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } }
自定義序列化器的劣勢:code
用JSON,ProtoBuf,Protostuff,Thrift...實現經過的序列化工具.對象
public class JsonSerializer implements Serializer<Customer> { private final Logger log = LoggerFactory.getLogger(JsonSerializer.class); @Override public byte[] serialize(String topic, Customer data) { byte[] result = null; try { // 關鍵代碼,把對象序列化爲字節數組. result = JSON.toJSONBytes(data); log.info("{} is serialize after the size is {}", data, result.length); } catch (Exception e) { e.printStackTrace(); } return result; } }
序列化只是用來將非字節數據變爲字節數組,最終實現數據在網絡傳輸的目的. 然而想要經過序列化提高傳輸的性能(例如把序列化後的字節變少)是比較難實現的. 由於最終的字節數組要在消費端反序列化,所以消費者須要和生產者約定好(例如 1 表明 K, 2 表明 A ...).接口