AbstractRecords實現了Records接口,recordsIterator返回 Iterator<Record>實例,來遍歷Record。ide
public abstract class AbstractRecords implements Records { // 實現Iterable<Record>接口 private final Iterable<Record> records = new Iterable<Record>() { @Override public Iterator<Record> iterator() { // 調用recordsIterator方法,返回iterator return recordsIterator(); } }; // 返回Iterable<Record> @Override public Iterable<Record> records() { return records; } private Iterator<Record> recordsIterator() { // 繼承AbstractIterator<Record>,實現makeNext方法 return new AbstractIterator<Record>() { // 實現遍歷RecordBatch private final Iterator<? extends RecordBatch> batches = batches().iterator(); // 遍歷RecordBatch的Record private Iterator<Record> records; @Override protected Record makeNext() { // 首先檢查records是否還有數據 if (records != null && records.hasNext()) // 若是有,直接返回 return; // 若是records沒有數據,再檢查batche是否有數據 if (batches.hasNext()) { // 若是batches有數據,則更新records爲當前batch的數據 records =; // 遞歸調用 return makeNext(); } // 若是records和batches都沒有數據,代表已經讀完,則調用allDone方法 return allDone(); } }; }
public abstract class AbstractIterator<T> implements Iterator<T> { private enum State { READY, NOT_READY, DONE, FAILED } private State state = State.NOT_READY; private T next; @Override public boolean hasNext() { switch (state) { case FAILED: throw new IllegalStateException("Iterator is in failed state"); case DONE: return false; case READY: return true; default: return maybeComputeNext(); } } @Override public T next() { if (!hasNext()) throw new NoSuchElementException(); state = State.NOT_READY; if (next == null) throw new IllegalStateException("Expected item but none found."); return next; } @Override public void remove() { throw new UnsupportedOperationException("Removal not supported"); } public T peek() { if (!hasNext()) throw new NoSuchElementException(); return next; } // 更新state爲DONE protected T allDone() { state = State.DONE; return null; } // 子類須要實現此方法 protected abstract T makeNext(); private Boolean maybeComputeNext() { state = State.FAILED; next = makeNext(); if (state == State.DONE) { return false; } else { state = State.READY; return true; } }
public abstract static class FileChannelRecordBatch extends AbstractRecordBatch { protected final long offset; protected final byte magic; protected final FileChannel channel; protected final int position; protected final int batchSize; private RecordBatch fullBatch; private RecordBatch batchHeader; FileChannelRecordBatch(long offset, byte magic, FileChannel channel, int position, int batchSize) { this.offset = offset; this.magic = magic; = channel; this.position = position; this.batchSize = batchSize; } @Override public CompressionType compressionType() { // loadBatchHeader返回RecordBatch,在獲取compressionType值 return loadBatchHeader().compressionType(); } ...... @Override public Iterator<Record> iterator() { return loadFullBatch().iterator(); } // 將buffer轉換爲RecordBatch protected abstract RecordBatch toMemoryRecordBatch(ByteBuffer buffer); // 返回header部分的長度 protected abstract int headerSize(); // 返回RecordBatch,包含完整數據 protected RecordBatch loadFullBatch() { if (fullBatch == null) { batchHeader = null; fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch"); } return fullBatch; } //返回只包含header部分的RecordBatch,不包含Record數據 protected RecordBatch loadBatchHeader() { if (fullBatch != null) return fullBatch; if (batchHeader == null) batchHeader = loadBatchWithSize(headerSize(), "record batch header"); return batchHeader; } // size爲要讀取batch的長度,從buffer中讀取數據,返回RecordBatch private RecordBatch loadBatchWithSize(int size, String description) { try { // 分配buffer ByteBuffer buffer = ByteBuffer.allocate(size); // 從FileChannel中讀取數據,到buffer中 Utils.readFullyOrFail(channel, buffer, position, description); // 設置position爲0,準備讀 buffer.rewind(); // 轉換爲RecordBatch return toMemoryRecordBatch(buffer); } catch (IOException e) { throw new KafkaException(e); } }
static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch { DefaultFileChannelRecordBatch(long offset, byte magic, FileChannel channel, int position, int batchSize) { super(offset, magic, channel, position, batchSize); } @Override protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) { // 實例化DefaultRecordBatch return new DefaultRecordBatch(buffer); } // 返回header的長度 @Override protected int headerSize() { return RECORD_BATCH_OVERHEAD; } }
class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> { private final LogInputStream<T> logInputStream; RecordBatchIterator(LogInputStream<T> logInputStream) { this.logInputStream = logInputStream; } @Override protected T makeNext() { try { // 調用logInputStream的nextBatch方法,獲取下一個RecordBatch T batch = logInputStream.nextBatch(); if (batch == null) return allDone(); return batch; } catch (IOException e) { throw new KafkaException(e); } } }
public class FileRecords extends AbstractRecords implements Closeable { private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches; public FileRecords(File file, FileChannel channel, int start, int end, boolean isSlice) throws IOException { this.file = file; = channel; this.start = start; this.end = end; this.isSlice = isSlice; this.size = new AtomicInteger(); if (isSlice) { // don't check the file size if this is just a slice view size.set(end - start); } else { int limit = Math.min((int) channel.size(), end); size.set(limit - start); // if this is not a slice, update the file pointer to the end of the file // set the file position to the last byte in the file channel.position(limit); } // 初始化batches batches = batchesFrom(start); } @Override public Iterable<FileChannelRecordBatch> batches() { return batches; } // 返回Iterable<FileChannelRecordBatch>實例 private Iterable<FileChannelRecordBatch> batchesFrom(final int start) { return new Iterable<FileChannelRecordBatch>() { @Override public Iterator<FileChannelRecordBatch> iterator() { return batchIterator(start); } }; } private Iterator<FileChannelRecordBatch> batchIterator(int start) { final int end; if (isSlice) end = this.end; else end = this.sizeInBytes(); // 實例化FileLogInputStream FileLogInputStream inputStream = new FileLogInputStream(channel, start, end); // 實例化RecordBatchIterator return new RecordBatchIterator<>(inputStream); }
interface LogInputStream<T extends RecordBatch> { // 讀取下一個RecordBatch T nextBatch() throws IOException; }
public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> { private int position; private final int end; private final FileChannel channel; // HEADER_SIZE_UP_TO_MAGIC爲,DefaultRecordBatch中截止到Magic的長度 private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC); FileLogInputStream(FileChannel channel, int start, int end) { = channel; this.position = start; this.end = end; } @Override public FileChannelRecordBatch nextBatch() throws IOException { if (position + HEADER_SIZE_UP_TO_MAGIC >= end) return null; logHeaderBuffer.rewind(); // 讀取DefaultRecord前面必要的字段 Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header"); logHeaderBuffer.rewind(); long offset = logHeaderBuffer.getLong(OFFSET_OFFSET); int size = logHeaderBuffer.getInt(SIZE_OFFSET); // v0版本的RECORD_OVERHEAD最小,檢查是否知足最小長度 if (size < LegacyRecord.RECORD_OVERHEAD_V0) throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0)); // 檢查默認版本,是否知足最小長度 if (position + LOG_OVERHEAD + size > end) return null; byte magic = logHeaderBuffer.get(MAGIC_OFFSET); final FileChannelRecordBatch batch; if (magic < RecordBatch.MAGIC_VALUE_V2) // 返回舊有的FileChannelRecordBatch batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size); else // 返回新的FileChannelRecordBatch batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size); // 更新position的位置,指向下一個RecordBatch position += batch.sizeInBytes(); return batch; }
public class MemoryRecords extends AbstractRecords { private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() { @Override public Iterator<MutableRecordBatch> iterator() { // 返回RecordBatchIterator,使用ByteBufferLogInputStream實例化 return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE)); } }; @Override public Iterable<MutableRecordBatch> batches() { return batches; }
ByteBufferLogInputStream 實現了從ByteBuffer中讀取數據,生成Recordbatch.
class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> { private final ByteBuffer buffer; private final int maxMessageSize; ByteBufferLogInputStream(ByteBuffer buffer, int maxMessageSize) { this.buffer = buffer; this.maxMessageSize = maxMessageSize; } public MutableRecordBatch nextBatch() throws IOException { int remaining = buffer.remaining(); if (remaining < LOG_OVERHEAD) return null; // 讀取 batch的長度 int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET); // V0 has the smallest overhead, stricter checking is done later if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0) throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0)); if (recordSize > maxMessageSize) throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize)); // 獲取batch的總長度 int batchSize = recordSize + LOG_OVERHEAD; if (remaining < batchSize) return null; byte magic = buffer.get(buffer.position() + MAGIC_OFFSET); // 切片 ByteBuffer batchSlice = buffer.slice(); // 設置limit值 batchSlice.limit(batchSize); // buffer設置position指向一下batch buffer.position(buffer.position() + batchSize); if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE) throw new CorruptRecordException("Invalid magic found in record: " + magic); if (magic > RecordBatch.MAGIC_VALUE_V1) return new DefaultRecordBatch(batchSlice); else return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice); } }