本文主要研究一下flink的FsCheckpointStreamFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java
/**
 * Factory that hands out the output streams through which checkpoint data is persisted.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

	/**
	 * Opens a new stream into which checkpoint state of the given scope can be written.
	 *
	 * @param scope The scope of the state that is going to be written to the stream.
	 * @return A new, open checkpoint state output stream.
	 * @throws IOException If the stream could not be created.
	 */
	CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

	/**
	 * An {@link FSDataOutputStream} for checkpoint data which, once writing is finished, yields a
	 * handle to the state that was written.
	 */
	abstract class CheckpointStateOutputStream extends FSDataOutputStream {

		/**
		 * Closes the stream and returns a handle to the written data.
		 *
		 * @return A handle to the written state; may be {@code null}.
		 * @throws IOException If closing the stream or creating the handle fails.
		 */
		@Nullable
		public abstract StreamStateHandle closeAndGetHandle() throws IOException;

		/**
		 * Closes the stream without producing a state handle.
		 *
		 * @throws IOException If closing fails.
		 */
		@Override
		public abstract void close() throws IOException;
	}
}
CheckpointStreamFactory是checkpoint output stream(用於持久化checkpoint的數據)的工廠,它定義了createCheckpointStateOutputStream方法,這裏返回的是CheckpointStateOutputStream;CheckpointStateOutputStream繼承了FSDataOutputStream,它定義了closeAndGetHandle及close兩個抽象方法
CheckpointStreamFactory的實現類有MemCheckpointStreamFactory(它有兩個子類,分別爲NonPersistentMetadataCheckpointStorageLocation、PersistentMetadataCheckpointStorageLocation)、FsCheckpointStreamFactory(它有一個子類爲FsCheckpointStorageLocation)
flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java
/**
 * An output stream that writes to a Flink file system.
 *
 * <p>On top of the plain {@link OutputStream} operations, callers can query the current position
 * in the stream and request that written data be synced to the underlying storage.
 */
@Public
public abstract class FSDataOutputStream extends OutputStream {

	/**
	 * Returns the current position in the stream.
	 *
	 * @return The current position in the stream.
	 * @throws IOException If the position cannot be determined.
	 */
	public abstract long getPos() throws IOException;

	/**
	 * Flushes the stream.
	 *
	 * @throws IOException If flushing fails.
	 */
	@Override
	public abstract void flush() throws IOException;

	/**
	 * Requests that written data be synced to the underlying storage. The exact durability
	 * guarantee is defined by the implementation.
	 *
	 * @throws IOException If syncing fails.
	 */
	public abstract void sync() throws IOException;

	/**
	 * Closes the stream.
	 *
	 * @throws IOException If closing fails.
	 */
	@Override
	public abstract void close() throws IOException;
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java
/**
 * The storage location of one particular checkpoint. Besides creating streams for checkpoint
 * state (inherited from {@link CheckpointStreamFactory}), it persists the checkpoint metadata and
 * offers lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

	/**
	 * Creates the output stream to which the checkpoint metadata is written.
	 *
	 * @return A new metadata output stream for this location.
	 * @throws IOException If the stream could not be created.
	 */
	CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

	/**
	 * Disposes this storage location after the checkpoint has failed.
	 *
	 * @throws IOException If the disposal fails.
	 */
	void disposeOnFailure() throws IOException;

	/**
	 * Returns the reference that points to this storage location.
	 *
	 * @return The location reference.
	 */
	CheckpointStorageLocationReference getLocationReference();
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
public class FsCheckpointStreamFactory implements CheckpointStreamFactory { private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class); /** Maximum size of state that is stored with the metadata, rather than in files. */ public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; /** Default size for the write buffer. */ public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096; /** State below this size will be stored as part of the metadata, rather than in files. */ private final int fileStateThreshold; /** The directory for checkpoint exclusive state data. */ private final Path checkpointDirectory; /** The directory for shared checkpoint data. */ private final Path sharedStateDirectory; /** Cached handle to the file system for file operations. */ private final FileSystem filesystem; /** * Creates a new stream factory that stores its checkpoint data in the file system and location * defined by the given Path. * * <p><b>Important:</b> The given checkpoint directory must already exist. Refer to the class-level * JavaDocs for an explanation why this factory must not try and create the checkpoints. * * @param fileSystem The filesystem to write to. * @param checkpointDirectory The directory for checkpoint exclusive state data. * @param sharedStateDirectory The directory for shared checkpoint data. 
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, * rather than in files */ public FsCheckpointStreamFactory( FileSystem fileSystem, Path checkpointDirectory, Path sharedStateDirectory, int fileStateSizeThreshold) { if (fileStateSizeThreshold < 0) { throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); } if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + MAX_FILE_STATE_THRESHOLD); } this.filesystem = checkNotNull(fileSystem); this.checkpointDirectory = checkNotNull(checkpointDirectory); this.sharedStateDirectory = checkNotNull(sharedStateDirectory); this.fileStateThreshold = fileStateSizeThreshold; } // ------------------------------------------------------------------------ @Override public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException { Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory; int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold); return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold); } // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ @Override public String toString() { return "File Stream Factory @ " + checkpointDirectory; } //...... }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
/** * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and * returns a {@link StreamStateHandle} upon closing. */ public static final class FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream { private final byte[] writeBuffer; private int pos; private FSDataOutputStream outStream; private final int localStateThreshold; private final Path basePath; private final FileSystem fs; private Path statePath; private volatile boolean closed; public FsCheckpointStateOutputStream( Path basePath, FileSystem fs, int bufferSize, int localStateThreshold) { if (bufferSize < localStateThreshold) { throw new IllegalArgumentException(); } this.basePath = basePath; this.fs = fs; this.writeBuffer = new byte[bufferSize]; this.localStateThreshold = localStateThreshold; } @Override public void write(int b) throws IOException { if (pos >= writeBuffer.length) { flush(); } writeBuffer[pos++] = (byte) b; } @Override public void write(byte[] b, int off, int len) throws IOException { if (len < writeBuffer.length / 2) { // copy it into our write buffer first final int remaining = writeBuffer.length - pos; if (len > remaining) { // copy as much as fits System.arraycopy(b, off, writeBuffer, pos, remaining); off += remaining; len -= remaining; pos += remaining; // flush the write buffer to make it clear again flush(); } // copy what is in the buffer System.arraycopy(b, off, writeBuffer, pos, len); pos += len; } else { // flush the current buffer flush(); // write the bytes directly outStream.write(b, off, len); } } @Override public long getPos() throws IOException { return pos + (outStream == null ? 
0 : outStream.getPos()); } @Override public void flush() throws IOException { if (!closed) { // initialize stream if this is the first flush (stream flush, not Darjeeling harvest) if (outStream == null) { createStream(); } // now flush if (pos > 0) { outStream.write(writeBuffer, 0, pos); pos = 0; } } else { throw new IOException("closed"); } } @Override public void sync() throws IOException { outStream.sync(); } /** * Checks whether the stream is closed. * @return True if the stream was closed, false if it is still open. */ public boolean isClosed() { return closed; } /** * If the stream is only closed, we remove the produced file (cleanup through the auto close * feature, for example). This method throws no exception if the deletion fails, but only * logs the error. */ @Override public void close() { if (!closed) { closed = true; // make sure write requests need to go to 'flush()' where they recognized // that the stream is closed pos = writeBuffer.length; if (outStream != null) { try { outStream.close(); } catch (Throwable throwable) { LOG.warn("Could not close the state stream for {}.", statePath, throwable); } finally { try { fs.delete(statePath, false); } catch (Exception e) { LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e); } } } } } @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { // check if there was nothing ever written if (outStream == null && pos == 0) { return null; } synchronized (this) { if (!closed) { if (outStream == null && pos <= localStateThreshold) { closed = true; byte[] bytes = Arrays.copyOf(writeBuffer, pos); pos = writeBuffer.length; return new ByteStreamStateHandle(createStatePath().toString(), bytes); } else { try { flush(); pos = writeBuffer.length; long size = -1L; // make a best effort attempt to figure out the size try { size = outStream.getPos(); } catch (Exception ignored) {} outStream.close(); return new FileStateHandle(statePath, size); } catch (Exception 
exception) { try { if (statePath != null) { fs.delete(statePath, false); } } catch (Exception deleteException) { LOG.warn("Could not delete the checkpoint stream file {}.", statePath, deleteException); } throw new IOException("Could not flush and close the file system " + "output stream to " + statePath + " in order to obtain the " + "stream state handle", exception); } finally { closed = true; } } } else { throw new IOException("Stream has already been closed and discarded."); } } } private Path createStatePath() { return new Path(basePath, UUID.randomUUID().toString()); } private void createStream() throws IOException { Exception latestException = null; for (int attempt = 0; attempt < 10; attempt++) { try { OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware( fs, createStatePath(), WriteMode.NO_OVERWRITE); this.outStream = streamAndPath.stream(); this.statePath = streamAndPath.path(); return; } catch (Exception e) { latestException = e; } } throw new IOException("Could not open output stream for state backend", latestException); } }
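FsCheckpointStateOutputStream的write(byte[] b, int off, int len)方法在len小於writeBuffer.length / 2時,先把數據拷貝到writeBuffer(在這之前判斷如果len大於remaining,則先拷貝remaining的數據到writeBuffer,然後進行flush);否則先進行flush,再把數據直接寫入outStream
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java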
/** * A storage location for checkpoints on a file system. */ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory implements CheckpointStorageLocation { private final FileSystem fileSystem; private final Path checkpointDirectory; private final Path sharedStateDirectory; private final Path taskOwnedStateDirectory; private final Path metadataFilePath; private final CheckpointStorageLocationReference reference; private final int fileStateSizeThreshold; public FsCheckpointStorageLocation( FileSystem fileSystem, Path checkpointDir, Path sharedStateDir, Path taskOwnedStateDir, CheckpointStorageLocationReference reference, int fileStateSizeThreshold) { super(fileSystem, checkpointDir, sharedStateDir, fileStateSizeThreshold); checkArgument(fileStateSizeThreshold >= 0); this.fileSystem = checkNotNull(fileSystem); this.checkpointDirectory = checkNotNull(checkpointDir); this.sharedStateDirectory = checkNotNull(sharedStateDir); this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir); this.reference = checkNotNull(reference); // the metadata file should not have entropy in its path Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir); this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME); this.fileStateSizeThreshold = fileStateSizeThreshold; } // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ public Path getCheckpointDirectory() { return checkpointDirectory; } public Path getSharedStateDirectory() { return sharedStateDirectory; } public Path getTaskOwnedStateDirectory() { return taskOwnedStateDirectory; } public Path getMetadataFilePath() { return metadataFilePath; } // ------------------------------------------------------------------------ // checkpoint metadata // ------------------------------------------------------------------------ @Override public 
CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException { return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory); } @Override public void disposeOnFailure() throws IOException { // on a failure, no chunk in the checkpoint directory needs to be saved, so // we can drop it as a whole fileSystem.delete(checkpointDirectory, true); } @Override public CheckpointStorageLocationReference getLocationReference() { return reference; } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @Override public String toString() { return "FsCheckpointStorageLocation {" + "fileSystem=" + fileSystem + ", checkpointDirectory=" + checkpointDirectory + ", sharedStateDirectory=" + sharedStateDirectory + ", taskOwnedStateDirectory=" + taskOwnedStateDirectory + ", metadataFilePath=" + metadataFilePath + ", reference=" + reference + ", fileStateSizeThreshold=" + fileStateSizeThreshold + '}'; } @VisibleForTesting FileSystem getFileSystem() { return fileSystem; } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
/** * A {@link CheckpointMetadataOutputStream} that writes a specified file and directory, and * returns a {@link FsCompletedCheckpointStorageLocation} upon closing. */ public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream { private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointMetadataOutputStream.class); // ------------------------------------------------------------------------ private final FSDataOutputStream out; private final Path metadataFilePath; private final Path exclusiveCheckpointDir; private final FileSystem fileSystem; private volatile boolean closed; public FsCheckpointMetadataOutputStream( FileSystem fileSystem, Path metadataFilePath, Path exclusiveCheckpointDir) throws IOException { this.fileSystem = checkNotNull(fileSystem); this.metadataFilePath = checkNotNull(metadataFilePath); this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir); this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE); } // ------------------------------------------------------------------------ // I/O // ------------------------------------------------------------------------ @Override public final void write(int b) throws IOException { out.write(b); } @Override public final void write(@Nonnull byte[] b, int off, int len) throws IOException { out.write(b, off, len); } @Override public long getPos() throws IOException { return out.getPos(); } @Override public void flush() throws IOException { out.flush(); } @Override public void sync() throws IOException { out.sync(); } // ------------------------------------------------------------------------ // Closing // ------------------------------------------------------------------------ public boolean isClosed() { return closed; } @Override public void close() { if (!closed) { closed = true; try { out.close(); fileSystem.delete(metadataFilePath, false); } catch (Throwable t) { LOG.warn("Could not close the state stream for {}.", metadataFilePath, t); } } 
} @Override public FsCompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException { synchronized (this) { if (!closed) { try { // make a best effort attempt to figure out the size long size = 0; try { size = out.getPos(); } catch (Exception ignored) {} out.close(); FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size); return new FsCompletedCheckpointStorageLocation( fileSystem, exclusiveCheckpointDir, metaDataHandle, metaDataHandle.getFilePath().getParent().toString()); } catch (Exception e) { try { fileSystem.delete(metadataFilePath, false); } catch (Exception deleteException) { LOG.warn("Could not delete the checkpoint stream file {}.", metadataFilePath, deleteException); } throw new IOException("Could not flush and close the file system " + "output stream to " + metadataFilePath + " in order to obtain the " + "stream state handle", e); } finally { closed = true; } } else { throw new IOException("Stream has already been closed and discarded."); } } } }
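FsCheckpointStorageLocation繼承了FsCheckpointStreamFactory,同時實現了CheckpointStorageLocation接口;它的createMetadataOutputStream方法創建的是FsCheckpointMetadataOutputStream(FsCheckpointMetadataOutputStream繼承了CheckpointMetadataOutputStream,而CheckpointMetadataOutputStream繼承了FSDataOutputStream;這裏的closeAndFinalizeCheckpoint方法返回的是FsCompletedCheckpointStorageLocation);disposeOnFailure方法直接執行fileSystem.delete(checkpointDirectory, true),遞歸刪除整個checkpoint目錄;getLocationReference方法返回的是CheckpointStorageLocationReference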