kafka協議-record解析

介紹

Kafka的消息是以record的方式存儲下來的。

Record

Record是接口,DefaultRecord實現了Record接口。

DefaultRecord的存儲結構

Record =>
  Length => varint    #  record總長度(不包括Length自己)
  Attributes => int8    # 屬性
  TimestampDelta => varint    # timestamp的偏移量(相對於RecordBatch的baseTimestamp)
  OffsetDelta => varint    # offset的偏移量(相對於RecordBatch的baseOffset)
  KeyLen => varint    # key的長度
  Key => data    # key的數據
  ValueLen => varint    # value的長度
  Value => data    # value的數據
  NumHeaders => varint    # header的數量
  Headers => [Header]    # Header列表
  
Header =>
  HeaderKeyLen => varint    # key的長度
  HeaderKey => string    # key的數據
  HeaderValueLen => varint    # value的長度
  HeaderValue => data    # value的數據

上面的數據類型中有varint,這是Protocol Buffers使用的變長整數類型。在存儲比較小的數值時,會節省空間。具體參考連接 https://developers.google.com/protocol-buffers/docs/encoding

DefaultRecord類

DefaultRecord是上述數據結構的封裝。

/**
 * In-memory representation of a single record: a length-prefixed body holding
 * attributes, timestamp/offset deltas, an optional key, an optional value and
 * a list of headers.
 */
public class DefaultRecord implements Record {

    // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
    public static final int MAX_RECORD_OVERHEAD = 21;

    // Encoded size of the varint -1, which is the length marker written for a null key, value or header value.
    private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

    // Total encoded size of the record, including the leading length varint.
    private final int sizeInBytes;
    // Attributes byte read from the start of the record body.
    private final byte attributes;
    // Absolute offset (batch base offset + this record's offset delta).
    private final long offset;
    // Absolute timestamp (batch base timestamp + delta, or the log append time when one is supplied).
    private final long timestamp;
    // Sequence number derived from the batch base sequence, or RecordBatch.NO_SEQUENCE.
    private final int sequence;
    // Key bytes; null when the encoded key length is negative.
    private final ByteBuffer key;
    // Value bytes; null when the encoded value length is negative.
    private final ByteBuffer value;
    // Record headers; Record.EMPTY_HEADERS when the record carries none.
    private final Header[] headers;

    /**
     * Creates a record from already-decoded fields. Instances are produced by
     * the static {@code readFrom} factories; no validation happens here.
     *
     * @param sizeInBytes total encoded size, including the length prefix
     * @param attributes  record attributes byte
     * @param offset      absolute offset of the record
     * @param timestamp   absolute timestamp of the record
     * @param sequence    sequence number of the record
     * @param key         key bytes, may be {@code null}
     * @param value       value bytes, may be {@code null}
     * @param headers     record headers
     */
    private DefaultRecord(int sizeInBytes, byte attributes,
                          long offset, long timestamp, int sequence,
                          ByteBuffer key, ByteBuffer value, Header[] headers) {
        // Payload.
        this.key = key;
        this.value = value;
        this.headers = headers;
        // Positional metadata.
        this.offset = offset;
        this.timestamp = timestamp;
        this.sequence = sequence;
        // Encoding metadata.
        this.attributes = attributes;
        this.sizeInBytes = sizeInBytes;
    }

    /** Returns the absolute offset of this record. */
    @Override
    public long offset() {
        return this.offset;
    }

    /** Returns the sequence number assigned to this record. */
    @Override
    public int sequence() {
        return this.sequence;
    }

    ........

    /**
     * Returns the headers of this record. The internal array is returned
     * directly, without copying.
     */
    @Override
    public Header[] headers() {
        return this.headers;
    }

    /**

DefaultRecord讀取

DefaultRecord提供了從buffer讀取數據並實例化的方法。

// Stream-based entry point: reads one record from a DataInput.
    /**
     * Reads the length varint, copies exactly that many body bytes into a
     * freshly allocated heap buffer, and decodes the record from that buffer.
     *
     * <p>A negative length is not checked here; it is rejected by
     * {@link ByteBuffer#allocate(int)} with an {@link IllegalArgumentException}.
     *
     * @param input          source positioned at the record's length field
     * @param baseOffset     base offset of the enclosing batch
     * @param baseTimestamp  base timestamp of the enclosing batch
     * @param baseSequence   base sequence of the enclosing batch
     * @param logAppendTime  timestamp override, or {@code null} to use the encoded delta
     * @return the decoded record
     * @throws IOException if reading from {@code input} fails
     */
    public static DefaultRecord readFrom(DataInput input,
                                         long baseOffset,  long baseTimestamp,
                                         int baseSequence, Long logAppendTime) throws IOException {
        final int bodySize = ByteUtils.readVarint(input);

        // Pull the whole body into memory so it can be decoded as a ByteBuffer.
        final ByteBuffer body = ByteBuffer.allocate(bodySize);
        input.readFully(body.array(), 0, bodySize);

        // Full record size = length prefix + body.
        final int lengthPrefixSize = ByteUtils.sizeOfVarint(bodySize);
        final int recordSize = lengthPrefixSize + bodySize;

        return readFrom(body, recordSize, bodySize,
                baseOffset, baseTimestamp, baseSequence, logAppendTime);
    }


    /**
     * Decodes one record from a buffer. {@code baseOffset}, {@code baseTimestamp}
     * and {@code baseSequence} are properties of the enclosing record batch.
     *
     * @param buffer         buffer positioned at the record's length field
     * @param baseOffset     base offset of the enclosing batch
     * @param baseTimestamp  base timestamp of the enclosing batch
     * @param baseSequence   base sequence of the enclosing batch
     * @param logAppendTime  timestamp override, or {@code null} to use the encoded delta
     * @return the decoded record, or {@code null} when the buffer holds fewer
     *         bytes than the declared body length
     */
    public static DefaultRecord readFrom(ByteBuffer buffer, long baseOffset,
                                         long baseTimestamp, int baseSequence,  Long logAppendTime) {
        final int bodySize = ByteUtils.readVarint(buffer);

        // Incomplete record: not enough bytes left for the declared body.
        if (bodySize > buffer.remaining()) {
            return null;
        }

        // Full record size = body + its length prefix.
        final int recordSize = bodySize + ByteUtils.sizeOfVarint(bodySize);
        return readFrom(buffer, recordSize, bodySize,
                baseOffset, baseTimestamp, baseSequence, logAppendTime);
    }

    /**
     * Decodes a record body in wire order: attributes, timestamp delta, offset
     * delta, key, value, headers. The key and value are returned as slices of
     * {@code buffer}, not copies.
     *
     * @param buffer             buffer positioned at the first body byte (after the length field)
     * @param sizeInBytes        total record size including the length field
     * @param sizeOfBodyInBytes  declared body size, used to verify the bytes consumed
     * @param baseOffset         base offset of the enclosing batch
     * @param baseTimestamp      base timestamp of the enclosing batch
     * @param baseSequence       base sequence of the enclosing batch; negative means no sequence
     * @param logAppendTime      timestamp override, or {@code null} to use the encoded delta
     * @return the decoded record
     * @throws InvalidRecordException if the header count is negative, the bytes
     *         consumed differ from {@code sizeOfBodyInBytes}, or the buffer
     *         underflows / a slice limit or position is out of range
     */
    private static DefaultRecord readFrom(ByteBuffer buffer, int sizeInBytes,
                                          int sizeOfBodyInBytes, long baseOffset, long baseTimestamp,
                                          int baseSequence, Long logAppendTime) {
        try {
            // Remember where the body starts so the consumed length can be verified at the end.
            final int bodyStart = buffer.position();

            final byte attributes = buffer.get();

            // Timestamp: the explicit log append time wins over base + delta.
            final long timestampDelta = ByteUtils.readVarlong(buffer);
            final long timestamp;
            if (logAppendTime == null) {
                timestamp = baseTimestamp + timestampDelta;
            } else {
                timestamp = logAppendTime;
            }

            // Offset: base + delta, both relative to the enclosing batch.
            final int offsetDelta = ByteUtils.readVarint(buffer);
            final long offset = baseOffset + offsetDelta;

            final int sequence;
            if (baseSequence < 0) {
                sequence = RecordBatch.NO_SEQUENCE;
            } else {
                sequence = DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta);
            }

            // Key: a negative size encodes null; otherwise slice the bytes and skip past them.
            final int keySize = ByteUtils.readVarint(buffer);
            ByteBuffer key = null;
            if (keySize >= 0) {
                key = buffer.slice();
                key.limit(keySize);
                buffer.position(buffer.position() + keySize);
            }

            // Value: same encoding as the key.
            final int valueSize = ByteUtils.readVarint(buffer);
            ByteBuffer value = null;
            if (valueSize >= 0) {
                value = buffer.slice();
                value.limit(valueSize);
                buffer.position(buffer.position() + valueSize);
            }

            // Headers: count followed by that many header entries.
            final int numHeaders = ByteUtils.readVarint(buffer);
            if (numHeaders < 0) {
                throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
            }
            final Header[] headers;
            if (numHeaders != 0) {
                headers = readHeaders(buffer, numHeaders);
            } else {
                headers = Record.EMPTY_HEADERS;
            }

            // The whole body must have been consumed, no more and no less.
            final int bytesRead = buffer.position() - bodyStart;
            if (bytesRead != sizeOfBodyInBytes) {
                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
                        " bytes in record payload, but instead read " + bytesRead);
            }

            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
        } catch (BufferUnderflowException | IllegalArgumentException e) {
            throw new InvalidRecordException("Found invalid record structure", e);
        }
    }

讀取headers

/**
 * Decodes {@code numHeaders} consecutive header entries from {@code buffer}.
 * Each entry is a varint key length, the key bytes (decoded as UTF-8), a
 * varint value length, and the value bytes; a negative value length encodes
 * a null value.
 *
 * @param buffer      buffer positioned at the first header entry
 * @param numHeaders  number of entries to read
 * @return the decoded headers, in wire order
 * @throws InvalidRecordException if a header key length is negative
 */
private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
        final Header[] parsed = new Header[numHeaders];
        int index = 0;
        while (index < numHeaders) {
            // Header key: mandatory, so a negative length is invalid.
            final int nameSize = ByteUtils.readVarint(buffer);
            if (nameSize < 0) {
                throw new InvalidRecordException("Invalid negative header key size " + nameSize);
            }
            final String name = Utils.utf8(buffer, nameSize);
            // The position is advanced explicitly past the key bytes.
            buffer.position(buffer.position() + nameSize);

            // Header value: optional; slice the bytes and skip past them when present.
            final int payloadSize = ByteUtils.readVarint(buffer);
            ByteBuffer payload = null;
            if (payloadSize >= 0) {
                payload = buffer.slice();
                payload.limit(payloadSize);
                buffer.position(buffer.position() + payloadSize);
            }

            parsed[index] = new RecordHeader(name, payload);
            index++;
        }
        return parsed;
    }

DefaultRecord長度計算

/**
 * Computes the total encoded size of a record: the body plus the varint
 * length field that precedes it.
 *
 * @param offsetDelta     offset relative to the batch base offset
 * @param timestampDelta  timestamp relative to the batch base timestamp
 * @param keySize         key length in bytes, negative for a null key
 * @param valueSize       value length in bytes, negative for a null value
 * @param headers         record headers, must not be {@code null}
 * @return total record size in bytes
 */
public static int sizeInBytes(int offsetDelta,
                                  long timestampDelta,
                                  int keySize,
                                  int valueSize,
                                  Header[] headers) {
        final int body = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
        final int lengthPrefix = ByteUtils.sizeOfVarint(body);
        return lengthPrefix + body;
    }

    /**
     * Computes the body size of a record from its key and value buffers. A
     * {@code null} buffer is translated to size -1; otherwise the buffer's
     * remaining byte count is used.
     *
     * @param offsetDelta     offset relative to the batch base offset
     * @param timestampDelta  timestamp relative to the batch base timestamp
     * @param key             key bytes, may be {@code null}
     * @param value           value bytes, may be {@code null}
     * @param headers         record headers, must not be {@code null}
     * @return body size in bytes (excluding the length field)
     */
    private static int sizeOfBodyInBytes(int offsetDelta,
                                         long timestampDelta,
                                         ByteBuffer key,
                                         ByteBuffer value,
                                         Header[] headers) {
        int keySize = -1;
        if (key != null) {
            keySize = key.remaining();
        }

        int valueSize = -1;
        if (value != null) {
            valueSize = value.remaining();
        }

        return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers);
    }

    /**
     * Computes the body size of a record: one attributes byte, the varint
     * offset delta, the varlong timestamp delta, and the key/value/headers
     * section.
     *
     * @param offsetDelta     offset relative to the batch base offset
     * @param timestampDelta  timestamp relative to the batch base timestamp
     * @param keySize         key length in bytes, negative for a null key
     * @param valueSize       value length in bytes, negative for a null value
     * @param headers         record headers, must not be {@code null}
     * @return body size in bytes (excluding the length field)
     */
    private static int sizeOfBodyInBytes(int offsetDelta,
                                         long timestampDelta,
                                         int keySize,
                                         int valueSize,
                                         Header[] headers) {
        final int attributesSize = 1;
        return attributesSize
                + ByteUtils.sizeOfVarint(offsetDelta)
                + ByteUtils.sizeOfVarlong(timestampDelta)
                + sizeOf(keySize, valueSize, headers);
    }

    /**
     * Computes the encoded size of the key, value and headers section. Every
     * variable-length field contributes its varint length prefix plus its
     * bytes; a null field contributes only the size of the varint -1.
     *
     * @param keySize    key length in bytes, negative for a null key
     * @param valueSize  value length in bytes, negative for a null value
     * @param headers    record headers
     * @return size in bytes of the key/value/headers section
     * @throws IllegalArgumentException if {@code headers} is {@code null} or
     *         any header has a {@code null} key
     */
    private static int sizeOf(int keySize, int valueSize, Header[] headers) {
        // Key: length prefix + bytes, or just the null marker.
        int total = keySize < 0
                ? NULL_VARINT_SIZE_BYTES
                : ByteUtils.sizeOfVarint(keySize) + keySize;

        // Value: same encoding as the key.
        total += valueSize < 0
                ? NULL_VARINT_SIZE_BYTES
                : ByteUtils.sizeOfVarint(valueSize) + valueSize;

        if (headers == null) {
            throw new IllegalArgumentException("Headers cannot be null");
        }

        // Header count, then each header's key and value.
        total += ByteUtils.sizeOfVarint(headers.length);
        for (int i = 0; i < headers.length; i++) {
            final Header header = headers[i];

            final String name = header.key();
            if (name == null) {
                throw new IllegalArgumentException("Invalid null header key found in headers");
            }
            final int nameSize = Utils.utf8Length(name);
            total += ByteUtils.sizeOfVarint(nameSize) + nameSize;

            final byte[] payload = header.value();
            total += payload == null
                    ? NULL_VARINT_SIZE_BYTES
                    : ByteUtils.sizeOfVarint(payload.length) + payload.length;
        }
        return total;
    }

歸納

Kafka的一條消息,對應着一條Record。DefaultRecord實現了Record接口,數據結構採用了varint類型,減少了存儲空間。Record依賴着RecordBatch的存儲,裏面的offset,timestamp等都和RecordBatch有關。RecordBatch在下節會有介紹。

相關文章
相關標籤/搜索