RecordBatch is a collection containing many records. RecordBatch extends the Iterable<Record> interface, providing a way to iterate over its Records.
AbstractRecordBatch implements RecordBatch and provides the following methods:
abstract class AbstractRecordBatch implements RecordBatch {

    @Override
    public boolean hasProducerId() {
        return RecordBatch.NO_PRODUCER_ID < producerId();
    }

    // compute the next offset
    @Override
    public long nextOffset() {
        return lastOffset() + 1;
    }

    // whether the batch data is compressed
    @Override
    public boolean isCompressed() {
        return compressionType() != CompressionType.NONE;
    }
}
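Since RecordBatch extends Iterable<Record>, a batch can be consumed with a plain for-each loop. A minimal usage sketch, where batch is a hypothetical RecordBatch instance (Record.offset() and Record.timestamp() are part of Kafka's Record interface):

RecordBatch batch = ...;  // hypothetical: obtained from MemoryRecords.batches(), for example
for (Record record : batch) {
    System.out.println(record.offset() + " @ " + record.timestamp());
}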
DefaultRecordBatch extends AbstractRecordBatch and implements the RecordBatch interface:
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        // count() returns the number of records
        if (count() == 0)
            return Collections.emptyIterator();

        // if the data is not compressed, return the uncompressed iterator
        if (!isCompressed())
            return uncompressedIterator();

        // if compressed, return the iterator matching the compression type,
        // draining it into a list first
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
}
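Note the asymmetry above: the uncompressed path iterates lazily over the shared buffer, but the compressed path eagerly copies every record into a list inside try-with-resources, so the decompression stream is closed before the plain Iterator escapes. A generic sketch of that drain-and-close pattern, assuming Kafka's CloseableIterator (whose close() declares no checked exception) and the java.util collections; drainAndClose is a hypothetical helper:

// Hypothetical helper illustrating the drain-and-close pattern used by iterator()
static <T> Iterator<T> drainAndClose(CloseableIterator<T> source, int sizeHint) {
    try (CloseableIterator<T> it = source) {
        List<T> copy = new ArrayList<>(sizeHint);
        while (it.hasNext())
            copy.add(it.next());
        return copy.iterator();  // safe to hand out: no open stream behind it
    }
}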
RecordIterator is defined inside DefaultRecordBatch; it handles the actual iteration over Records:
private abstract class RecordIterator implements CloseableIterator<Record> {
    private final Long logAppendTime;
    private final long baseOffset;
    private final long baseTimestamp;
    private final int baseSequence;
    private final int numRecords;   // total number of records
    private int readRecords = 0;    // number of records read so far

    public RecordIterator() {
        this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
        // the batch's baseOffset
        this.baseOffset = baseOffset();
        // the batch's baseTimestamp
        this.baseTimestamp = baseTimestamp();
        // the batch's baseSequence
        this.baseSequence = baseSequence();
        // the number of records in the batch
        int numRecords = count();
        if (numRecords < 0)
            throw new InvalidRecordException("Found invalid record count " + numRecords
                    + " in magic v" + magic() + " batch");
        this.numRecords = numRecords;
    }

    @Override
    public boolean hasNext() {
        // if fewer records have been read than the declared count, more remain
        return readRecords < numRecords;
    }

    @Override
    public Record next() {
        if (readRecords >= numRecords)
            throw new NoSuchElementException();

        // update readRecords
        readRecords++;
        // read the next record
        Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
        if (readRecords == numRecords) {
            // after the last record, verify that the buffer has been fully consumed
            if (!ensureNoneRemaining())
                throw new InvalidRecordException("Incorrect declared batch size, records still remaining in file");
        }
        return rec;
    }

    // read the next record
    protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);

    // whether the underlying buffer has been fully consumed
    protected abstract boolean ensureNoneRemaining();

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
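Stripped of the Kafka-specific fields, RecordIterator is a count-bounded iterator: hasNext() compares a read counter against the declared record count rather than probing the underlying bytes. A JDK-only sketch of the same idea:

Iterator<Integer> it = new Iterator<Integer>() {
    private final int numRecords = 3;  // declared count, like count()
    private int readRecords = 0;       // records read so far

    @Override
    public boolean hasNext() {
        return readRecords < numRecords;
    }

    @Override
    public Integer next() {
        if (!hasNext())
            throw new NoSuchElementException();
        return readRecords++;
    }
};
while (it.hasNext())
    System.out.println(it.next());  // prints 0, 1, 2

The two anonymous subclasses below supply readNext() and ensureNoneRemaining() for the buffer-based and stream-based cases.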
private CloseableIterator<Record> uncompressedIterator() {
    // duplicate the buffer
    final ByteBuffer buffer = this.buffer.duplicate();
    // skip to the start of the Records section
    buffer.position(RECORDS_OFFSET);
    return new RecordIterator() {
        @Override
        protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
            try {
                // read from the buffer and instantiate a DefaultRecord
                return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
            } catch (BufferUnderflowException e) {
                throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
            }
        }

        @Override
        protected boolean ensureNoneRemaining() {
            // true if the buffer has no unread data left
            return !buffer.hasRemaining();
        }

        @Override
        public void close() {}
    };
}
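The duplicate() call at the top matters: the duplicate shares the underlying bytes but keeps its own position and limit, so the iterator can advance through the Records section without disturbing the batch's original buffer. A small JDK-only illustration:

ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
ByteBuffer copy = original.duplicate();
copy.position(2);                         // moves only the duplicate
System.out.println(copy.get());           // 3
System.out.println(original.position());  // still 0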
CompressionType represents the compression type of the Records data, with four built-in types: NONE, GZIP, SNAPPY, and LZ4.
public enum CompressionType {
    NONE(0, "none", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            return buffer;
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            return new ByteBufferInputStream(buffer);
        }
    },

    GZIP(1, "gzip", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            try {
                // return an OutputStream that gzip-compresses on write
                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
                return new GZIPOutputStream(buffer, 8 * 1024);
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            try {
                // return an InputStream that gzip-decompresses on read
                return new GZIPInputStream(new ByteBufferInputStream(buffer));
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }
    },

    ......

    // return the CompressionType for the given id
    public static CompressionType forId(int id) {
        switch (id) {
            case 0:
                return NONE;
            case 1:
                return GZIP;
            case 2:
                return SNAPPY;
            case 3:
                return LZ4;
            default:
                throw new IllegalArgumentException("Unknown compression type id: " + id);
        }
    }
}
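For GZIP, Kafka simply decorates its buffer-backed streams with the JDK's gzip classes. A self-contained round trip using those same classes (the 8 KB buffer size mirrors the wrapForOutput choice above):

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    public static void main(String[] args) throws IOException {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);

        // compress
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(bos, 8 * 1024)) {
            out.write(raw);
        }

        // decompress
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // hello kafka
        }
    }
}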
Only the Records section is compressed; the header fields before it are left uncompressed:
private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
    final ByteBuffer buffer = this.buffer.duplicate();
    buffer.position(RECORDS_OFFSET);
    // compressionType() returns the compression type; wrapForInput returns the
    // decorated stream, which is then wrapped in a DataInputStream
    final DataInputStream inputStream = new DataInputStream(
            compressionType().wrapForInput(buffer, magic(), bufferSupplier));
    return new RecordIterator() {
        @Override
        protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
            try {
                // read from the input stream and instantiate a DefaultRecord
                return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
            } catch (EOFException e) {
                throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
            } catch (IOException e) {
                throw new KafkaException("Failed to decompress record stream", e);
            }
        }

        @Override
        protected boolean ensureNoneRemaining() {
            try {
                // read() returning -1 means the input stream is exhausted
                return inputStream.read() == -1;
            } catch (IOException e) {
                throw new KafkaException("Error checking for remaining bytes after reading batch", e);
            }
        }

        @Override
        public void close() {
            try {
                inputStream.close();
            } catch (IOException e) {
                throw new KafkaException("Failed to close record stream", e);
            }
        }
    };
}
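ensureNoneRemaining() leans on a basic InputStream contract: read() returns -1 once the stream is exhausted, so one extra read distinguishes "fully consumed" from "bytes left over". A minimal JDK-only demonstration:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class EofCheck {
    public static void main(String[] args) throws IOException {
        try (InputStream in = new ByteArrayInputStream(new byte[] {42})) {
            System.out.println(in.read()); // 42
            System.out.println(in.read()); // -1: stream exhausted
        }
    }
}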
The data layout of a DefaultRecordBatch:
RecordBatch =>
  FirstOffset => int64          # also serves as BaseOffset; each Record's OffsetDelta is relative to this field
  Length => int32               # data length, counted from PartitionLeaderEpoch onward
  PartitionLeaderEpoch => int32
  Magic => int8                 # version
  CRC => int32                  # CRC for data-integrity checks (computed from Attributes onward)
  Attributes => int16           # attributes, including the compression type
  LastOffsetDelta => int32      # offset delta of the last record, relative to FirstOffset
  FirstTimestamp => int64       # also serves as BaseTimestamp; each Record's TimestampDelta is relative to this field
  MaxTimestamp => int64         # the largest timestamp among the records
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  RecordsCount => int32         # number of entries in the records list
  Records => [Record]
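The compression codec lives in the low bits of Attributes. DefaultRecordBatch masks them out (its COMPRESSION_CODEC_MASK constant is 0x07) and passes the id to CompressionType.forId(), as the class below shows. A one-line sketch with a hypothetical attributes value:

short attributes = 1;                                          // hypothetical: low 3 bits = 1
System.out.println(CompressionType.forId(attributes & 0x07));  // GZIP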
The DefaultRecordBatch class:
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
    static final int BASE_OFFSET_OFFSET = 0;
    static final int BASE_OFFSET_LENGTH = 8;
    static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
    static final int LENGTH_LENGTH = 4;
    static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
    static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
    static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
    static final int MAGIC_LENGTH = 1;
    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
    static final int CRC_LENGTH = 4;
    static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
    static final int ATTRIBUTE_LENGTH = 2;
    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
    static final int LAST_OFFSET_DELTA_LENGTH = 4;
    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
    static final int BASE_TIMESTAMP_LENGTH = 8;
    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
    static final int MAX_TIMESTAMP_LENGTH = 8;
    static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
    static final int PRODUCER_ID_LENGTH = 8;
    static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH;
    static final int PRODUCER_EPOCH_LENGTH = 2;
    static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
    static final int BASE_SEQUENCE_LENGTH = 4;
    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
    static final int RECORDS_COUNT_LENGTH = 4;
    static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
    public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;

    // read the number of records
    private int count() {
        return buffer.getInt(RECORDS_COUNT_OFFSET);
    }

    // total size of the DefaultRecordBatch in bytes
    @Override
    public int sizeInBytes() {
        // LOG_OVERHEAD, defined in Records, is the combined length of FirstOffset and Length
        return LOG_OVERHEAD + buffer.getInt(LENGTH_OFFSET);
    }

    @Override
    public long checksum() {
        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
    }

    public boolean isValid() {
        // validate the CRC value
        return sizeInBytes() >= RECORD_BATCH_OVERHEAD && checksum() == computeChecksum();
    }

    private long computeChecksum() {
        // the three arguments of compute are the ByteBuffer, offset, and size
        return Crc32C.compute(buffer, ATTRIBUTES_OFFSET, buffer.limit() - ATTRIBUTES_OFFSET);
    }

    @Override
    public CompressionType compressionType() {
        return CompressionType.forId(attributes() & COMPRESSION_CODEC_MASK);
    }

    @Override
    public long baseOffset() {
        return buffer.getLong(BASE_OFFSET_OFFSET);
    }

    @Override
    public long lastOffset() {
        return baseOffset() + lastOffsetDelta();
    }

    ......
}
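Two pieces of arithmetic here are worth checking by hand: the field lengths above sum to the fixed header size RECORD_BATCH_OVERHEAD, and lastOffset()/nextOffset() are plain deltas from baseOffset. A worked example with hypothetical values:

// field lengths in order, from BASE_OFFSET through RECORDS_COUNT
int overhead = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4;
System.out.println(overhead);                          // 61 == RECORD_BATCH_OVERHEAD

long baseOffset = 100L;   // hypothetical FirstOffset
int lastOffsetDelta = 4;  // hypothetical: the batch holds 5 records
System.out.println(baseOffset + lastOffsetDelta);      // lastOffset()  -> 104
System.out.println(baseOffset + lastOffsetDelta + 1);  // nextOffset()  -> 105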
RecordBatch is a collection of multiple Records. It extends the Iterable<Record> interface, providing a way to iterate over Records.
DefaultRecordBatch implements the RecordBatch interface and has its own data layout. It returns the Iterator<Record> matching the compression type, providing iteration over its Records.