本文主要研究一下flink的MemCheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java
/**
 * Factory for the output streams that checkpoints use to persist their data.
 *
 * <p>Instances are obtained from a {@link CheckpointStorage} via
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

    /**
     * Opens a new stream into which checkpoint state can be written.
     *
     * @param scope the {@link CheckpointedStateScope} of the state written to the stream
     * @return a newly created checkpoint state output stream
     * @throws IOException if creating the stream fails
     */
    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
            throws IOException;

    /**
     * An {@link FSDataOutputStream} for checkpoint state that can be finished by turning it into
     * a {@link StreamStateHandle}.
     *
     * <p>Member types of an interface are implicitly {@code public static}; the modifiers are
     * spelled out here only for readability.
     */
    public abstract static class CheckpointStateOutputStream extends FSDataOutputStream {

        /**
         * Closes the stream and returns a handle for the data written to it.
         *
         * @return the state handle; implementations may return {@code null}
         * @throws IOException if closing the stream or creating the handle fails
         */
        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        /**
         * Closes the stream without producing a state handle.
         *
         * @throws IOException if closing fails
         */
        @Override
        public abstract void close() throws IOException;
    }
}
CheckpointStreamFactory是checkpoint output streams(用於持久化checkpoint的數據)的工廠,它定義了createCheckpointStateOutputStream方法,這裏返回的是CheckpointStateOutputStream;CheckpointStateOutputStream繼承了FSDataOutputStream,它定義了closeAndGetHandle及close兩個抽象方法。
CheckpointStreamFactory有兩個子類,分別爲MemCheckpointStreamFactory(它有兩個子類,分別爲NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation)、FsCheckpointStreamFactory(它有一個子類爲FsCheckpointStorageLocation)。
flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java
/**
 * An {@link OutputStream} that additionally reports its position and offers an explicit sync
 * operation. {@code flush()} and {@code close()}, which are concrete in {@link OutputStream}, are
 * redeclared abstract here so every subclass must provide its own implementation.
 */
@Public
public abstract class FSDataOutputStream extends OutputStream {

    /**
     * Returns the current position of the stream.
     *
     * @return the position, as defined by the concrete implementation
     * @throws IOException if the position cannot be determined
     */
    public abstract long getPos() throws IOException;

    /**
     * Flushes buffered data.
     *
     * @throws IOException if flushing fails
     */
    public abstract void flush() throws IOException;

    /**
     * Synchronizes written data with the underlying storage. The exact guarantee is left to the
     * implementation (NOTE(review): presumably at least as strong as {@link #flush()} — confirm).
     *
     * @throws IOException if syncing fails
     */
    public abstract void sync() throws IOException;

    /**
     * Closes the stream.
     *
     * @throws IOException if closing fails
     */
    public abstract void close() throws IOException;
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java
/**
 * Storage location for a single checkpoint. Besides acting as a {@link CheckpointStreamFactory}
 * for data persistence, it handles metadata persistence and lifecycle/cleanup.
 *
 * <p>Locations are usually created and initialized through
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

    /**
     * Creates the stream to which the checkpoint metadata is written.
     *
     * @return a new metadata output stream
     * @throws IOException if the stream cannot be created
     */
    CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

    /**
     * Cleans up this location after the checkpoint has failed.
     *
     * @throws IOException if cleanup fails
     */
    void disposeOnFailure() throws IOException;

    /**
     * Returns the reference that identifies this storage location.
     *
     * @return the location reference
     */
    CheckpointStorageLocationReference getLocationReference();
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
/** * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. */ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { /** The maximal size that the snapshotted memory state may have */ private final int maxStateSize; /** * Creates a new in-memory stream factory that accepts states whose serialized forms are * up to the given number of bytes. * * @param maxStateSize The maximal size of the serialized state */ public MemCheckpointStreamFactory(int maxStateSize) { this.maxStateSize = maxStateSize; } @Override public CheckpointStateOutputStream createCheckpointStateOutputStream( CheckpointedStateScope scope) throws IOException { return new MemoryCheckpointOutputStream(maxStateSize); } @Override public String toString() { return "In-Memory Stream Factory"; } static void checkSize(int size, int maxSize) throws IOException { if (size > maxSize) { throw new IOException( "Size of the state is larger than the maximum permitted memory-backed state. Size=" + size + " , maxSize=" + maxSize + " . Consider using a different state backend, like the File System State backend."); } } /** * A {@code CheckpointStateOutputStream} that writes into a byte array. 
*/ public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); private final int maxSize; private AtomicBoolean closed; boolean isEmpty = true; public MemoryCheckpointOutputStream(int maxSize) { this.maxSize = maxSize; this.closed = new AtomicBoolean(false); } @Override public void write(int b) throws IOException { os.write(b); isEmpty = false; } @Override public void write(byte[] b, int off, int len) throws IOException { os.write(b, off, len); isEmpty = false; } @Override public void flush() throws IOException { os.flush(); } @Override public void sync() throws IOException { } // -------------------------------------------------------------------- @Override public void close() { if (closed.compareAndSet(false, true)) { closeInternal(); } } @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { if (isEmpty) { return null; } return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes()); } @Override public long getPos() throws IOException { return os.getPosition(); } public boolean isClosed() { return closed.get(); } /** * Closes the stream and returns the byte array containing the stream's data. * @return The byte array containing the stream's data. * @throws IOException Thrown if the size of the data exceeds the maximal */ public byte[] closeAndGetBytes() throws IOException { if (closed.compareAndSet(false, true)) { checkSize(os.size(), maxSize); byte[] bytes = os.toByteArray(); closeInternal(); return bytes; } else { throw new IOException("stream has already been closed"); } } private void closeInternal() { os.reset(); } } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
/** * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence * for metadata has been configured. */ public class NonPersistentMetadataCheckpointStorageLocation extends MemCheckpointStreamFactory implements CheckpointStorageLocation { /** The external pointer returned for checkpoints that are not externally addressable. */ public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>"; public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) { super(maxStateSize); } @Override public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { return new MetadataOutputStream(); } @Override public void disposeOnFailure() {} @Override public CheckpointStorageLocationReference getLocationReference() { return CheckpointStorageLocationReference.getDefault(); } // ------------------------------------------------------------------------ // CompletedCheckpointStorageLocation // ------------------------------------------------------------------------ /** * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the * metadata in an internal byte array. 
*/ private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation { private static final long serialVersionUID = 1L; private final ByteStreamStateHandle metaDataHandle; NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) { this.metaDataHandle = metaDataHandle; } @Override public String getExternalPointer() { return EXTERNAL_POINTER; } @Override public StreamStateHandle getMetadataHandle() { return metaDataHandle; } @Override public void disposeStorageLocation() {} } // ------------------------------------------------------------------------ // CheckpointMetadataOutputStream // ------------------------------------------------------------------------ private static class MetadataOutputStream extends CheckpointMetadataOutputStream { private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); private boolean closed; @Override public void write(int b) throws IOException { os.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { os.write(b, off, len); } @Override public void flush() throws IOException { os.flush(); } @Override public long getPos() throws IOException { return os.getPosition(); } @Override public void sync() throws IOException { } @Override public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException { synchronized (this) { if (!closed) { closed = true; byte[] bytes = os.toByteArray(); ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes); return new NonPersistentCompletedCheckpointStorageLocation(handle); } else { throw new IOException("Already closed"); } } } @Override public void close() { if (!closed) { closed = true; os.reset(); } } } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
/** * A checkpoint storage location for the {@link MemoryStateBackend} when it durably * persists the metadata in a file system. */ public class PersistentMetadataCheckpointStorageLocation extends MemCheckpointStreamFactory implements CheckpointStorageLocation { private final FileSystem fileSystem; private final Path checkpointDirectory; private final Path metadataFilePath; /** * Creates a checkpoint storage persists metadata to a file system and stores state * in line in state handles with the metadata. * * @param fileSystem The file system to which the metadata will be written. * @param checkpointDir The directory where the checkpoint metadata will be written. */ public PersistentMetadataCheckpointStorageLocation( FileSystem fileSystem, Path checkpointDir, int maxStateSize) { super(maxStateSize); this.fileSystem = checkNotNull(fileSystem); this.checkpointDirectory = checkNotNull(checkpointDir); this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME); } // ------------------------------------------------------------------------ @Override public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory); } @Override public void disposeOnFailure() throws IOException { // on a failure, no chunk in the checkpoint directory needs to be saved, so // we can drop it as a whole fileSystem.delete(checkpointDirectory, true); } @Override public CheckpointStorageLocationReference getLocationReference() { return CheckpointStorageLocationReference.getDefault(); } }
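MemCheckpointStreamFactory的兩個子類NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation都實現了CheckpointStorageLocation接口,其createMetadataOutputStream方法返回的CheckpointMetadataOutputStream類型分別爲MetadataOutputStream、FsCheckpointMetadataOutputStream。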