A look at Flink's MemCheckpointStreamFactory

This post takes a look at Flink's MemCheckpointStreamFactory.

CheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStreamFactory.java

/**
 * A factory for checkpoint output streams, which are used to persist data for checkpoints.
 *
 * <p>Stream factories can be created from the {@link CheckpointStorage} through
 * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
 */
public interface CheckpointStreamFactory {

	CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException;

	abstract class CheckpointStateOutputStream extends FSDataOutputStream {

		@Nullable
		public abstract StreamStateHandle closeAndGetHandle() throws IOException;

		@Override
		public abstract void close() throws IOException;
	}
}
  • CheckpointStreamFactory is a factory for checkpoint output streams (the streams used to persist checkpoint data). It defines the createCheckpointStateOutputStream method, which returns a CheckpointStateOutputStream; CheckpointStateOutputStream extends FSDataOutputStream and declares two abstract methods, closeAndGetHandle and close
  • CheckpointStreamFactory has two implementation classes whose names contain "Factory": MemCheckpointStreamFactory (which has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation) and FsCheckpointStreamFactory (which has one subclass, FsCheckpointStorageLocation)
  • The CheckpointStorageLocation interface extends the CheckpointStreamFactory interface and has three implementation classes: NonPersistentMetadataCheckpointStorageLocation, PersistentMetadataCheckpointStorageLocation, and FsCheckpointStorageLocation
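To make the type relationships above concrete, here is a minimal, self-contained sketch. Every type in it (StreamFactorySketch, StateOutputStreamSketch, StorageLocationSketch, InMemoryFactorySketch, InMemoryLocationSketch) is a hypothetical, simplified stand-in rather than a Flink class, and the "state handle" is reduced to a plain byte[]. It only mirrors the shape of the hierarchy: the location interface extends the factory interface, and a location class inherits its stream-creating behaviour from a factory class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for CheckpointStreamFactory (not the real Flink API).
interface StreamFactorySketch {
    StateOutputStreamSketch createStateOutputStream() throws IOException;
}

// Hypothetical stand-in for CheckpointStateOutputStream: an OutputStream that can be
// turned into a "handle" (here simply the written bytes).
abstract class StateOutputStreamSketch extends OutputStream {
    /** Returns the written data, or null if nothing was written. */
    abstract byte[] closeAndGetHandle() throws IOException;
}

// Hypothetical stand-in for CheckpointStorageLocation, which extends the factory interface.
interface StorageLocationSketch extends StreamFactorySketch {
    String getLocationReference();
}

// Plays the role of MemCheckpointStreamFactory: a plain class implementing the factory.
class InMemoryFactorySketch implements StreamFactorySketch {
    @Override
    public StateOutputStreamSketch createStateOutputStream() {
        return new StateOutputStreamSketch() {
            private final ByteArrayOutputStream buf = new ByteArrayOutputStream();

            @Override
            public void write(int b) {
                buf.write(b);
            }

            @Override
            byte[] closeAndGetHandle() {
                return buf.size() == 0 ? null : buf.toByteArray();
            }
        };
    }
}

// Plays the role of the *MetadataCheckpointStorageLocation classes: inherit the
// stream factory, add the location-specific methods.
class InMemoryLocationSketch extends InMemoryFactorySketch implements StorageLocationSketch {
    @Override
    public String getLocationReference() {
        return "default";
    }
}
```

The last class reproduces the pattern the two memory-backed locations use: extend the factory class for the state streams, implement the location interface for everything else.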

FSDataOutputStream

flink-core-1.7.0-sources.jar!/org/apache/flink/core/fs/FSDataOutputStream.java

@Public
public abstract class FSDataOutputStream extends OutputStream {

	public abstract long getPos() throws IOException;

	public abstract void flush() throws IOException;

	public abstract void sync() throws IOException;

	public abstract void close() throws IOException;
}
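  • FSDataOutputStream extends Java's OutputStream. It adds the abstract methods getPos and sync, and redeclares flush and close (which OutputStream already has) as abstract, so every subclass must implement all four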
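The extra contract FSDataOutputStream adds on top of OutputStream is small: report the current write position and offer a sync hook. The sketch below shows that contract with a hypothetical abstract class PositionedOutputStreamSketch and an in-memory implementation, loosely in the spirit of how MemoryCheckpointOutputStream delegates to ByteArrayOutputStreamWithPos. Neither class is Flink code; the in-memory sync is a no-op, as it is in MemoryCheckpointOutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for FSDataOutputStream (the real one is org.apache.flink.core.fs.FSDataOutputStream).
abstract class PositionedOutputStreamSketch extends OutputStream {
    /** Number of bytes written so far, i.e. the current write offset. */
    public abstract long getPos() throws IOException;

    /** Pushes buffered data to durable storage; may be a no-op for in-memory streams. */
    public abstract void sync() throws IOException;
}

// In-memory implementation: the position is simply the buffer size.
class InMemoryPositionedStream extends PositionedOutputStreamSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        buffer.write(b, off, len);
    }

    @Override
    public long getPos() {
        return buffer.size();
    }

    @Override
    public void sync() {
        // nothing to persist: the data only lives on the heap
    }

    byte[] toByteArray() {
        return buffer.toByteArray();
    }
}
```

For example, writing one byte and then two bytes from an offset leaves getPos() at 3.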

CheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorageLocation.java

/**
 * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
 * and lifecycle/cleanup methods.
 *
 * <p>CheckpointStorageLocations are typically created and initialized via
 * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
 * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
 */
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

	CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException;

	void disposeOnFailure() throws IOException;

	CheckpointStorageLocationReference getLocationReference();
}
  • CheckpointStorageLocation extends the CheckpointStreamFactory interface. It is usually created and initialized by a CheckpointStorage and provides methods for data persistence, metadata persistence, and lifecycle/cleanup: createMetadataOutputStream creates a CheckpointMetadataOutputStream; disposeOnFailure disposes of the checkpoint location when the checkpoint fails; getLocationReference returns a CheckpointStorageLocationReference
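The lifecycle described above (write the metadata through the location, and dispose of the location if the checkpoint fails) can be sketched as follows. LocationSketch and CheckpointDriverSketch are hypothetical names; the metadata stream is simplified to a ByteArrayOutputStream, and the driver is an illustration of how a caller might use the three methods, not Flink's actual checkpoint coordination code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified mirror of CheckpointStorageLocation's own three methods.
interface LocationSketch {
    ByteArrayOutputStream createMetadataOutputStream() throws IOException;

    void disposeOnFailure() throws IOException;

    String getLocationReference();
}

// Hypothetical caller: writes metadata, and on failure asks the location to clean up.
final class CheckpointDriverSketch {
    static boolean finishCheckpoint(LocationSketch location, String metadata, boolean simulateFailure)
            throws IOException {
        try {
            ByteArrayOutputStream out = location.createMetadataOutputStream();
            out.write(metadata.getBytes(StandardCharsets.UTF_8));
            if (simulateFailure) {
                throw new IOException("simulated failure while writing metadata");
            }
            return true;
        } catch (IOException e) {
            // the checkpoint failed: let the location drop whatever it has written
            location.disposeOnFailure();
            return false;
        }
    }
}
```

The point of the sketch is that cleanup is the location's responsibility: the caller does not need to know whether the location is purely in memory (where disposeOnFailure does nothing) or backed by a directory that must be deleted.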

MemCheckpointStreamFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java

/**
 * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays.
 */
public class MemCheckpointStreamFactory implements CheckpointStreamFactory {

	/** The maximal size that the snapshotted memory state may have */
	private final int maxStateSize;

	/**
	 * Creates a new in-memory stream factory that accepts states whose serialized forms are
	 * up to the given number of bytes.
	 *
	 * @param maxStateSize The maximal size of the serialized state
	 */
	public MemCheckpointStreamFactory(int maxStateSize) {
		this.maxStateSize = maxStateSize;
	}

	@Override
	public CheckpointStateOutputStream createCheckpointStateOutputStream(
			CheckpointedStateScope scope) throws IOException
	{
		return new MemoryCheckpointOutputStream(maxStateSize);
	}

	@Override
	public String toString() {
		return "In-Memory Stream Factory";
	}

	static void checkSize(int size, int maxSize) throws IOException {
		if (size > maxSize) {
			throw new IOException(
					"Size of the state is larger than the maximum permitted memory-backed state. Size="
							+ size + " , maxSize=" + maxSize
							+ " . Consider using a different state backend, like the File System State backend.");
		}
	}



	/**
	 * A {@code CheckpointStateOutputStream} that writes into a byte array.
	 */
	public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {

		private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

		private final int maxSize;

		private AtomicBoolean closed;

		boolean isEmpty = true;

		public MemoryCheckpointOutputStream(int maxSize) {
			this.maxSize = maxSize;
			this.closed = new AtomicBoolean(false);
		}

		@Override
		public void write(int b) throws IOException {
			os.write(b);
			isEmpty = false;
		}

		@Override
		public void write(byte[] b, int off, int len) throws IOException {
			os.write(b, off, len);
			isEmpty = false;
		}

		@Override
		public void flush() throws IOException {
			os.flush();
		}

		@Override
		public void sync() throws IOException { }

		// --------------------------------------------------------------------

		@Override
		public void close() {
			if (closed.compareAndSet(false, true)) {
				closeInternal();
			}
		}

		@Nullable
		@Override
		public StreamStateHandle closeAndGetHandle() throws IOException {
			if (isEmpty) {
				return null;
			}
			return new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), closeAndGetBytes());
		}

		@Override
		public long getPos() throws IOException {
			return os.getPosition();
		}

		public boolean isClosed() {
			return closed.get();
		}

		/**
		 * Closes the stream and returns the byte array containing the stream's data.
		 * @return The byte array containing the stream's data.
		 * @throws IOException Thrown if the size of the data exceeds the maximal
		 */
		public byte[] closeAndGetBytes() throws IOException {
			if (closed.compareAndSet(false, true)) {
				checkSize(os.size(), maxSize);
				byte[] bytes = os.toByteArray();
				closeInternal();
				return bytes;
			} else {
				throw new IOException("stream has already been closed");
			}
		}

		private void closeInternal() {
			os.reset();
		}
	}
}
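  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface; its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream and does not use the scope argument
  • MemoryCheckpointOutputStream extends CheckpointStateOutputStream and buffers data in a ByteArrayOutputStreamWithPos. closeAndGetHandle returns null if nothing was written; otherwise it calls closeAndGetBytes, which checks whether the buffered size exceeds maxSize and throws an IOException if it does
  • MemCheckpointStreamFactory has two subclasses, NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation, both of which also implement the CheckpointStorageLocation interface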
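The close logic of MemoryCheckpointOutputStream is where the memory limit bites, so here is a stripped-down re-creation of it. BoundedMemoryStreamSketch is a hypothetical class, not Flink code: it uses a plain ByteArrayOutputStream instead of ByteArrayOutputStreamWithPos and returns raw bytes instead of a ByteStreamStateHandle, but it keeps the same ordering as the original: an empty stream yields null, only the first close wins, and the size check happens at close time rather than on each write.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical re-creation of MemoryCheckpointOutputStream's close behaviour.
class BoundedMemoryStreamSketch extends OutputStream {
    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private final int maxSize;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private boolean isEmpty = true;

    BoundedMemoryStreamSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) {
        os.write(b);
        isEmpty = false;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
        isEmpty = false;
    }

    /** Like closeAndGetHandle: null for an empty stream, otherwise the size-checked bytes. */
    byte[] closeAndGetHandle() throws IOException {
        return isEmpty ? null : closeAndGetBytes();
    }

    /** Like closeAndGetBytes: the first caller closes the stream; later calls fail. */
    byte[] closeAndGetBytes() throws IOException {
        if (!closed.compareAndSet(false, true)) {
            throw new IOException("stream has already been closed");
        }
        int size = os.size();
        if (size > maxSize) {
            // same condition and message as MemCheckpointStreamFactory.checkSize
            throw new IOException("Size of the state is larger than the maximum permitted memory-backed state. Size="
                    + size + " , maxSize=" + maxSize);
        }
        byte[] bytes = os.toByteArray();
        os.reset(); // corresponds to closeInternal()
        return bytes;
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            os.reset();
        }
    }
}
```

With maxSize = 4, writing four bytes closes cleanly, while writing five bytes succeeds during write and only fails when closeAndGetHandle is called.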

NonPersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
 * for metadata has been configured.
 */
public class NonPersistentMetadataCheckpointStorageLocation
		extends MemCheckpointStreamFactory
		implements CheckpointStorageLocation {

	/** The external pointer returned for checkpoints that are not externally addressable. */
	public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

	public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
		super(maxStateSize);
	}

	@Override
	public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
		return new MetadataOutputStream();
	}

	@Override
	public void disposeOnFailure() {}

	@Override
	public CheckpointStorageLocationReference getLocationReference() {
		return CheckpointStorageLocationReference.getDefault();
	}

	// ------------------------------------------------------------------------
	//  CompletedCheckpointStorageLocation
	// ------------------------------------------------------------------------

	/**
	 * A {@link CompletedCheckpointStorageLocation} that is not persistent and only holds the
	 * metadata in an internal byte array.
	 */
	private static class NonPersistentCompletedCheckpointStorageLocation implements CompletedCheckpointStorageLocation {

		private static final long serialVersionUID = 1L;

		private final ByteStreamStateHandle metaDataHandle;

		NonPersistentCompletedCheckpointStorageLocation(ByteStreamStateHandle metaDataHandle) {
			this.metaDataHandle = metaDataHandle;
		}

		@Override
		public String getExternalPointer() {
			return EXTERNAL_POINTER;
		}

		@Override
		public StreamStateHandle getMetadataHandle() {
			return metaDataHandle;
		}

		@Override
		public void disposeStorageLocation() {}
	}

	// ------------------------------------------------------------------------
	//  CheckpointMetadataOutputStream
	// ------------------------------------------------------------------------

	private static class MetadataOutputStream extends CheckpointMetadataOutputStream {

		private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();

		private boolean closed;

		@Override
		public void write(int b) throws IOException {
			os.write(b);
		}

		@Override
		public void write(byte[] b, int off, int len) throws IOException {
			os.write(b, off, len);
		}

		@Override
		public void flush() throws IOException {
			os.flush();
		}

		@Override
		public long getPos() throws IOException {
			return os.getPosition();
		}

		@Override
		public void sync() throws IOException { }

		@Override
		public CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException {
			synchronized (this) {
				if (!closed) {
					closed = true;

					byte[] bytes = os.toByteArray();
					ByteStreamStateHandle handle = new ByteStreamStateHandle(UUID.randomUUID().toString(), bytes);
					return new NonPersistentCompletedCheckpointStorageLocation(handle);
				} else {
					throw new IOException("Already closed");
				}
			}
		}

		@Override
		public void close() {
			if (!closed) {
				closed = true;
				os.reset();
			}
		}
	}
}
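  • When no checkpointsDirectory is configured, MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation; its createMetadataOutputStream method creates a MetadataOutputStream
  • MetadataOutputStream extends CheckpointMetadataOutputStream and also buffers data in a ByteArrayOutputStreamWithPos; closeAndFinalizeCheckpoint returns a NonPersistentCompletedCheckpointStorageLocation, and throws an IOException if the stream has already been closed
  • NonPersistentCompletedCheckpointStorageLocation implements the CompletedCheckpointStorageLocation interface; its getMetadataHandle method returns a ByteStreamStateHandle that holds the metadata bytes in memory, and its getExternalPointer returns the placeholder EXTERNAL_POINTER because such a checkpoint is not externally addressable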
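The non-persistent metadata path boils down to "collect bytes, finalize exactly once, hand back an in-memory location". The sketch below mirrors that with a hypothetical InMemoryMetadataStreamSketch whose nested CompletedLocation stands in for NonPersistentCompletedCheckpointStorageLocation; a handle name plus a byte[] replaces ByteStreamStateHandle. The EXTERNAL_POINTER string and the "Already closed" message are copied from the source above; everything else is simplified and is not Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.UUID;

// Hypothetical mirror of NonPersistentMetadataCheckpointStorageLocation.MetadataOutputStream.
class InMemoryMetadataStreamSketch extends OutputStream {
    static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";

    /** Stand-in for NonPersistentCompletedCheckpointStorageLocation. */
    static final class CompletedLocation {
        final String handleName;
        final byte[] metadata;

        CompletedLocation(String handleName, byte[] metadata) {
            this.handleName = handleName;
            this.metadata = metadata;
        }

        String getExternalPointer() {
            return EXTERNAL_POINTER;
        }
    }

    private final ByteArrayOutputStream os = new ByteArrayOutputStream();
    private boolean closed;

    @Override
    public void write(int b) {
        os.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        os.write(b, off, len);
    }

    /** Only the first call succeeds; the metadata never leaves the heap. */
    synchronized CompletedLocation closeAndFinalizeCheckpoint() throws IOException {
        if (closed) {
            throw new IOException("Already closed");
        }
        closed = true;
        return new CompletedLocation(UUID.randomUUID().toString(), os.toByteArray());
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            os.reset();
        }
    }
}
```

Because the completed location only wraps the bytes, there is nothing to delete later, which is why disposeStorageLocation and disposeOnFailure are empty in the original.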

PersistentMetadataCheckpointStorageLocation

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java

/**
 * A checkpoint storage location for the {@link MemoryStateBackend} when it durably
 * persists the metadata in a file system.
 */
public class PersistentMetadataCheckpointStorageLocation
		extends MemCheckpointStreamFactory
		implements CheckpointStorageLocation {

	private final FileSystem fileSystem;

	private final Path checkpointDirectory;

	private final Path metadataFilePath;

	/**
	 * Creates a checkpoint storage persists metadata to a file system and stores state
	 * in line in state handles with the metadata.
	 *
	 * @param fileSystem The file system to which the metadata will be written.
	 * @param checkpointDir The directory where the checkpoint metadata will be written.
	 */
	public PersistentMetadataCheckpointStorageLocation(
			FileSystem fileSystem,
			Path checkpointDir,
			int maxStateSize) {

		super(maxStateSize);

		this.fileSystem = checkNotNull(fileSystem);
		this.checkpointDirectory = checkNotNull(checkpointDir);
		this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
	}

	// ------------------------------------------------------------------------

	@Override
	public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
		return new FsCheckpointMetadataOutputStream(fileSystem, metadataFilePath, checkpointDirectory);
	}

	@Override
	public void disposeOnFailure() throws IOException {
		// on a failure, no chunk in the checkpoint directory needs to be saved, so
		// we can drop it as a whole
		fileSystem.delete(checkpointDirectory, true);
	}

	@Override
	public CheckpointStorageLocationReference getLocationReference() {
		return CheckpointStorageLocationReference.getDefault();
	}
}
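  • When checkpointsDirectory is configured, MemoryBackendCheckpointStorage creates a PersistentMetadataCheckpointStorageLocation; its createMetadataOutputStream method creates an FsCheckpointMetadataOutputStream. The FsCheckpointMetadataOutputStream constructor takes three arguments: fileSystem, metadataFilePath, and exclusiveCheckpointDir. fileSystem is used to create an FSDataOutputStream for metadataFilePath, while exclusiveCheckpointDir is used when returning the FsCompletedCheckpointStorageLocation. On failure, disposeOnFailure recursively deletes the whole checkpoint directory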
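To see what "persisting the metadata" adds, here is a sketch of the same two operations against the local file system, using java.nio.file in place of Flink's FileSystem abstraction. FileMetadataLocationSketch is a hypothetical class; the file name "_metadata" is an assumption standing in for AbstractFsCheckpointStorage.METADATA_FILE_NAME. For self-containment the sketch creates the checkpoint directory itself and opens the file with CREATE_NEW so an existing metadata file is never overwritten; those are choices of this sketch, not statements about Flink.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical mirror of PersistentMetadataCheckpointStorageLocation on java.nio.file.
class FileMetadataLocationSketch {
    // Assumption: stands in for AbstractFsCheckpointStorage.METADATA_FILE_NAME.
    static final String METADATA_FILE_NAME = "_metadata";

    private final Path checkpointDirectory;
    private final Path metadataFilePath;

    FileMetadataLocationSketch(Path checkpointDir) {
        this.checkpointDirectory = checkpointDir;
        this.metadataFilePath = checkpointDir.resolve(METADATA_FILE_NAME);
    }

    /** Opens <checkpointDir>/_metadata for writing; fails if the file already exists. */
    OutputStream createMetadataOutputStream() throws IOException {
        Files.createDirectories(checkpointDirectory);
        return Files.newOutputStream(metadataFilePath, StandardOpenOption.CREATE_NEW);
    }

    /** Like disposeOnFailure: drop the whole checkpoint directory, recursively. */
    void disposeOnFailure() throws IOException {
        if (!Files.exists(checkpointDirectory)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(checkpointDirectory)) {
            // reverse order puts children before their parent directories
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    Path getMetadataFilePath() {
        return metadataFilePath;
    }
}
```

Note that only the metadata goes to a file here; the operator state itself is still produced by the inherited in-memory streams and travels inline inside the metadata.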

Summary

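  • When no checkpointsDirectory is configured, MemoryBackendCheckpointStorage creates a NonPersistentMetadataCheckpointStorageLocation; when checkpointsDirectory is configured, it creates a PersistentMetadataCheckpointStorageLocation
  • NonPersistentMetadataCheckpointStorageLocation and PersistentMetadataCheckpointStorageLocation both extend the MemCheckpointStreamFactory class and implement the CheckpointStorageLocation interface; their createMetadataOutputStream methods return a MetadataOutputStream and an FsCheckpointMetadataOutputStream respectively. Since neither overrides createCheckpointStateOutputStream, state is written through MemoryCheckpointOutputStream in both cases, and only the metadata handling differs
  • MemCheckpointStreamFactory implements the CheckpointStreamFactory interface, and its createCheckpointStateOutputStream method returns a MemoryCheckpointOutputStream; CheckpointStorageLocation extends the CheckpointStreamFactory interface, is usually created and initialized by a CheckpointStorage, and provides methods for data persistence, metadata persistence, and lifecycle/cleanup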
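The selection rule in the summary fits in a few lines. In this hypothetical sketch, LocationChooserSketch and its Kind enum are illustrative names only (the real decision lives in MemoryBackendCheckpointStorage and returns actual CheckpointStorageLocation objects); a null checkpointsDirectory stands for "not configured". The enum also records which stream types each choice ends up using, which makes the key point visible: the state stream is the same either way.

```java
import java.nio.file.Path;

// Hypothetical sketch of how the memory backend picks a storage location.
final class LocationChooserSketch {
    enum Kind {
        NON_PERSISTENT_METADATA("MetadataOutputStream"),
        PERSISTENT_METADATA("FsCheckpointMetadataOutputStream");

        /** Simple name of the metadata stream the location creates. */
        final String metadataStreamType;

        Kind(String metadataStreamType) {
            this.metadataStreamType = metadataStreamType;
        }

        /** Both locations inherit the in-memory state streams from MemCheckpointStreamFactory. */
        String stateStreamType() {
            return "MemoryCheckpointOutputStream";
        }
    }

    /** A null checkpointsDirectory means no checkpoint directory was configured. */
    static Kind chooseLocation(Path checkpointsDirectory) {
        return checkpointsDirectory == null ? Kind.NON_PERSISTENT_METADATA : Kind.PERSISTENT_METADATA;
    }
}
```

In other words, configuring a checkpoint directory for the memory backend changes where the metadata ends up, not where the state is buffered, so the maxStateSize limit applies in both cases.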
