Kafka Protocol - Records

Introduction

Records is a collection of RecordBatch instances. Iterating over its Record entries means walking through each RecordBatch in turn and iterating over the Record entries inside it.

AbstractRecords

AbstractRecords implements the Records interface. Its recordsIterator method returns an Iterator<Record> instance used to iterate over the Record entries.

public abstract class AbstractRecords implements Records {
    
    // An Iterable<Record> view over all records
    private final Iterable<Record> records = new Iterable<Record>() {
        @Override
        public Iterator<Record> iterator() {
            // Delegate to recordsIterator to build the iterator
            return recordsIterator();
        }
    };
    
    // Expose the Iterable<Record> view
    @Override
    public Iterable<Record> records() {
        return records;
    }
    
    private Iterator<Record> recordsIterator() {
        // Subclass AbstractIterator<Record> and implement makeNext
        return new AbstractIterator<Record>() {
            // Iterator over the RecordBatch instances
            private final Iterator<? extends RecordBatch> batches = batches().iterator();
            // Iterator over the Record entries of the current batch
            private Iterator<Record> records;

            @Override
            protected Record makeNext() {
                // First check whether the current batch iterator still has records
                if (records != null && records.hasNext())
                    // If so, return the next record directly
                    return records.next();

                // Otherwise, check whether there are more batches
                if (batches.hasNext()) {
                    // If so, point records at the next batch's iterator
                    records = batches.next().iterator();
                    // Recurse to pull the first record of the new batch
                    return makeNext();
                }
                // Both records and batches are exhausted: call allDone
                return allDone();
            }
        };
    }
}
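
A minimal usage sketch (assuming records references any Records implementation): the flattened records() view is equivalent to the hand-written nested loop.

// Flattened view produced by AbstractRecords:
for (Record record : records.records())
    System.out.println(record.offset());

// ... which is the same as iterating batches and their records by hand:
for (RecordBatch batch : records.batches())
    for (Record record : batch)
        System.out.println(record.offset());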

AbstractIterator

AbstractIterator is a wrapper around Iterator; subclasses only need to implement the makeNext method. If there is more data, makeNext returns it and it is stored in next; if there is none, makeNext calls allDone.

public abstract class AbstractIterator<T> implements Iterator<T> {

    private enum State {
        READY, NOT_READY, DONE, FAILED
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    // Mark the iterator as DONE
    protected T allDone() {
        state = State.DONE;
        return null;
    }

    // Subclasses must implement this method
    protected abstract T makeNext();

    private Boolean maybeComputeNext() {
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) {
            return false;
        } else {
            state = State.READY;
            return true;
        }
    }
}
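
To illustrate the makeNext/allDone contract, here is a small hypothetical subclass (not from Kafka) that iterates over the non-null elements of an array:

// Hypothetical example: yields only the non-null elements of an array.
public class NonNullIterator<T> extends AbstractIterator<T> {
    private final T[] items;
    private int index = 0;

    public NonNullIterator(T[] items) {
        this.items = items;
    }

    @Override
    protected T makeNext() {
        // Skip nulls and return the next non-null element
        while (index < items.length) {
            T item = items[index++];
            if (item != null)
                return item;
        }
        // No more data: mark the iterator DONE
        return allDone();
    }
}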

FileChannelRecordBatch

FileChannelRecordBatch reads data from a FileChannel to produce a RecordBatch. It can instantiate a RecordBatch holding only the header, or one holding the full data.

Subclasses must implement the toMemoryRecordBatch method, which converts a buffer into a RecordBatch.

public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
        protected final long offset;
        protected final byte magic;
        protected final FileChannel channel;
        protected final int position;
        protected final int batchSize;

        private RecordBatch fullBatch;
        private RecordBatch batchHeader;

        FileChannelRecordBatch(long offset,
                               byte magic,
                               FileChannel channel,
                               int position,
                               int batchSize) {
            this.offset = offset;
            this.magic = magic;
            this.channel = channel;
            this.position = position;
            this.batchSize = batchSize;
        }

        @Override
        public CompressionType compressionType() {
            // loadBatchHeader returns the header-only RecordBatch; read compressionType from it
            return loadBatchHeader().compressionType();
        }

        // ...

       @Override
        public Iterator<Record> iterator() {
            return loadFullBatch().iterator();
        }

        // Convert the buffer into a RecordBatch
        protected abstract RecordBatch toMemoryRecordBatch(ByteBuffer buffer);

        // Return the length of the header portion
        protected abstract int headerSize();

        // Return a RecordBatch containing the full data
        protected RecordBatch loadFullBatch() {
            if (fullBatch == null) {
                batchHeader = null;
                fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
            }
            return fullBatch;
        }

        // Return a RecordBatch containing only the header, without the Record data
        protected RecordBatch loadBatchHeader() {
            if (fullBatch != null)
                return fullBatch;

            if (batchHeader == null)
                batchHeader = loadBatchWithSize(headerSize(), "record batch header");

            return batchHeader;
        }
        
        // size is the number of bytes to read; reads from the channel and returns a RecordBatch
        private RecordBatch loadBatchWithSize(int size, String description) {
            try {
                // Allocate a buffer of the requested size
                ByteBuffer buffer = ByteBuffer.allocate(size);
                // Read data from the FileChannel into the buffer
                Utils.readFullyOrFail(channel, buffer, position, description);
                // Reset position to 0, ready for reading
                buffer.rewind();
                // Convert the buffer into a RecordBatch
                return toMemoryRecordBatch(buffer);
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        }
}
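
Keeping batchHeader and fullBatch separate keeps header-level accessors cheap. A usage sketch (fileRecords and process are hypothetical stand-ins):

// Header-only accessors trigger loadBatchHeader(), reading headerSize() bytes;
// iterating the records triggers loadFullBatch(), reading the whole batch.
for (FileChannelRecordBatch batch : fileRecords.batches()) {
    if (batch.compressionType() == CompressionType.NONE)  // header-only read
        for (Record record : batch)                       // full-batch read
            process(record);
}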

DefaultFileChannelRecordBatch

DefaultFileChannelRecordBatch extends FileChannelRecordBatch. Its toMemoryRecordBatch method instantiates a DefaultRecordBatch directly from the buffer.

static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

        DefaultFileChannelRecordBatch(long offset, byte magic, FileChannel channel,
                                      int position, int batchSize) {
            super(offset, magic, channel, position, batchSize);
        }

        @Override
        protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
            // Instantiate a DefaultRecordBatch from the buffer
            return new DefaultRecordBatch(buffer);
        }
        
        // Return the header length
        @Override
        protected int headerSize() {
            return RECORD_BATCH_OVERHEAD;
        }
 
}

RecordBatchIterator

RecordBatchIterator extends AbstractIterator to iterate over RecordBatch instances, pulling each batch from a LogInputStream.

class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {

    private final LogInputStream<T> logInputStream;

    RecordBatchIterator(LogInputStream<T> logInputStream) {
        this.logInputStream = logInputStream;
    }

    @Override
    protected T makeNext() {
        try {
            // Call nextBatch on the logInputStream to fetch the next RecordBatch
            T batch = logInputStream.nextBatch();
            if (batch == null)
                return allDone();
            return batch;
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }
}

FileRecords

FileRecords extends AbstractRecords and represents data read from a file. The main focus here is the batches method, which provides iteration over the RecordBatch instances.

public class FileRecords extends AbstractRecords implements Closeable {
    private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;
    private final File file;
    private final FileChannel channel;
    private final int start;
    private final int end;
    private final boolean isSlice;
    private final AtomicInteger size;

    public FileRecords(File file,
                       FileChannel channel,
                       int start,
                       int end,
                       boolean isSlice) throws IOException {
        this.file = file;
        this.channel = channel;
        this.start = start;
        this.end = end;
        this.isSlice = isSlice;
        this.size = new AtomicInteger();

        if (isSlice) {
            // don't check the file size if this is just a slice view
            size.set(end - start);
        } else {
            int limit = Math.min((int) channel.size(), end);
            size.set(limit - start);

            // if this is not a slice, update the file pointer to the end of the file
            // set the file position to the last byte in the file
            channel.position(limit);
        }
        // Initialize batches starting from the start position
        batches = batchesFrom(start);
    }

    @Override
    public Iterable<FileChannelRecordBatch> batches() {
        return batches;
    }
    
    // Return an Iterable<FileChannelRecordBatch> instance
    private Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
        return new Iterable<FileChannelRecordBatch>() {
            @Override
            public Iterator<FileChannelRecordBatch> iterator() {
                return batchIterator(start);
            }
        };
    }

    private Iterator<FileChannelRecordBatch> batchIterator(int start) {
        final int end;
        if (isSlice)
            end = this.end;
        else
            end = this.sizeInBytes();
        // Instantiate a FileLogInputStream over the [start, end) range
        FileLogInputStream inputStream = new FileLogInputStream(channel, start, end);
        // Wrap it in a RecordBatchIterator
        return new RecordBatchIterator<>(inputStream);
    }
}
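
A short usage sketch (assuming a Kafka log segment file on disk): FileRecords.open sets up the FileChannel, and batches() walks the file lazily through FileLogInputStream rather than loading it into memory at once.

// Open a segment file and print each batch's base offset and size.
FileRecords fileRecords = FileRecords.open(new File("00000000000000000000.log"));
for (FileChannelRecordBatch batch : fileRecords.batches())
    System.out.println("baseOffset=" + batch.baseOffset()
            + " sizeInBytes=" + batch.sizeInBytes());
fileRecords.close();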

LogInputStream

LogInputStream defines the method for reading the next batch.

interface LogInputStream<T extends RecordBatch> {

    // Read the next RecordBatch; null means end of stream
    T nextBatch() throws IOException;
}
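
A minimal sketch of the contract (illustrative only: these types are package-private in org.apache.kafka.common.record, and prebuiltBatches is a hypothetical batch source): RecordBatchIterator turns any LogInputStream into an Iterator, translating a null return from nextBatch into allDone.

LogInputStream<RecordBatch> stream = new LogInputStream<RecordBatch>() {
    private final Iterator<RecordBatch> it = prebuiltBatches.iterator();

    @Override
    public RecordBatch nextBatch() {
        // Returning null tells RecordBatchIterator to call allDone()
        return it.hasNext() ? it.next() : null;
    }
};
Iterator<RecordBatch> batches = new RecordBatchIterator<>(stream);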

FileLogInputStream

FileLogInputStream implements the LogInputStream interface; the focus here is the nextBatch method.

public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
    private int position;
    private final int end;
    private final FileChannel channel;
    // HEADER_SIZE_UP_TO_MAGIC is the length of a DefaultRecordBatch header up to and including the magic byte
    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);

    FileLogInputStream(FileChannel channel,
                       int start,
                       int end) {
        this.channel = channel;
        this.position = start;
        this.end = end;
    }

    @Override
    public FileChannelRecordBatch nextBatch() throws IOException {
        if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
            return null;

        logHeaderBuffer.rewind();
        // Read the leading header fields (offset, size, magic)
        Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");

        logHeaderBuffer.rewind();
        long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest RECORD_OVERHEAD; check against that minimum length
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
        // If the complete batch does not fit before end, there is no batch to return
        if (position + LOG_OVERHEAD + size > end)
            return null;

        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        final FileChannelRecordBatch batch;

        if (magic < RecordBatch.MAGIC_VALUE_V2)
            // Older magic (v0/v1): return a legacy FileChannelRecordBatch
            batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
        else
            // Magic v2: return the default FileChannelRecordBatch
            batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
        // Advance position to the start of the next RecordBatch
        position += batch.sizeInBytes();
        return batch;
    }
}
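
For reference, the offsets used above resolve to the following values (as defined in org.apache.kafka.common.record.Records and DefaultRecordBatch). The magic byte sits at offset 16 in both the legacy and v2 layouts, which is what lets nextBatch dispatch on it before knowing the version.

// Byte layout of the fields read by nextBatch:
int OFFSET_OFFSET = 0;             // 8-byte base offset
int SIZE_OFFSET = 8;               // 4-byte batch size, excluding these 12 bytes
int LOG_OVERHEAD = 12;             // offset + size fields
int MAGIC_OFFSET = 16;             // 1-byte magic value
int HEADER_SIZE_UP_TO_MAGIC = 17;  // enough bytes to read offset, size and magic
int RECORD_BATCH_OVERHEAD = 61;    // full v2 batch header (headerSize() above)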

MemoryRecords

MemoryRecords represents data read from memory.

public class MemoryRecords extends AbstractRecords {
    private final ByteBuffer buffer;

    private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
        @Override
        public Iterator<MutableRecordBatch> iterator() {
            // Return a RecordBatchIterator built on a ByteBufferLogInputStream over a duplicate of the buffer
            return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
        }
    };

    @Override
    public Iterable<MutableRecordBatch> batches() {
        return batches;
    }
}
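
A short usage sketch (assuming the kafka-clients library): build a MemoryRecords with the public withRecords factory, then iterate it through the machinery above.

// Build an in-memory Records instance and iterate its records.
MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
        new SimpleRecord("key".getBytes(), "value".getBytes()));
for (Record record : memoryRecords.records())
    System.out.println(record.offset());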

ByteBufferLogInputStream

ByteBufferLogInputStream reads data from a ByteBuffer to produce RecordBatch instances.

class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
    private final ByteBuffer buffer;
    private final int maxMessageSize;

    ByteBufferLogInputStream(ByteBuffer buffer, int maxMessageSize) {
        this.buffer = buffer;
        this.maxMessageSize = maxMessageSize;
    }

    public MutableRecordBatch nextBatch() throws IOException {
        int remaining = buffer.remaining();
        if (remaining < LOG_OVERHEAD)
            return null;
        // Read the batch length (excluding LOG_OVERHEAD)
        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        // V0 has the smallest overhead, stricter checking is done later
        if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
        if (recordSize > maxMessageSize)
            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
        // Total batch length, including the offset and size fields
        int batchSize = recordSize + LOG_OVERHEAD;
        if (remaining < batchSize)
            return null;

        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
        // Slice the buffer from its current position
        ByteBuffer batchSlice = buffer.slice();
        // Limit the slice to exactly this batch
        batchSlice.limit(batchSize);
        // Advance the buffer's position to the next batch
        buffer.position(buffer.position() + batchSize);

        if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
            throw new CorruptRecordException("Invalid magic found in record: " + magic);

        if (magic > RecordBatch.MAGIC_VALUE_V1)
            return new DefaultRecordBatch(batchSlice);
        else
            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
    }
}
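
The ByteBuffer mechanics used above, shown in isolation: slice() creates a view sharing the same backing memory from the current position, and setting its limit confines the view to one batch while the outer buffer moves on.

ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6});
buffer.position(2);
ByteBuffer slice = buffer.slice();       // view over bytes 3..6, position 0
slice.limit(2);                          // confine the view to bytes 3 and 4
buffer.position(buffer.position() + 2);  // outer buffer now points past the "batch"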

Summary

A RecordBatch is a collection of Record entries, and Records is a collection of RecordBatch instances.

AbstractRecords implements the logic for iterating over the Record entries of a Records instance.

FileRecords extends AbstractRecords and reads data from a file. It uses FileLogInputStream to read and parse RecordBatch instances from a FileChannel.

MemoryRecords extends AbstractRecords and reads data from memory. It uses ByteBufferLogInputStream to read and parse RecordBatch instances from a ByteBuffer.
