To understand Kafka compression, you first need to understand Kafka's storage format.
RecordBatch
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
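To make the field list concrete, here is a minimal Java sketch, assuming magic v2 and a buffer positioned at the start of a batch, that reads the fixed part of a RecordBatch with `java.nio.ByteBuffer` (big-endian by default, matching Kafka's encoding) and extracts the flags from `attributes`. `RecordBatchHeader` and its methods are hypothetical illustrations, not Kafka's client API. It also assumes that `records: [Record]` is preceded by an int32 element count, which is the "record count" field in the decoded dump; with it, the fixed header is 61 bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not Kafka's API): decodes the fixed part of a magic-v2 RecordBatch.
final class RecordBatchHeader {
    // 8+4+4+1+4+2+4+8+8+8+2+4 bytes of fields, plus the int32 record count.
    static final int SIZE = 61;
    private static final String[] CODECS = {"none", "gzip", "snappy", "lz4", "zstd"};

    long baseOffset, firstTimestamp, maxTimestamp, producerId, crc;
    int batchLength, partitionLeaderEpoch, lastOffsetDelta, baseSequence, recordCount;
    short attributes, producerEpoch;
    byte magic;

    static RecordBatchHeader parse(ByteBuffer buf) {     // ByteBuffer is big-endian by default
        RecordBatchHeader h = new RecordBatchHeader();
        h.baseOffset = buf.getLong();
        h.batchLength = buf.getInt();
        h.partitionLeaderEpoch = buf.getInt();
        h.magic = buf.get();
        h.crc = Integer.toUnsignedLong(buf.getInt());    // stored as int32, shown unsigned
        h.attributes = buf.getShort();
        h.lastOffsetDelta = buf.getInt();
        h.firstTimestamp = buf.getLong();
        h.maxTimestamp = buf.getLong();
        h.producerId = buf.getLong();
        h.producerEpoch = buf.getShort();
        h.baseSequence = buf.getInt();
        h.recordCount = buf.getInt();                    // element count of [Record]
        return h;
    }

    String compression() {                               // bits 0~2
        int codec = attributes & 0x07;
        return codec < CODECS.length ? CODECS[codec] : "unknown(" + codec + ")";
    }
    int timestampType() { return (attributes >> 3) & 1; }           // bit 3
    boolean isTransactional() { return (attributes & 0x10) != 0; }  // bit 4
    boolean isControlBatch() { return (attributes & 0x20) != 0; }   // bit 5

    // Turns "00 1c ..." style hex (whitespace ignored) into bytes.
    static byte[] hex(String s) {
        s = s.replaceAll("\\s+", "");
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++)
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        return out;
    }

    public static void main(String[] args) {
        // First 61 bytes of the uncompressed batch from the dump in this post.
        byte[] bytes = hex("0000000000000000 00000040 00000000 02 e3017194 0000 00000000"
                + " 0000016ad901537e 0000016ad901537e ffffffffffffffff ffff ffffffff 00000001");
        RecordBatchHeader h = parse(ByteBuffer.wrap(bytes));
        // prints: batchLength=64 compression=none records=1
        System.out.println("batchLength=" + h.batchLength + " compression=" + h.compression()
                + " records=" + h.recordCount);
    }
}
```

Using plain field reads in declaration order keeps the sketch a direct transcription of the schema; a production reader would also check `magic` before trusting the rest.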
Record
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
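The variable-length fields of a Record are zigzag-encoded varints (a varlong for `timestampDelta`): each byte carries 7 bits, least-significant group first, the high bit means "more bytes follow", and zigzag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, .... A key or value length of -1 denotes null. The following Java sketch decodes one Record under those rules; `RecordDecoder`, `Rec` and the helper names are hypothetical, and for brevity int varints are read through the 64-bit decoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for one Record (not Kafka's API).
final class RecordDecoder {
    static final class Rec {
        int length, offsetDelta, headerCount;
        long timestampDelta;
        byte[] key, value;
    }

    // Unsigned varint (7 bits per byte, low group first), then zigzag decode.
    static long readVarlong(ByteBuffer buf) {
        long raw = 0;
        for (int shift = 0; ; shift += 7) {
            if (shift > 63) throw new IllegalArgumentException("varint too long");
            byte b = buf.get();
            raw |= (long) (b & 0x7f) << shift;
            if ((b & 0x80) == 0) break;
        }
        return (raw >>> 1) ^ -(raw & 1);   // 0->0, 1->-1, 2->1, 3->-2, ...
    }

    // Simplification: int varints are read through the 64-bit decoder.
    static int readVarint(ByteBuffer buf) { return (int) readVarlong(buf); }

    // Length-prefixed bytes; a length of -1 means null.
    static byte[] readBytes(ByteBuffer buf) {
        int len = readVarint(buf);
        if (len < 0) return null;
        byte[] out = new byte[len];
        buf.get(out);
        return out;
    }

    static Rec parse(ByteBuffer buf) {
        Rec r = new Rec();
        r.length = readVarint(buf);             // bytes that follow this field
        buf.get();                              // attributes: int8, unused
        r.timestampDelta = readVarlong(buf);    // added to the batch's firstTimestamp
        r.offsetDelta = readVarint(buf);        // added to the batch's baseOffset
        r.key = readBytes(buf);
        r.value = readBytes(buf);
        r.headerCount = readVarint(buf);
        for (int i = 0; i < r.headerCount; i++) {   // skip headers: key, then value
            readBytes(buf);
            readBytes(buf);
        }
        return r;
    }

    public static void main(String[] args) {
        // The single record from the uncompressed batch in the dump.
        byte[] bytes = {0x1c, 0x00, 0x00, 0x00, 0x06, 0x6b, 0x65, 0x79,
                        0x0a, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00};
        Rec r = parse(ByteBuffer.wrap(bytes));
        // prints: 14 key=value
        System.out.println(r.length + " " + new String(r.key, StandardCharsets.UTF_8)
                + "=" + new String(r.value, StandardCharsets.UTF_8));
    }
}
```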
Record Header
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
Note: Image source. That article is recommended reading for a better understanding of how Kafka's message format has evolved.
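Record Header lengths use the same zigzag varints as the Record fields. Going the other direction, this hypothetical `HeaderEncoder` (an illustration, not Kafka's API) writes one Record Header in the field order of the Record Header layout. It assumes that a null header value is written with length -1, mirroring the convention for keys and values.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for a single Record Header (not Kafka's API).
final class HeaderEncoder {
    // Zigzag-encode, then emit 7 bits per byte, low group first,
    // setting the high bit on every byte except the last.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        int v = (value << 1) ^ (value >> 31);   // 0->0, -1->1, 1->2, -2->3, ...
        while ((v & ~0x7f) != 0) {
            out.write((v & 0x7f) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static byte[] encode(String key, byte[] value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, k.length);             // headerKeyLength
        out.write(k, 0, k.length);              // headerKey (UTF-8)
        if (value == null) {
            writeVarint(out, -1);               // assumed: length -1 marks a null value
        } else {
            writeVarint(out, value.length);     // headerValueLength
            out.write(value, 0, value.length);  // Value
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] h = encode("k", "v".getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : h) sb.append(String.format("%02x", b));
        System.out.println(sb);                 // prints: 026b0276
    }
}
```

Note that a one-character key produces length byte `02`, not `01`: that is the zigzag encoding of 1.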
Hex dump of two batches, each holding a single record with key "key" and value "value". The first 76 bytes are an uncompressed batch; the remaining 96 bytes are a gzip-compressed batch:

0000 0000 0000 0000 0000 0040 0000 0000
02e3 0171 9400 0000 0000 0000 0001 6ad9
0153 7e00 0001 6ad9 0153 7eff ffff ffff
ffff ffff ffff ffff ff00 0000 011c 0000
0006 6b65 790a 7661 6c75 6500

0000 0000 0000 0001 0000 0054 0000 0000
02e5 cb48 0600 0100 0000 0000 0001 6ad9
5427 af00 0001 6ad9 5427 afff ffff ffff
ffff ffff ffff ffff ff00 0000 011f 8b08
0000 0000 0000 0093 6160 6060 cb4e ade4
2a4b cc29 4d65 0000 55dc 0454 0f00 0000
Decoded uncompressed batch (76 bytes):

=======================Header============================
0000 0000 0000 0000 => first offset => 0
0000 0040 => length => 64 (bytes after this field: 76 - 8 - 4)
0000 0000 => partition leader epoch => 0
02 => magic => 2
e3 0171 94 => crc (CRC-32C) => 3808522644
00 00 => attributes => 0
00 0000 00 => last offset delta => 0
00 0001 6ad9 0153 7e => first timestamp => 1558418903934
00 0001 6ad9 0153 7e => max timestamp => 1558418903934
ff ffff ffff ffff ff => producer id => -1
ff ff => producer epoch => -1
ff ffff ff => first sequence => -1
00 0000 01 => record count => 1
=======================Records===========================
1c => length (varint) => 14
00 => attributes => unused
00 => timestamp delta (varint) => 0
00 => offset delta (varint) => 0
06 => key length (varint) => 3
6b65 79 => key => "key"
0a => value length (varint) => 5
7661 6c75 65 => value => "value"
00 => header count (varint) => 0

The varints are zigzag-encoded, which is why 0x1c decodes to 14, 0x06 to 3 and 0x0a to 5.
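Because `length` counts only the bytes that follow the first-offset and length fields, a reader can hop from one batch to the next without decoding anything else: each batch occupies 8 + 4 + length bytes, i.e. 12 + 64 = 76 for the batch decoded above. A minimal Java sketch of that walk over the whole dump; `BatchSplitter` is a hypothetical name, and it simply stops at a truncated trailing batch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: walks consecutive batches using only baseOffset and batchLength.
final class BatchSplitter {
    static final int OFFSET_AND_LENGTH = 8 + 4;   // bytes not counted by batchLength

    // The two batches from the dump in this post (76 + 96 bytes).
    static final String DUMP =
          "0000 0000 0000 0000 0000 0040 0000 0000 "
        + "02e3 0171 9400 0000 0000 0000 0001 6ad9 "
        + "0153 7e00 0001 6ad9 0153 7eff ffff ffff "
        + "ffff ffff ffff ffff ff00 0000 011c 0000 "
        + "0006 6b65 790a 7661 6c75 6500 "           // end of the uncompressed batch
        + "0000 0000 0000 0001 0000 0054 0000 0000 "
        + "02e5 cb48 0600 0100 0000 0000 0001 6ad9 "
        + "5427 af00 0001 6ad9 5427 afff ffff ffff "
        + "ffff ffff ffff ffff ff00 0000 011f 8b08 "
        + "0000 0000 0000 0093 6160 6060 cb4e ade4 "
        + "2a4b cc29 4d65 0000 55dc 0454 0f00 0000";

    // Returns {baseOffset, startPosition, totalSize} for each complete batch.
    static List<long[]> split(ByteBuffer buf) {
        List<long[]> batches = new ArrayList<>();
        while (buf.remaining() >= OFFSET_AND_LENGTH) {
            int start = buf.position();
            long baseOffset = buf.getLong();
            int total = OFFSET_AND_LENGTH + buf.getInt();
            if (start + total > buf.limit()) break;   // truncated batch
            batches.add(new long[]{baseOffset, start, total});
            buf.position(start + total);              // jump to the next batch
        }
        return batches;
    }

    static byte[] hex(String s) {
        s = s.replaceAll("\\s+", "");
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++)
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        return out;
    }

    public static void main(String[] args) {
        for (long[] b : split(ByteBuffer.wrap(hex(DUMP))))
            System.out.println("offset=" + b[0] + " start=" + b[1] + " size=" + b[2]);
    }
}
```

Running it should report two batches: offset 0 at position 0 with 76 bytes, and offset 1 at position 76 with 96 bytes.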
Enabling message compression
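The first 76 bytes of the dump are the uncompressed batch decoded above. The compressed batch that follows decodes as:

0000 0000 0000 0001 => first offset => 1
0000 0054 => length => 84
0000 0000 => partition leader epoch => 0
02 => magic => 2
e5 cb48 06 => crc (CRC-32C) => 3855304710
00 01 => attributes => 1 (bits 0~2 = 1: gzip)
00 0000 00 => last offset delta => 0
00 0001 6ad9 5427 af => first timestamp => 1558424332207
00 0001 6ad9 5427 af => max timestamp => 1558424332207
ff ffff ffff ffff ff => producer id => -1
ff ff => producer epoch => -1
ff ffff ff => first sequence => -1
00 0000 01 => record count => 1
1f 8b08 0000 0000 0000 0093 6160 6060 cb4e ade4 2a4b cc29 4d65 0000 55dc 0454 0f00 0000 => records (gzip stream, 35 bytes)

The records begin with the gzip magic bytes 1f 8b, and the last four bytes (0f00 0000, little-endian) give the uncompressed size: 15 bytes, exactly the size of the single record in the uncompressed batch. With this little data, gzip's own header and trailer make the batch larger (96 bytes versus 76).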
Message compression applies only to the records portion; the batch header stays uncompressed.
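Reading a compressed batch therefore means parsing the 61-byte header as usual and then decompressing everything after it. The Java sketch below does that for the 35-byte records section of the compressed batch: it skips the 10-byte gzip header (assuming FLG = 0, i.e. no optional fields, as in this dump) and feeds the raw DEFLATE data to `java.util.zip.Inflater` with `nowrap = true`. It does not verify the gzip CRC; `java.util.zip.GZIPInputStream` would do that check for you at the cost of hiding the format. `GzipRecords` is a hypothetical name, not part of Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

// Hypothetical helper: unpacks the gzip-compressed records section of a batch.
final class GzipRecords {
    // Records section of the compressed batch in the dump (everything after its 61-byte header).
    static final String RECORDS =
          "1f8b 0800 0000 0000 0000"                   // gzip header: magic, CM=8, FLG=0, MTIME, XFL, OS
        + "9361 6060 60cb 4ead e42a 4bcc 294d 6500 00" // raw DEFLATE data
        + "55dc 0454"                                  // CRC-32 of the uncompressed bytes
        + "0f00 0000";                                 // ISIZE: uncompressed size, little-endian

    static byte[] inflate(byte[] gz) throws DataFormatException {
        if ((gz[0] & 0xff) != 0x1f || (gz[1] & 0xff) != 0x8b || gz[2] != 8 || gz[3] != 0)
            throw new IllegalArgumentException("sketch handles only a plain gzip header");
        Inflater inflater = new Inflater(true);          // raw DEFLATE, no zlib wrapper
        inflater.setInput(gz, 10, gz.length - 10);       // the trailer is left unread
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[64];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && inflater.needsInput())
                    throw new DataFormatException("truncated DEFLATE stream");
                out.write(chunk, 0, n);
            }
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    // ISIZE: last four bytes of the gzip member, little-endian.
    static int isize(byte[] gz) {
        int n = gz.length;
        return (gz[n - 4] & 0xff) | (gz[n - 3] & 0xff) << 8
             | (gz[n - 2] & 0xff) << 16 | (gz[n - 1] & 0xff) << 24;
    }

    static byte[] hex(String s) {
        s = s.replaceAll("\\s+", "");
        byte[] out = new byte[s.length() / 2];
        for (int i = 0; i < out.length; i++)
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        return out;
    }

    public static void main(String[] args) throws DataFormatException {
        byte[] records = inflate(hex(RECORDS));
        StringBuilder sb = new StringBuilder();
        for (byte b : records) sb.append(String.format("%02x", b));
        // prints: 15 bytes: 1c000000066b65790a76616c756500
        System.out.println(records.length + " bytes: " + sb);
    }
}
```

The 15 decompressed bytes are identical to the record in the uncompressed batch, which agrees with the ISIZE field in the gzip trailer.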