Kafka Protocol - RecordBatch

Introduction

A RecordBatch is a collection containing multiple records. RecordBatch extends the Iterable<Record> interface, which provides a way to iterate over its records.
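
Since RecordBatch extends Iterable<Record>, a batch can be traversed with a plain for-each loop. Below is a minimal sketch, assuming the batches come from a buffer wrapped by Kafka's MemoryRecords class (the helper class name BatchIterationSketch is my own, and method names may differ slightly between Kafka versions):

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class BatchIterationSketch {
    // Walk every record of every batch contained in the given log buffer.
    public static void dump(ByteBuffer logBuffer) {
        MemoryRecords records = MemoryRecords.readableRecords(logBuffer);
        for (RecordBatch batch : records.batches()) {
            // RecordBatch is Iterable<Record>, so its records can be iterated directly
            for (Record record : batch) {
                System.out.printf("offset=%d keySize=%d valueSize=%d%n",
                        record.offset(), record.keySize(), record.valueSize());
            }
        }
    }
}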

AbstractRecordBatch

AbstractRecordBatch implements the RecordBatch interface and provides the following methods:

abstract class AbstractRecordBatch implements RecordBatch {

    @Override
    public boolean hasProducerId() {
        return RecordBatch.NO_PRODUCER_ID < producerId();
    }
    
    // Compute the next offset
    @Override
    public long nextOffset() {
        return lastOffset() + 1;
    }
    
    // Whether the RecordBatch data is compressed
    @Override
    public boolean isCompressed() {
        return compressionType() != CompressionType.NONE;
    }
}

DefaultRecordBatch

DefaultRecordBatch is a concrete implementation of the RecordBatch interface.

public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {

    @Override
    public Iterator<Record> iterator() {
        // count() returns the number of records
        if (count() == 0)
            return Collections.emptyIterator();
        // Check whether the data is compressed
        if (!isCompressed())
            // Not compressed: return the uncompressed iterator
            return uncompressedIterator();
        
        // If compressed, build the iterator according to the compression type
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
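
Note that when the batch is compressed, iterator() eagerly decompresses every record into an in-memory list inside the try-with-resources block, so the decompression stream can be closed before the iterator is handed back to the caller.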

RecordIterator

RecordIterator is defined inside DefaultRecordBatch and is responsible for iterating over the records.

private abstract class RecordIterator implements CloseableIterator<Record> {
        private final Long logAppendTime;
        private final long baseOffset;
        private final long baseTimestamp;
        private final int baseSequence;
        private final int numRecords;    // total number of records in the batch
        private int readRecords = 0;    // number of records read so far

        public RecordIterator() {
            this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
            // Get the RecordBatch's baseOffset
            this.baseOffset = baseOffset();
            // Get the RecordBatch's baseTimestamp
            this.baseTimestamp = baseTimestamp();
            // Get the RecordBatch's baseSequence
            this.baseSequence = baseSequence();
            // Get the number of records in the RecordBatch
            int numRecords = count();
            if (numRecords < 0)
                throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" + magic() + " batch");
            this.numRecords = numRecords;
        }

        @Override
        public boolean hasNext() {
            // There are more records as long as fewer than numRecords have been read
            return readRecords < numRecords;
        }

        @Override
        public Record next() {
            if (readRecords >= numRecords)
                throw new NoSuchElementException();
            // Update readRecords
            readRecords++;
            // Read the next Record
            Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
            if (readRecords == numRecords) {
                // After reading the last record, verify that the buffer has been fully consumed
                if (!ensureNoneRemaining())
                    throw new InvalidRecordException("Incorrect declared batch size, records still remaining in file");
            }
            return rec;
        }
    
        // Read the next Record
        protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
        
        // Returns true if no unread data remains
        protected abstract boolean ensureNoneRemaining();

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

Iterator for uncompressed records

private CloseableIterator<Record> uncompressedIterator() {
        // Duplicate the buffer
        final ByteBuffer buffer = this.buffer.duplicate();
        // Move the position to where the Records section starts
        buffer.position(RECORDS_OFFSET);
        return new RecordIterator() {
            @Override
            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                try {
                    // Read from the buffer and construct a DefaultRecord
                    return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                } catch (BufferUnderflowException e) {
                    throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                }
            }
            @Override
            protected boolean ensureNoneRemaining() {
                // True if the buffer has no remaining unread data
                return !buffer.hasRemaining();
            }
            @Override
            public void close() {}
        };
    }

CompressionType

CompressionType represents the compression type of the Records data. There are four built-in types: NONE, GZIP, SNAPPY, and LZ4.

public enum CompressionType {
    NONE(0, "none", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            return buffer;
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            return new ByteBufferInputStream(buffer);
        }
    },

    GZIP(1, "gzip", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            try {
                // Return an OutputStream that gzip-compresses the data written to it
                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
                return new GZIPOutputStream(buffer, 8 * 1024);
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            try {
                // Return an InputStream that gzip-decompresses the data read from it
                return new GZIPInputStream(new ByteBufferInputStream(buffer));
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }
    }
    ......
    // Return the CompressionType with the given id
    public static CompressionType forId(int id) {
        switch (id) {
            case 0:
                return NONE;
            case 1:
                return GZIP;
            case 2:
                return SNAPPY;
            case 3:
                return LZ4;
            default:
                throw new IllegalArgumentException("Unknown compression type id: " + id);
        }
    }

}
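
The wrapForOutput / wrapForInput pair can be exercised on its own, outside of a record batch. Below is a minimal round-trip sketch based on the signatures shown above; ByteBufferOutputStream and BufferSupplier are Kafka-internal utility classes and the exact signatures vary between Kafka versions, so treat this as illustrative only:

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class CompressionRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);

        // Compress: decorate a growable ByteBufferOutputStream with a gzip stream.
        ByteBufferOutputStream out = new ByteBufferOutputStream(1024);
        try (OutputStream os = CompressionType.GZIP.wrapForOutput(out, RecordBatch.CURRENT_MAGIC_VALUE)) {
            os.write(payload);
        }

        // Decompress: flip the written buffer and decorate it with the matching input stream.
        ByteBuffer compressed = out.buffer();
        compressed.flip();
        byte[] restored = new byte[payload.length];
        try (InputStream in = CompressionType.GZIP.wrapForInput(
                compressed, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.NO_CACHING)) {
            int n = in.read(restored);    // a real reader would loop until all bytes are read
            System.out.println(new String(restored, 0, n, StandardCharsets.UTF_8));
        }
    }
}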

Iterator for compressed records

Only the Records section is compressed; the header fields that precede it are stored uncompressed.

private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
        final ByteBuffer buffer = this.buffer.duplicate();
        buffer.position(RECORDS_OFFSET);
        // compressionType() returns the compression type; wrapForInput decorates the buffer with the matching decompression stream
        final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));

        return new RecordIterator() {
            @Override
            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                try {
                    // Read from the inputStream and construct a DefaultRecord
                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                } catch (EOFException e) {
                    throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                } catch (IOException e) {
                    throw new KafkaException("Failed to decompress record stream", e);
                }
            }

            @Override
            protected boolean ensureNoneRemaining() {
                try {
                    // read() returning -1 means the inputStream has been fully consumed
                    return inputStream.read() == -1;
                } catch (IOException e) {
                    throw new KafkaException("Error checking for remaining bytes after reading batch", e);
                }
            }

            @Override
            public void close() {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    throw new KafkaException("Failed to close record stream", e);
                }
            }
        };
    }

DefaultRecordBatch data structure

The data layout of a DefaultRecordBatch:

RecordBatch =>
  FirstOffset => int64    # also serves as BaseOffset; each Record's OffsetDelta is relative to this field
  Length => int32    # data length, counted from PartitionLeaderEpoch onward
  PartitionLeaderEpoch => int32
  Magic => int8    # format version
  CRC => int32    # CRC used to verify data integrity (computed from Attributes onward)
  Attributes => int16    # attributes, including the compression type
  LastOffsetDelta => int32    # offset delta of the last record relative to FirstOffset
  FirstTimestamp => int64    # also serves as BaseTimestamp; each Record's TimestampDelta is relative to this field
  MaxTimestamp => int64    # the largest timestamp among the records
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  RecordsCount => int32    # number of records in the list
  Records => [Record]
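
Adding up the fixed-size header fields gives 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4 = 61 bytes, which matches the RECORDS_OFFSET / RECORD_BATCH_OVERHEAD constants in the class below: the Records payload always starts at byte 61 of the batch.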

The DefaultRecordBatch class

public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
    static final int BASE_OFFSET_OFFSET = 0;
    static final int BASE_OFFSET_LENGTH = 8;
    static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
    static final int LENGTH_LENGTH = 4;
    static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
    static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
    static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
    static final int MAGIC_LENGTH = 1;
    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
    static final int CRC_LENGTH = 4;
    static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
    static final int ATTRIBUTE_LENGTH = 2;
    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
    static final int LAST_OFFSET_DELTA_LENGTH = 4;
    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
    static final int BASE_TIMESTAMP_LENGTH = 8;
    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
    static final int MAX_TIMESTAMP_LENGTH = 8;
    static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
    static final int PRODUCER_ID_LENGTH = 8;
    static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH;
    static final int PRODUCER_EPOCH_LENGTH = 2;
    static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
    static final int BASE_SEQUENCE_LENGTH = 4;
    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
    static final int RECORDS_COUNT_LENGTH = 4;
    static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
    public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;

    // Read the number of records
    private int count() {
        return buffer.getInt(RECORDS_COUNT_OFFSET);
    }

    // Return the total size of the DefaultRecordBatch in bytes
    @Override
    public int sizeInBytes() {
        // LOG_OVERHEAD is defined in Records: the combined size of the FirstOffset and Length fields (12 bytes)
        return LOG_OVERHEAD + buffer.getInt(LENGTH_OFFSET);
    }


    @Override
    public long checksum() {
        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
    }

    public boolean isValid() {
        // Validate the CRC
        return sizeInBytes() >= RECORD_BATCH_OVERHEAD && checksum() == computeChecksum();
    }

    private long computeChecksum() {
        // compute takes three arguments: the ByteBuffer, the offset, and the size
        return Crc32C.compute(buffer, ATTRIBUTES_OFFSET, buffer.limit() - ATTRIBUTES_OFFSET);
    }

    @Override
    public CompressionType compressionType() {
        return CompressionType.forId(attributes() & COMPRESSION_CODEC_MASK);
    }

    @Override
    public long baseOffset() {
        return buffer.getLong(BASE_OFFSET_OFFSET);
    }

    @Override
    public long lastOffset() {
        return baseOffset() + lastOffsetDelta();
    }

    ......
}
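
As a quick illustration of the layout, the sketch below (a standalone helper written for this article, not Kafka code) reads a few header fields directly from a raw batch buffer using the byte offsets derived from the constants above; the 0x07 mask for the compression bits is an assumption matching COMPRESSION_CODEC_MASK:

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;

public class BatchHeaderDump {
    // Print a few header fields of a raw RecordBatch buffer using absolute byte offsets.
    public static void dump(ByteBuffer buffer) {
        long baseOffset = buffer.getLong(0);      // FirstOffset / BaseOffset
        int length = buffer.getInt(8);            // Length, counted from PartitionLeaderEpoch
        byte magic = buffer.get(16);              // Magic (2 for this batch format)
        short attributes = buffer.getShort(21);   // Attributes
        int recordCount = buffer.getInt(57);      // RecordsCount
        CompressionType codec = CompressionType.forId(attributes & 0x07);   // low bits select the codec
        System.out.printf("baseOffset=%d sizeInBytes=%d magic=%d codec=%s records=%d%n",
                baseOffset, 12 + length, magic, codec, recordCount);
    }
}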

Summary

A RecordBatch is a collection of multiple records. It extends the Iterable<Record> interface, which provides a way to iterate over its records.

DefaultRecordBatch implements the RecordBatch interface and defines its own data layout. Depending on the compression type, it returns the corresponding Iterator<Record> for traversing the records.
