Kafka 的消息是以 record 的形式存儲下來的。
Record 是一個接口,DefaultRecord 實現了 Record 接口。其數據結構如下:
Record =>
  Length => varint              # record 的總長度(不包括 Length 本身)
  Attributes => int8            # 屬性
  TimestampDelta => varlong     # timestamp 的偏移量(相對於 RecordBatch 的 baseTimestamp)
  OffsetDelta => varint         # offset 的偏移量(相對於 RecordBatch 的 baseOffset)
  KeyLen => varint              # key 的長度,-1 表示 key 為 null
  Key => data                   # key 的數據
  ValueLen => varint            # value 的長度,-1 表示 value 為 null
  Value => data                 # value 的數據
  NumHeaders => varint          # header 的數量
  Headers => [Header]           # Header 列表

Header =>
  HeaderKeyLen => varint        # key 的長度
  HeaderKey => string           # key 的數據
  HeaderValueLen => varint      # value 的長度,-1 表示 value 為 null
  HeaderValue => data           # value 的數據
上面的數據類型中有 varint(以及 TimestampDelta 使用的 varlong),它們是 Protocol Buffers 使用的變長整數編碼:數值越小,佔用的字節越少,因此在存儲較小的數值時能節省空間。具體可參考鏈接:https://developers.google.com/protocol-buffers/docs/encoding
DefaultRecord 是對上述數據結構的封裝:
/**
 * Decoded representation of a single record: its attributes byte, absolute offset, timestamp and
 * sequence (resolved against the enclosing RecordBatch when the record is read), key, value and
 * headers.
 */
public class DefaultRecord implements Record {

    // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
    public static final int MAX_RECORD_OVERHEAD = 21;

    // Encoded size of the varint -1; sizeOf() charges this for a null key, value or header value.
    private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);

    // Total encoded size of the record, including its own Length prefix.
    private final int sizeInBytes;
    private final byte attributes;
    // Absolute offset: batch baseOffset + OffsetDelta.
    private final long offset;
    // Absolute timestamp: batch baseTimestamp + TimestampDelta, or the log append time when one is supplied.
    private final long timestamp;
    // Batch-relative sequence, or RecordBatch.NO_SEQUENCE when the batch has no base sequence.
    private final int sequence;
    // Key bytes; null when the encoded key length is negative.
    private final ByteBuffer key;
    // Value bytes; null when the encoded value length is negative.
    private final ByteBuffer value;
    private final Header[] headers;

    /**
     * Stores already-decoded fields as-is; no validation or copying is performed here.
     */
    private DefaultRecord(int sizeInBytes,
                          byte attributes,
                          long offset,
                          long timestamp,
                          int sequence,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) {
        this.sizeInBytes = sizeInBytes;
        this.attributes = attributes;
        this.offset = offset;
        this.timestamp = timestamp;
        this.sequence = sequence;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    /** Returns the absolute offset of this record. */
    @Override
    public long offset() {
        return offset;
    }

    /** Returns the sequence of this record (NO_SEQUENCE if the batch carried none). */
    @Override
    public int sequence() {
        return sequence;
    }

    ........

    /** Returns the header array (the stored array itself, not a copy). */
    @Override
    public Header[] headers() {
        return headers;
    }

    /**
DefaultRecord 提供了從 buffer 中讀取數據並實例化 DefaultRecord 的方法:
// 從DataInput讀取數據 public static DefaultRecord readFrom(DataInput input, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { // 讀取Length int sizeOfBodyInBytes = ByteUtils.readVarint(input); // 分配buffer ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes); // 讀取body數據 input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes); // 計算整個record的長度,包括Length int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); } // 從ByteBuffer讀取數據,baseOffset, baseTimestamp,baseSequence都是RecordBatch的屬性 public static DefaultRecord readFrom(ByteBuffer buffer, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) { // 讀取Length int sizeOfBodyInBytes = ByteUtils.readVarint(buffer); // 檢查長度 if (buffer.remaining() < sizeOfBodyInBytes) return null; // 整個record的總長度(包括Length) int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); } private static DefaultRecord readFrom(ByteBuffer buffer, int sizeInBytes, int sizeOfBodyInBytes, long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) { try { // 記錄record的開始位置 int recordStart = buffer.position(); // 讀取Attributes byte attributes = buffer.get(); // 讀取TimestampDelta long timestampDelta = ByteUtils.readVarlong(buffer); // 計算timestamp,baseTimestamp是RecordBatch的屬性 long timestamp = baseTimestamp + timestampDelta; if (logAppendTime != null) timestamp = logAppendTime; // 讀取OffsetDelta int offsetDelta = ByteUtils.readVarint(buffer); // 計算offset,baseOffset是RecordBatch的屬性 long offset = baseOffset + offsetDelta; int sequence = baseSequence >= 0 ? 
DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) : RecordBatch.NO_SEQUENCE; ByteBuffer key = null; // 讀取KeySize int keySize = ByteUtils.readVarint(buffer); if (keySize >= 0) { // 取出Key這段值 key = buffer.slice(); // 設置切片的limit值 key.limit(keySize); // 設置buffer的position,跳過Key這段 buffer.position(buffer.position() + keySize); } ByteBuffer value = null; // 讀取ValueSize int valueSize = ByteUtils.readVarint(buffer); if (valueSize >= 0) { // 取出Value這段值 value = buffer.slice(); // 設置切片的limit值 value.limit(valueSize); // 設置buffer的position,跳過Value這段 buffer.position(buffer.position() + valueSize); } // 讀取header的數量 int numHeaders = ByteUtils.readVarint(buffer); if (numHeaders < 0) throw new InvalidRecordException("Found invalid number of record headers " + numHeaders); final Header[] headers; if (numHeaders == 0) headers = Record.EMPTY_HEADERS; else // 讀取headers headers = readHeaders(buffer, numHeaders); // 當record數據已經讀取完,檢查長度 if (buffer.position() - recordStart != sizeOfBodyInBytes) throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes + " bytes in record payload, but instead read " + (buffer.position() - recordStart)); return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); } catch (BufferUnderflowException | IllegalArgumentException e) { throw new InvalidRecordException("Found invalid record structure", e); } }
讀取 headers:
private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) { Header[] headers = new Header[numHeaders]; for (int i = 0; i < numHeaders; i++) { // 讀取header的KeySize int headerKeySize = ByteUtils.readVarint(buffer); if (headerKeySize < 0) throw new InvalidRecordException("Invalid negative header key size " + headerKeySize); // 讀取header的key,而且轉爲utf-8 String headerKey = Utils.utf8(buffer, headerKeySize); // 設置buffer的postion,跳過key buffer.position(buffer.position() + headerKeySize); ByteBuffer headerValue = null; // 讀取header的value int headerValueSize = ByteUtils.readVarint(buffer); if (headerValueSize >= 0) { // 切片,取出headerValue這一段 headerValue = buffer.slice(); // 設置headerValue的limit值 headerValue.limit(headerValueSize); // 設置buffer的postion,跳過value buffer.position(buffer.position() + headerValueSize); } // 實例化 RecordHeader, RecordHeader只是對key和value的封裝 headers[i] = new RecordHeader(headerKey, headerValue); } return headers; }
public static int sizeInBytes(int offsetDelta, long timestampDelta, int keySize, int valueSize, Header[] headers) { // 計算body的長度 int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); // 加上Length的長度 return bodySize + ByteUtils.sizeOfVarint(bodySize); } private static int sizeOfBodyInBytes(int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) { // 計算keySize的值 int keySize = key == null ? -1 : key.remaining(); // 計算valueSize的值 int valueSize = value == null ? -1 : value.remaining(); return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); } private static int sizeOfBodyInBytes(int offsetDelta, long timestampDelta, int keySize, int valueSize, Header[] headers) { // attribute佔1byte int size = 1; // 計算offsetDelta的長度 size += ByteUtils.sizeOfVarint(offsetDelta); // 計算timestampDelta的長度 size += ByteUtils.sizeOfVarlong(timestampDelta); // 計算剩下key,value和headers的長度 size += sizeOf(keySize, valueSize, headers); return size; } private static int sizeOf(int keySize, int valueSize, Header[] headers) { int size = 0; if (keySize < 0) size += NULL_VARINT_SIZE_BYTES; else // keySize自己的長度,和key的長度 size += ByteUtils.sizeOfVarint(keySize) + keySize; if (valueSize < 0) size += NULL_VARINT_SIZE_BYTES; else // valueSize自己的長度,和value的長度 size += ByteUtils.sizeOfVarint(valueSize) + valueSize; if (headers == null) throw new IllegalArgumentException("Headers cannot be null"); // NumHeaders自己的長度 size += ByteUtils.sizeOfVarint(headers.length); for (Header header : headers) { String headerKey = header.key(); if (headerKey == null) throw new IllegalArgumentException("Invalid null header key found in headers"); // header的keySize int headerKeySize = Utils.utf8Length(headerKey); // keySize自己的長度,和key的長度 size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize; byte[] headerValue = header.value(); if (headerValue == null) { size += NULL_VARINT_SIZE_BYTES; } else { // valueSize自己的長度,和value的長度 size += 
ByteUtils.sizeOfVarint(headerValue.length) + headerValue.length; } } return size; }
Kafka 中的一條消息對應一條 Record。DefaultRecord 實現了 Record 接口,其數據結構採用了 varint/varlong 變長類型,減少了存儲空間。Record 依賴於 RecordBatch 存儲,其中的 offset、timestamp 等都是相對於 RecordBatch 的 baseOffset、baseTimestamp 計算出來的。RecordBatch 將在下一節介紹。