A Look at Flink's FsCheckpointStreamFactory

This article takes a look at Flink's FsCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

	CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

	abstract class CheckpointStateOutputStream extends FSDataOutputStream {

		@Nullable
		public abstract StreamStateHandle closeAndGetHandle() throws IOException;

		@Override
		public abstract void close() throws IOException;
	}
}
  • CheckpointStreamFactory is the factory for checkpoint output streams (the streams used to persist checkpoint data). It defines the createCheckpointStateOutputStream method, which takes a CheckpointedStateScope and returns a CheckpointStateOutputStream; CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close
  • CheckpointStreamFactory has two implementation classes whose names end in Factory: MemCheckpointStreamFactory (which has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (which has one subclass, FsCheckpointStorageLocation)
  • The CheckpointStorageLocation interface extends CheckpointStreamFactory and has three implementations: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation, and FsCheckpointStorageLocation
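To make the factory relationship concrete, here is a minimal sketch of the same shape outside Flink. StateScope, BytesHandle, StateOutputStream, StreamFactory and InMemoryStreamFactory are hypothetical, simplified stand-ins, not Flink classes. The factory hands out a fresh stream on every call, and each stream turns the bytes written to it into a handle when closeAndGetHandle is called. Keeping everything on the heap is loosely in the spirit of MemCheckpointStreamFactory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, simplified stand-ins for Flink's types (not the real Flink API).
enum StateScope { EXCLUSIVE, SHARED }

// Stand-in for a state handle: here simply the bytes that were written.
final class BytesHandle {
    final byte[] data;

    BytesHandle(byte[] data) {
        this.data = data;
    }
}

// Mirrors CheckpointStateOutputStream: an OutputStream that yields a handle when closed.
abstract class StateOutputStream extends OutputStream {
    abstract BytesHandle closeAndGetHandle() throws IOException;
}

// Mirrors CheckpointStreamFactory: a single method that hands out fresh streams.
interface StreamFactory {
    StateOutputStream createStream(StateScope scope) throws IOException;
}

// In-memory factory; the scope is ignored because everything stays on the heap.
final class InMemoryStreamFactory implements StreamFactory {
    @Override
    public StateOutputStream createStream(StateScope scope) {
        return new StateOutputStream() {
            private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
            private boolean closed;

            @Override
            public void write(int b) throws IOException {
                if (closed) {
                    throw new IOException("closed");
                }
                buf.write(b);
            }

            @Override
            BytesHandle closeAndGetHandle() throws IOException {
                if (closed) {
                    throw new IOException("Stream has already been closed");
                }
                closed = true;
                return new BytesHandle(buf.toByteArray());
            }

            @Override
            public void close() {
                // a plain close() discards the data instead of producing a handle
                closed = true;
                buf.reset();
            }
        };
    }
}
```

A caller asks the factory for a stream, writes into it, and keeps only the handle returned by closeAndGetHandle.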

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

@Public
public abstract class FSDataOutputStream extends OutputStream {

	public abstract long getPos() throws IOException;

	public abstract void flush() throws IOException;

	public abstract void sync() throws IOException;

	public abstract void close() throws IOException;
}
  • FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and redeclares flush and close as abstract
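As an illustration of that contract, the sketch below implements an FSDataOutputStream-like abstract class over a local file using only the JDK. PositionedOutputStream and LocalFileOutputStream are hypothetical names. getPos counts the bytes written so far. sync delegates to java.io.FileDescriptor#sync, which is one plausible way to provide the durability that sync() is meant to guarantee.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in mirroring the shape of Flink's FSDataOutputStream.
abstract class PositionedOutputStream extends OutputStream {
    public abstract long getPos() throws IOException;   // bytes written so far

    public abstract void sync() throws IOException;     // push data to durable storage

    @Override
    public abstract void flush() throws IOException;    // redeclared abstract, as in Flink

    @Override
    public abstract void close() throws IOException;
}

// Local-file implementation that tracks its own write position.
final class LocalFileOutputStream extends PositionedOutputStream {
    private final FileOutputStream out;
    private long pos;

    LocalFileOutputStream(File file) throws IOException {
        this.out = new FileOutputStream(file);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        pos++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        pos += len;
    }

    @Override
    public long getPos() {
        return pos;
    }

    @Override
    public void flush() throws IOException {
        out.flush(); // FileOutputStream is unbuffered, so this is effectively a no-op
    }

    @Override
    public void sync() throws IOException {
        flush();
        out.getFD().sync(); // ask the OS to write the file's data to the storage device
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}
```

flush() only hands data to the operating system, while sync() also asks the OS to persist it. That is the usual reason for keeping the two methods separate.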

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

	CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

	void disposeOnFailure() throws IOException;

	CheckpointStorageLocationReference getLocationReference();
}
  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is usually created and initialized by CheckpointStorage, and it offers data persistence, metadata persistence, and lifecycle/cleanup methods: createMetadataOutputStream creates a CheckpointMetadataOutputStream; disposeOnFailure disposes of the checkpoint location when the checkpoint fails; getLocationReference returns the CheckpointStorageLocationReference
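The lifecycle that these methods support can be pictured with a small in-memory sketch. SimpleStreamFactory, SimpleStorageLocation, InMemoryLocation and CheckpointDriver are hypothetical names. For illustration, the driver collapses into one method work that Flink spreads across tasks and the checkpoint coordinator. It writes state through the inherited factory method, then writes metadata, and calls disposeOnFailure if anything goes wrong.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified analog of CheckpointStreamFactory.
interface SimpleStreamFactory {
    OutputStream createStateStream() throws IOException;
}

// Like CheckpointStorageLocation: a stream factory plus metadata and cleanup methods.
interface SimpleStorageLocation extends SimpleStreamFactory {
    OutputStream createMetadataStream() throws IOException;

    void disposeOnFailure();

    String getLocationReference();
}

final class InMemoryLocation implements SimpleStorageLocation {
    final List<ByteArrayOutputStream> stateStreams = new ArrayList<>();
    ByteArrayOutputStream metadata;
    boolean disposed;
    private final String reference;

    InMemoryLocation(String reference) {
        this.reference = reference;
    }

    @Override
    public OutputStream createStateStream() {
        ByteArrayOutputStream s = new ByteArrayOutputStream();
        stateStreams.add(s);
        return s;
    }

    @Override
    public OutputStream createMetadataStream() {
        metadata = new ByteArrayOutputStream();
        return metadata;
    }

    @Override
    public void disposeOnFailure() {
        // nothing written for a failed checkpoint is worth keeping, so drop it all
        stateStreams.clear();
        metadata = null;
        disposed = true;
    }

    @Override
    public String getLocationReference() {
        return reference;
    }
}

final class CheckpointDriver {
    // Writes state, then metadata; any failure disposes of the whole location.
    static boolean run(SimpleStorageLocation location, byte[] state, boolean simulateFailure) {
        try {
            try (OutputStream out = location.createStateStream()) {
                out.write(state);
            }
            if (simulateFailure) {
                throw new IOException("simulated failure before metadata is written");
            }
            try (OutputStream meta = location.createMetadataStream()) {
                meta.write(location.getLocationReference().getBytes(StandardCharsets.UTF_8));
            }
            return true;
        } catch (IOException e) {
            location.disposeOnFailure();
            return false;
        }
    }
}
```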

FsCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java

public class FsCheckpointStreamFactory implements CheckpointStreamFactory {

	private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);

	/** Maximum size of state that is stored with the metadata, rather than in files. */
	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

	/** Default size for the write buffer. */
	public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

	/** State below this size will be stored as part of the metadata, rather than in files. */
	private final int fileStateThreshold;

	/** The directory for checkpoint exclusive state data. */
	private final Path checkpointDirectory;

	/** The directory for shared checkpoint data. */
	private final Path sharedStateDirectory;

	/** Cached handle to the file system for file operations. */
	private final FileSystem filesystem;

	/**
	 * Creates a new stream factory that stores its checkpoint data in the file system and location
	 * defined by the given Path.
	 *
	 * <p><b>Important:</b> The given checkpoint directory must already exist. Refer to the class-level
	 * JavaDocs for an explanation why this factory must not try and create the checkpoints.
	 *
	 * @param fileSystem The filesystem to write to.
	 * @param checkpointDirectory The directory for checkpoint exclusive state data.
	 * @param sharedStateDirectory The directory for shared checkpoint data.
	 * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
	 *                             rather than in files
	 */
	public FsCheckpointStreamFactory(
			FileSystem fileSystem,
			Path checkpointDirectory,
			Path sharedStateDirectory,
			int fileStateSizeThreshold) {

		if (fileStateSizeThreshold < 0) {
			throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
		}
		if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) {
			throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
				MAX_FILE_STATE_THRESHOLD);
		}

		this.filesystem = checkNotNull(fileSystem);
		this.checkpointDirectory = checkNotNull(checkpointDirectory);
		this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
		this.fileStateThreshold = fileStateSizeThreshold;
	}

	// ------------------------------------------------------------------------

	@Override
	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);

		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
	}

	// ------------------------------------------------------------------------
	//  utilities
	// ------------------------------------------------------------------------

	@Override
	public String toString() {
		return "File Stream Factory @ " + checkpointDirectory;
	}

	//......
}
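  • FsCheckpointStreamFactory implements the CheckpointStreamFactory interface. Its createCheckpointStateOutputStream method returns an FsCheckpointStateOutputStream that writes into checkpointDirectory for EXCLUSIVE scope and into sharedStateDirectory otherwise, with a write buffer of max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold) bytes. FsCheckpointStreamFactory has one subclass, FsCheckpointStorageLocation, which implements the CheckpointStorageLocation interface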
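createCheckpointStateOutputStream makes two decisions: which directory to write to, and how large a write buffer to allocate. The constructor also checks the threshold. All three can be isolated in a few lines. This sketch uses java.nio.file.Path instead of Flink's Path, and FsStreamFactorySketch and Scope are hypothetical names. The constants mirror MAX_FILE_STATE_THRESHOLD and DEFAULT_WRITE_BUFFER_SIZE from the code above.

```java
import java.nio.file.Path;

// Hypothetical stand-in for CheckpointedStateScope.
enum Scope { EXCLUSIVE, SHARED }

// Sketch of the checks and choices in FsCheckpointStreamFactory (not the Flink class).
final class FsStreamFactorySketch {
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;

    private final Path checkpointDirectory;
    private final Path sharedStateDirectory;
    private final int fileStateThreshold;

    FsStreamFactorySketch(Path checkpointDirectory, Path sharedStateDirectory, int fileStateThreshold) {
        // same bounds as the Flink constructor: 0 <= threshold <= 1 MiB
        if (fileStateThreshold < 0 || fileStateThreshold > MAX_FILE_STATE_THRESHOLD) {
            throw new IllegalArgumentException("threshold out of range: " + fileStateThreshold);
        }
        this.checkpointDirectory = checkpointDirectory;
        this.sharedStateDirectory = sharedStateDirectory;
        this.fileStateThreshold = fileStateThreshold;
    }

    // EXCLUSIVE state goes to the per-checkpoint directory, everything else to the shared one.
    Path targetDirectory(Scope scope) {
        return scope == Scope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
    }

    // At least the default size, and never smaller than the inline threshold.
    int bufferSize() {
        return Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    }
}
```

With the default 4096-byte buffer, any threshold up to 4096 yields a 4096-byte buffer, and larger thresholds grow the buffer to match. This keeps bufferSize >= localStateThreshold, which FsCheckpointStateOutputStream's constructor requires.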

FsCheckpointStateOutputStream

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java

	/**
	 * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
	 * returns a {@link StreamStateHandle} upon closing.
	 */
	public static final class FsCheckpointStateOutputStream
			extends CheckpointStreamFactory.CheckpointStateOutputStream {

		private final byte[] writeBuffer;

		private int pos;

		private FSDataOutputStream outStream;

		private final int localStateThreshold;

		private final Path basePath;

		private final FileSystem fs;

		private Path statePath;

		private volatile boolean closed;

		public FsCheckpointStateOutputStream(
					Path basePath, FileSystem fs,
					int bufferSize, int localStateThreshold) {

			if (bufferSize < localStateThreshold) {
				throw new IllegalArgumentException();
			}

			this.basePath = basePath;
			this.fs = fs;
			this.writeBuffer = new byte[bufferSize];
			this.localStateThreshold = localStateThreshold;
		}

		@Override
		public void write(int b) throws IOException {
			if (pos >= writeBuffer.length) {
				flush();
			}
			writeBuffer[pos++] = (byte) b;
		}

		@Override
		public void write(byte[] b, int off, int len) throws IOException {
			if (len < writeBuffer.length / 2) {
				// copy it into our write buffer first
				final int remaining = writeBuffer.length - pos;
				if (len > remaining) {
					// copy as much as fits
					System.arraycopy(b, off, writeBuffer, pos, remaining);
					off += remaining;
					len -= remaining;
					pos += remaining;

					// flush the write buffer to make it clear again
					flush();
				}

				// copy what is in the buffer
				System.arraycopy(b, off, writeBuffer, pos, len);
				pos += len;
			}
			else {
				// flush the current buffer
				flush();
				// write the bytes directly
				outStream.write(b, off, len);
			}
		}

		@Override
		public long getPos() throws IOException {
			return pos + (outStream == null ? 0 : outStream.getPos());
		}

		@Override
		public void flush() throws IOException {
			if (!closed) {
				// initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
				if (outStream == null) {
					createStream();
				}

				// now flush
				if (pos > 0) {
					outStream.write(writeBuffer, 0, pos);
					pos = 0;
				}
			}
			else {
				throw new IOException("closed");
			}
		}

		@Override
		public void sync() throws IOException {
			outStream.sync();
		}

		/**
		 * Checks whether the stream is closed.
		 * @return True if the stream was closed, false if it is still open.
		 */
		public boolean isClosed() {
			return closed;
		}

		/**
		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
		 * feature, for example). This method throws no exception if the deletion fails, but only
		 * logs the error.
		 */
		@Override
		public void close() {
			if (!closed) {
				closed = true;

				// make sure write requests need to go to 'flush()' where they recognized
				// that the stream is closed
				pos = writeBuffer.length;

				if (outStream != null) {
					try {
						outStream.close();
					} catch (Throwable throwable) {
						LOG.warn("Could not close the state stream for {}.", statePath, throwable);
					} finally {
						try {
							fs.delete(statePath, false);
						} catch (Exception e) {
							LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
						}
					}
				}
			}
		}

		@Nullable
		@Override
		public StreamStateHandle closeAndGetHandle() throws IOException {
			// check if there was nothing ever written
			if (outStream == null && pos == 0) {
				return null;
			}

			synchronized (this) {
				if (!closed) {
					if (outStream == null && pos <= localStateThreshold) {
						closed = true;
						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
						pos = writeBuffer.length;
						return new ByteStreamStateHandle(createStatePath().toString(), bytes);
					}
					else {
						try {
							flush();

							pos = writeBuffer.length;

							long size = -1L;

							// make a best effort attempt to figure out the size
							try {
								size = outStream.getPos();
							} catch (Exception ignored) {}

							outStream.close();

							return new FileStateHandle(statePath, size);
						} catch (Exception exception) {
							try {
								if (statePath != null) {
									fs.delete(statePath, false);
								}

							} catch (Exception deleteException) {
								LOG.warn("Could not delete the checkpoint stream file {}.",
									statePath, deleteException);
							}

							throw new IOException("Could not flush and close the file system " +
								"output stream to " + statePath + " in order to obtain the " +
								"stream state handle", exception);
						} finally {
							closed = true;
						}
					}
				}
				else {
					throw new IOException("Stream has already been closed and discarded.");
				}
			}
		}

		private Path createStatePath() {
			return new Path(basePath, UUID.randomUUID().toString());
		}

		private void createStream() throws IOException {
			Exception latestException = null;
			for (int attempt = 0; attempt < 10; attempt++) {
				try {
					OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
							fs, createStatePath(), WriteMode.NO_OVERWRITE);
					this.outStream = streamAndPath.stream();
					this.statePath = streamAndPath.path();
					return;
				}
				catch (Exception e) {
					latestException = e;
				}
			}

			throw new IOException("Could not open output stream for state backend", latestException);
		}
	}
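  • FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream. Its constructor takes basePath, fs, bufferSize, and localStateThreshold, and it rejects a bufferSize smaller than localStateThreshold
  • bufferSize sets the size of writeBuffer. In write(int b), the buffer is flushed first whenever it is full (pos >= writeBuffer.length). In write(byte[] b, int off, int len), a chunk with len >= writeBuffer.length / 2 flushes the buffer and is then written directly to outStream. A chunk with len < writeBuffer.length / 2 is copied into writeBuffer; if len exceeds the remaining space, the first remaining bytes are copied in, the buffer is flushed, and the rest is copied afterwards. The first flush lazily creates the underlying file via createStream
  • closeAndGetHandle returns null if nothing was ever written. If the stream was never flushed to a file (outStream == null) and pos <= localStateThreshold, it returns a ByteStreamStateHandle holding the bytes inline. Otherwise it flushes, closes the file, and returns a FileStateHandle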
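The buffering and inlining rules above can be exercised without a file system. In the sketch below, BufferedStateStream, Handle, InlineHandle and FileHandle are hypothetical names, and a ByteArrayOutputStream stands in for the lazily created file. The write, flush and closeAndGetHandle branches follow FsCheckpointStateOutputStream. Closed-state tracking, the createStream retries, and file deletion are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical result types standing in for ByteStreamStateHandle / FileStateHandle.
interface Handle {
    long size();
}

final class InlineHandle implements Handle {
    final byte[] bytes;

    InlineHandle(byte[] bytes) {
        this.bytes = bytes;
    }

    public long size() {
        return bytes.length;
    }
}

final class FileHandle implements Handle {
    final long size;

    FileHandle(long size) {
        this.size = size;
    }

    public long size() {
        return size;
    }
}

// Sketch of the FsCheckpointStateOutputStream buffering rules; "file" is an in-memory stand-in.
final class BufferedStateStream extends OutputStream {
    private final byte[] writeBuffer;
    private final int localStateThreshold;
    private int pos;
    private ByteArrayOutputStream file; // created lazily on the first flush, like createStream()

    BufferedStateStream(int bufferSize, int localStateThreshold) {
        if (bufferSize < localStateThreshold) {
            throw new IllegalArgumentException();
        }
        this.writeBuffer = new byte[bufferSize];
        this.localStateThreshold = localStateThreshold;
    }

    @Override
    public void write(int b) {
        if (pos >= writeBuffer.length) {
            flush(); // buffer full: spill it to the file first
        }
        writeBuffer[pos++] = (byte) b;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        if (len < writeBuffer.length / 2) {
            int remaining = writeBuffer.length - pos;
            if (len > remaining) { // fill the buffer, flush it, then copy the rest
                System.arraycopy(b, off, writeBuffer, pos, remaining);
                off += remaining;
                len -= remaining;
                pos += remaining;
                flush();
            }
            System.arraycopy(b, off, writeBuffer, pos, len);
            pos += len;
        } else { // large chunk: flush the buffer and bypass it
            flush();
            file.write(b, off, len);
        }
    }

    @Override
    public void flush() {
        if (file == null) {
            file = new ByteArrayOutputStream();
        }
        if (pos > 0) {
            file.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }

    boolean wroteToFile() {
        return file != null;
    }

    // null if nothing was written, inline bytes if small and never flushed, else a file handle.
    Handle closeAndGetHandle() {
        if (file == null && pos == 0) {
            return null;
        }
        if (file == null && pos <= localStateThreshold) {
            return new InlineHandle(Arrays.copyOf(writeBuffer, pos));
        }
        flush();
        return new FileHandle(file.size());
    }
}
```

Small state therefore travels inline instead of producing one tiny file per stream. In Flink, that inline state is stored as part of the checkpoint metadata.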

FsCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java

/**
 * A storage location for checkpoints on a file system.
 */
public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory implements CheckpointStorageLocation {

	private final FileSystem fileSystem;

	private final Path checkpointDirectory;

	private final Path sharedStateDirectory;

	private final Path taskOwnedStateDirectory;

	private final Path metadataFilePath;

	private final CheckpointStorageLocationReference reference;

	private final int fileStateSizeThreshold;

	public FsCheckpointStorageLocation(
			FileSystem fileSystem,
			Path checkpointDir,
			Path sharedStateDir,
			Path taskOwnedStateDir,
			CheckpointStorageLocationReference reference,
			int fileStateSizeThreshold) {

		super(fileSystem, checkpointDir, sharedStateDir, fileStateSizeThreshold);

		checkArgument(fileStateSizeThreshold >= 0);

		this.fileSystem = checkNotNull(fileSystem);
		this.checkpointDirectory = checkNotNull(checkpointDir);
		this.sharedStateDirectory = checkNotNull(sharedStateDir);
		this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
		this.reference = checkNotNull(reference);

		// the metadata file should not have entropy in its path
		Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);

		this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
		this.fileStateSizeThreshold = fileStateSizeThreshold;
	}

	// ------------------------------------------------------------------------
	//  Properties
	// ------------------------------------------------------------------------

	public Path getCheckpointDirectory() {
		return checkpointDirectory;
	}

	public Path getSharedStateDirectory() {
		return sharedStateDirectory;
	}

	public Path getTaskOwnedStateDirectory() {
		return taskOwnedStateDirectory;
	}

	public Path getMetadataFilePath() {
		return metadataFilePath;
	}

	// ------------------------------------------------------------------------
	//  checkpoint metadata
	// ------------------------------------------------------------------------

	@Override
	public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
		return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
	}

	@Override
	public void disposeOnFailure() throws IOException {
		// on a failure, no chunk in the checkpoint directory needs to be saved, so
		// we can drop it as a whole
		fileSystem.delete(checkpointDirectory, true);
	}

	@Override
	public CheckpointStorageLocationReference getLocationReference() {
		return reference;
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	@Override
	public String toString() {
		return "FsCheckpointStorageLocation {" +
				"fileSystem=" + fileSystem +
				", checkpointDirectory=" + checkpointDirectory +
				", sharedStateDirectory=" + sharedStateDirectory +
				", taskOwnedStateDirectory=" + taskOwnedStateDirectory +
				", metadataFilePath=" + metadataFilePath +
				", reference=" + reference +
				", fileStateSizeThreshold=" + fileStateSizeThreshold +
				'}';
	}

	@VisibleForTesting
	FileSystem getFileSystem() {
		return fileSystem;
	}
}
  • FsCheckpointStorageLocation implements the createMetadataOutputStream, disposeOnFailure, and getLocationReference methods of the CheckpointStorageLocation interface
  • createMetadataOutputStream creates an FsCheckpointMetadataOutputStream; disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true) to delete the checkpoint directory recursively; getLocationReference returns the CheckpointStorageLocationReference
  • FsCheckpointStorageLocation extends FsCheckpointStreamFactory and therefore inherits the createCheckpointStateOutputStream method
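A file-system-backed location can be sketched with java.nio.file. DirStreamFactory and DirStorageLocation are hypothetical names. The subclass inherits createStateStream from the factory, just as FsCheckpointStorageLocation inherits createCheckpointStateOutputStream. disposeOnFailure performs the same recursive delete as fileSystem.delete(checkpointDirectory, true). The metadata file name "_metadata" is an assumption standing in for AbstractFsCheckpointStorage.METADATA_FILE_NAME, and entropy injection is ignored.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analog of FsCheckpointStreamFactory, using java.nio instead of Flink's FileSystem.
class DirStreamFactory {
    protected final Path checkpointDirectory;

    DirStreamFactory(Path checkpointDirectory) {
        this.checkpointDirectory = checkpointDirectory;
    }

    // Each state stream gets a fresh, randomly named file (createStatePath also uses a UUID).
    OutputStream createStateStream() throws IOException {
        Path file = checkpointDirectory.resolve(UUID.randomUUID().toString());
        return Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
    }
}

// Like FsCheckpointStorageLocation: inherits the state-stream factory and adds location methods.
final class DirStorageLocation extends DirStreamFactory {
    private final Path metadataFilePath;

    DirStorageLocation(Path checkpointDirectory) {
        super(checkpointDirectory);
        this.metadataFilePath = checkpointDirectory.resolve("_metadata"); // assumed file name
    }

    Path getMetadataFilePath() {
        return metadataFilePath;
    }

    OutputStream createMetadataStream() throws IOException {
        return Files.newOutputStream(metadataFilePath, StandardOpenOption.CREATE_NEW);
    }

    // Recursive delete: the java.nio counterpart of fileSystem.delete(checkpointDirectory, true).
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> all;
        try (Stream<Path> paths = Files.walk(checkpointDirectory)) {
            // reverse order puts children before their parent directories
            all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : all) {
            Files.delete(p);
        }
    }
}
```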

FsCheckpointMetadataOutputStream

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java

/**
 * A {@link CheckpointMetadataOutputStream} that writes a specified file and directory, and
 * returns a {@link FsCompletedCheckpointStorageLocation} upon closing.
 */
public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream {

	private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointMetadataOutputStream.class);

	// ------------------------------------------------------------------------

	private final FSDataOutputStream out;

	private final Path metadataFilePath;

	private final Path exclusiveCheckpointDir;

	private final FileSystem fileSystem;

	private volatile boolean closed;

	public FsCheckpointMetadataOutputStream(
			FileSystem fileSystem,
			Path metadataFilePath,
			Path exclusiveCheckpointDir) throws IOException {

		this.fileSystem = checkNotNull(fileSystem);
		this.metadataFilePath = checkNotNull(metadataFilePath);
		this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);

		this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
	}

	// ------------------------------------------------------------------------
	//  I/O
	// ------------------------------------------------------------------------

	@Override
	public final void write(int b) throws IOException {
		out.write(b);
	}

	@Override
	public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
		out.write(b, off, len);
	}

	@Override
	public long getPos() throws IOException {
		return out.getPos();
	}

	@Override
	public void flush() throws IOException {
		out.flush();
	}

	@Override
	public void sync() throws IOException {
		out.sync();
	}

	// ------------------------------------------------------------------------
	//  Closing
	// ------------------------------------------------------------------------

	public boolean isClosed() {
		return closed;
	}

	@Override
	public void close() {
		if (!closed) {
			closed = true;

			try {
				out.close();
				fileSystem.delete(metadataFilePath, false);
			}
			catch (Throwable t) {
				LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
			}
		}
	}

	@Override
	public FsCompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
		synchronized (this) {
			if (!closed) {
				try {
					// make a best effort attempt to figure out the size
					long size = 0;
					try {
						size = out.getPos();
					} catch (Exception ignored) {}

					out.close();

					FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);

					return new FsCompletedCheckpointStorageLocation(
							fileSystem, exclusiveCheckpointDir, metaDataHandle,
							metaDataHandle.getFilePath().getParent().toString());
				}
				catch (Exception e) {
					try {
						fileSystem.delete(metadataFilePath, false);
					}
					catch (Exception deleteException) {
						LOG.warn("Could not delete the checkpoint stream file {}.", metadataFilePath, deleteException);
					}

					throw new IOException("Could not flush and close the file system " +
							"output stream to " + metadataFilePath + " in order to obtain the " +
							"stream state handle", e);
				}
				finally {
					closed = true;
				}
			}
			else {
				throw new IOException("Stream has already been closed and discarded.");
			}
		}
	}
}
  • FsCheckpointMetadataOutputStream extends CheckpointMetadataOutputStream, which in turn extends FSDataOutputStream. Its closeAndFinalizeCheckpoint method returns an FsCompletedCheckpointStorageLocation, while a plain close() deletes the metadata file
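FsCheckpointMetadataOutputStream has two exits, close() and closeAndFinalizeCheckpoint(), and both can be mimicked with java.nio.file. MetadataStreamSketch and CompletedLocation are hypothetical names. StandardOpenOption.CREATE_NEW plays the role of WriteMode.NO_OVERWRITE. Unlike the Flink class, this close() propagates IOException instead of logging it.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for FsCompletedCheckpointStorageLocation: the metadata file and its size.
final class CompletedLocation {
    final Path metadataFile;
    final long size;

    CompletedLocation(Path metadataFile, long size) {
        this.metadataFile = metadataFile;
        this.size = size;
    }
}

// Sketch of the two exits: finalize keeps the metadata file, a plain close() deletes it.
final class MetadataStreamSketch extends OutputStream {
    private final Path metadataFilePath;
    private final OutputStream out;
    private long pos;
    private boolean closed;

    MetadataStreamSketch(Path metadataFilePath) throws IOException {
        this.metadataFilePath = metadataFilePath;
        // CREATE_NEW fails if the file already exists, similar to WriteMode.NO_OVERWRITE
        this.out = Files.newOutputStream(metadataFilePath, StandardOpenOption.CREATE_NEW);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        pos++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        pos += len;
    }

    // Discard path: close the stream and remove the partially written file.
    @Override
    public synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            out.close();
        } finally {
            Files.deleteIfExists(metadataFilePath);
        }
    }

    // Success path: close the stream, keep the file, and report where it is and how big it is.
    synchronized CompletedLocation closeAndFinalizeCheckpoint() throws IOException {
        if (closed) {
            throw new IOException("Stream has already been closed and discarded.");
        }
        closed = true;
        try {
            out.close();
            return new CompletedLocation(metadataFilePath, pos);
        } catch (IOException e) {
            Files.deleteIfExists(metadataFilePath); // do not leave a broken metadata file behind
            throw e;
        }
    }
}
```

Only a successful closeAndFinalizeCheckpoint leaves a metadata file behind, so the metadata of an aborted checkpoint does not linger.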

Summary

  • FsCheckpointStorage's initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createSavepointLocation methods create FsCheckpointStorageLocation instances, while its createTaskOwnedStateStream method creates an FsCheckpointStateOutputStream
  • FsCheckpointStorageLocation extends FsCheckpointStreamFactory and implements the createMetadataOutputStream, disposeOnFailure, and getLocationReference methods of the CheckpointStorageLocation interface:
    ◦ createMetadataOutputStream creates an FsCheckpointMetadataOutputStream, which extends CheckpointMetadataOutputStream (itself a subclass of FSDataOutputStream); its closeAndFinalizeCheckpoint method returns an FsCompletedCheckpointStorageLocation
    ◦ disposeOnFailure simply calls fileSystem.delete(checkpointDirectory, true) to delete the directory
    ◦ getLocationReference returns the CheckpointStorageLocationReference
  • FsCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method returns an FsCheckpointStateOutputStream. FsCheckpointStateOutputStream extends CheckpointStreamFactory.CheckpointStateOutputStream, and its constructor takes basePath, fs, bufferSize, and localStateThreshold. Its closeAndGetHandle method returns a ByteStreamStateHandle when the data was never flushed to a file and pos <= localStateThreshold, and a FileStateHandle otherwise
