A Look at Flink's FsStateBackend

This article takes a look at Flink's FsStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

	// ------------------------------------------------------------------------
	//  Checkpoint storage - the durable persistence of checkpoint data
	// ------------------------------------------------------------------------

	CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

	CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

	// ------------------------------------------------------------------------
	//  Structure Backends 
	// ------------------------------------------------------------------------

	default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
			Environment env,
			JobID jobID,
			String operatorIdentifier,
			TypeSerializer<K> keySerializer,
			int numberOfKeyGroups,
			KeyGroupRange keyGroupRange,
			TaskKvStateRegistry kvStateRegistry) throws Exception {
		return createKeyedStateBackend(
			env,
			jobID,
			operatorIdentifier,
			keySerializer,
			numberOfKeyGroups,
			keyGroupRange,
			kvStateRegistry,
			TtlTimeProvider.DEFAULT
		);
	}

	default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider
	) throws Exception {
		return createKeyedStateBackend(
			env,
			jobID,
			operatorIdentifier,
			keySerializer,
			numberOfKeyGroups,
			keyGroupRange,
			kvStateRegistry,
			ttlTimeProvider,
			new UnregisteredMetricsGroup());
	}

	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup) throws Exception;
	
	OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed
  • It declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend methods and extends Serializable; implementations of StateBackend are required to be thread-safe
  • AbstractStateBackend is the abstract class that implements StateBackend directly; AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend in turn extend AbstractFileStateBackend
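
The three createKeyedStateBackend overloads form a chain: each shorter overload is a default method that fills in one more argument and delegates, so an implementation only has to provide the longest one. A minimal sketch of that pattern follows; BackendFactorySketch, OverloadChainDemo, and the string placeholders are hypothetical stand-ins for illustration, not Flink types.

```java
// Hypothetical stand-in for StateBackend: only the overload chain is modeled.
interface BackendFactorySketch {

    // Shortest overload: fills in a default TTL time provider.
    default String create(String operatorId) {
        return create(operatorId, "default-ttl-provider");
    }

    // Middle overload: fills in a default (unregistered) metric group.
    default String create(String operatorId, String ttlProvider) {
        return create(operatorId, ttlProvider, "unregistered-metrics");
    }

    // Full overload: the only method an implementation has to provide.
    String create(String operatorId, String ttlProvider, String metricGroup);
}

class OverloadChainDemo {
    public static void main(String[] args) {
        // Only the three-argument method is implemented here.
        BackendFactorySketch factory =
                (op, ttl, metrics) -> op + "/" + ttl + "/" + metrics;

        // The one-argument call travels through both default methods.
        System.out.println(factory.create("map-operator"));
    }
}
```

Callers written against an older, shorter signature keep working, while new parameters only need to be handled in one place.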

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

	private static final long serialVersionUID = 4620415814639230247L;

	// ------------------------------------------------------------------------
	//  State Backend - State-Holding Backends
	// ------------------------------------------------------------------------

	@Override
	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup) throws IOException;

	@Override
	public abstract OperatorStateBackend createOperatorStateBackend(
			Environment env,
			String operatorIdentifier) throws Exception;
}
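  • AbstractStateBackend declares that it implements both StateBackend and Serializable, and it redeclares createKeyedStateBackend and createOperatorStateBackend as abstract methods; the redeclared createKeyedStateBackend also narrows the throws clause from Exception to IOException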
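
Redeclaring an interface method as abstract in a base class lets that class tighten the contract, for example by narrowing the checked exceptions. The sketch below shows the language mechanism only; BackendSketch, AbstractBackendSketch, HeapBackendSketch, and NarrowedThrowsDemo are hypothetical names, not Flink classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical miniature of the StateBackend / AbstractStateBackend pair.
interface BackendSketch {
    String createKeyed(String operatorId) throws Exception;
}

abstract class AbstractBackendSketch implements BackendSketch {
    // Redeclared as abstract with a narrower throws clause than the interface.
    @Override
    public abstract String createKeyed(String operatorId) throws IOException;
}

class HeapBackendSketch extends AbstractBackendSketch {
    // A concrete subclass may narrow further and declare no checked exception.
    @Override
    public String createKeyed(String operatorId) {
        return "heap-keyed-backend:" + operatorId;
    }
}

class NarrowedThrowsDemo {
    static String build(AbstractBackendSketch backend, String operatorId) {
        try {
            // Through the abstract class, only IOException has to be handled.
            return backend.createKeyed(operatorId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(build(new HeapBackendSketch(), "window-operator"));
    }
}
```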

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

	private static final long serialVersionUID = 1L;

	// ------------------------------------------------------------------------
	//  State Backend Properties
	// ------------------------------------------------------------------------

	/** The path where checkpoints will be stored, or null, if none has been configured. */
	@Nullable
	private final Path baseCheckpointPath;

	/** The path where savepoints will be stored, or null, if none has been configured. */
	@Nullable
	private final Path baseSavepointPath;

	/**
	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
	 *
	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
	 * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
	 */
	protected AbstractFileStateBackend(
			@Nullable URI baseCheckpointPath,
			@Nullable URI baseSavepointPath) {

		this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
				baseSavepointPath == null ? null : new Path(baseSavepointPath));
	}

	/**
	 * Creates a backend with the given optional checkpoint- and savepoint base directories.
	 *
	 * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
	 * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
	 */
	protected AbstractFileStateBackend(
			@Nullable Path baseCheckpointPath,
			@Nullable Path baseSavepointPath) {

		this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
		this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
	}

	/**
	 * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
	 * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
	 * over the value in the configuration. If the configuration does not specify a value, it is possible
	 * that the checkpoint-/savepoint directories in the backend will be null.
	 *
	 * <p>This constructor can be used to create a backend that is based partially on a given backend
	 * and partially on a configuration.
	 *
	 * @param baseCheckpointPath The checkpoint base directory to use (or null).
	 * @param baseSavepointPath The default savepoint directory to use (or null).
	 * @param configuration The configuration to read values from.
	 */
	protected AbstractFileStateBackend(
			@Nullable Path baseCheckpointPath,
			@Nullable Path baseSavepointPath,
			Configuration configuration) {

		this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
				parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
	}

	// ------------------------------------------------------------------------

	/**
	 * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
	 * for checkpoints within this directory. May be null, if not configured.
	 *
	 * @return The checkpoint base directory
	 */
	@Nullable
	public Path getCheckpointPath() {
		return baseCheckpointPath;
	}

	/**
	 * Gets the directory where savepoints are stored by default (when no custom path is given
	 * to the savepoint trigger command).
	 *
	 * @return The default directory for savepoints, or null, if no default directory has been configured.
	 */
	@Nullable
	public Path getSavepointPath() {
		return baseSavepointPath;
	}

	// ------------------------------------------------------------------------
	//  Initialization and metadata storage
	// ------------------------------------------------------------------------

	@Override
	public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
		return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Checks the validity of the path's scheme and path.
	 *
	 * @param path The path to check.
	 * @return The URI as a Path.
	 *
	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
	 */
	private static Path validatePath(Path path) {
		final URI uri = path.toUri();
		final String scheme = uri.getScheme();
		final String pathPart = uri.getPath();

		// some validity checks
		if (scheme == null) {
			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
					"Please specify the file system scheme explicitly in the URI.");
		}
		if (pathPart == null) {
			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
					"Please specify a directory path for the checkpoint data.");
		}
		if (pathPart.length() == 0 || pathPart.equals("/")) {
			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
		}

		return path;
	}

	@Nullable
	private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
		if (path != null) {
			return path;
		}
		else {
			String configValue = config.getString(option);
			try {
				return configValue == null ? null : new Path(configValue);
			}
			catch (IllegalArgumentException e) {
				throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
						" : " + configValue + " . Not a valid path.");
			}
		}
	}
}
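
The validatePath checks above (a scheme must be present, a path must be present, and the path must not be empty or the root directory) can be tried outside Flink. The following is a rough sketch that assumes java.net.URI as a stand-in for Flink's Path, so parsing details may differ from the real class; CheckpointPathValidationSketch and the shortened messages are hypothetical.

```java
import java.net.URI;

class CheckpointPathValidationSketch {

    // Mirrors the three checks of validatePath on a plain java.net.URI.
    static URI validate(URI uri) {
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    // Convenience wrapper that reports validity instead of throwing.
    static boolean isValid(String location) {
        try {
            validate(URI.create(location));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "hdfs://namenode:8020/flink/checkpoints", // scheme and non-root path
            "file:///tmp/checkpoints",                // local file system
            "/tmp/checkpoints",                       // no scheme
            "hdfs://namenode:8020/"                   // root directory
        };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + isValid(candidate));
        }
    }
}
```

The host name and port in the examples are placeholders; the point is that a bare path such as /tmp/checkpoints is rejected because the scheme has to be given explicitly.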

FsStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsStateBackend.java

@PublicEvolving
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

	private static final long serialVersionUID = -8191916350224044011L;

	/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
	private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

	// ------------------------------------------------------------------------

	/** State below this size will be stored as part of the metadata, rather than in files.
	 * A value of '-1' means not yet configured, in which case the default will be used. */
	private final int fileStateThreshold;

	/** Switch to chose between synchronous and asynchronous snapshots.
	 * A value of 'undefined' means not yet configured, in which case the default will be used. */
	private final TernaryBoolean asynchronousSnapshots;

	//......

	public FsStateBackend(
			URI checkpointDirectory,
			@Nullable URI defaultSavepointDirectory,
			int fileStateSizeThreshold,
			TernaryBoolean asynchronousSnapshots) {

		super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);

		checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
		checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
				"The threshold for file state size must be in [-1, %s], where '-1' means to use " +
						"the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);

		this.fileStateThreshold = fileStateSizeThreshold;
		this.asynchronousSnapshots = asynchronousSnapshots;
	}

	/**
	 * Private constructor that creates a re-configured copy of the state backend.
	 *
	 * @param original The state backend to re-configure
	 * @param configuration The configuration
	 */
	private FsStateBackend(FsStateBackend original, Configuration configuration) {
		super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

		// if asynchronous snapshots were configured, use that setting,
		// else check the configuration
		this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
				configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));

		final int sizeThreshold = original.fileStateThreshold >= 0 ?
				original.fileStateThreshold :
				configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);

		if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
			this.fileStateThreshold = sizeThreshold;
		}
		else {
			this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();

			// because this is the only place we (unlikely) ever log, we lazily
			// create the logger here
			LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
					"Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
		}
	}

	/**
	 * Gets the base directory where all the checkpoints are stored.
	 * The job-specific checkpoint directory is created inside this directory.
	 *
	 * @return The base directory for checkpoints.
	 *
	 * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
	 */
	@Deprecated
	public Path getBasePath() {
		return getCheckpointPath();
	}

	/**
	 * Gets the base directory where all the checkpoints are stored.
	 * The job-specific checkpoint directory is created inside this directory.
	 *
	 * @return The base directory for checkpoints.
	 */
	@Nonnull
	@Override
	public Path getCheckpointPath() {
		// we know that this can never be null by the way of constructor checks
		//noinspection ConstantConditions
		return super.getCheckpointPath();
	}

	/**
	 * Gets the threshold below which state is stored as part of the metadata, rather than in files.
	 * This threshold ensures that the backend does not create a large amount of very small files,
	 * where potentially the file pointers are larger than the state itself.
	 *
	 * <p>If not explicitly configured, this is the default value of
	 * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
	 *
	 * @return The file size threshold, in bytes.
	 */
	public int getMinFileSizeThreshold() {
		return fileStateThreshold >= 0 ?
				fileStateThreshold :
				CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
	}

	/**
	 * Gets whether the key/value data structures are asynchronously snapshotted.
	 *
	 * <p>If not explicitly configured, this is the default value of
	 * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
	 */
	public boolean isUsingAsynchronousSnapshots() {
		return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
	}

	// ------------------------------------------------------------------------
	//  Reconfiguration
	// ------------------------------------------------------------------------

	/**
	 * Creates a copy of this state backend that uses the values defined in the configuration
	 * for fields where that were not specified in this state backend.
	 *
	 * @param config the configuration
	 * @return The re-configured variant of the state backend
	 */
	@Override
	public FsStateBackend configure(Configuration config) {
		return new FsStateBackend(this, config);
	}

	// ------------------------------------------------------------------------
	//  initialization and cleanup
	// ------------------------------------------------------------------------

	@Override
	public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
		checkNotNull(jobId, "jobId");
		return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId, getMinFileSizeThreshold());
	}

	// ------------------------------------------------------------------------
	//  state holding structures
	// ------------------------------------------------------------------------

	@Override
	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup) {

		TaskStateManager taskStateManager = env.getTaskStateManager();
		LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
		HeapPriorityQueueSetFactory priorityQueueSetFactory =
			new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

		return new HeapKeyedStateBackend<>(
				kvStateRegistry,
				keySerializer,
				env.getUserClassLoader(),
				numberOfKeyGroups,
				keyGroupRange,
				isUsingAsynchronousSnapshots(),
				env.getExecutionConfig(),
				localRecoveryConfig,
				priorityQueueSetFactory,
				ttlTimeProvider);
	}

	@Override
	public OperatorStateBackend createOperatorStateBackend(
		Environment env,
		String operatorIdentifier) {

		return new DefaultOperatorStateBackend(
			env.getUserClassLoader(),
			env.getExecutionConfig(),
				isUsingAsynchronousSnapshots());
	}

	// ------------------------------------------------------------------------
	//  utilities
	// ------------------------------------------------------------------------

	@Override
	public String toString() {
		return "File State Backend (" +
				"checkpoints: '" + getCheckpointPath() +
				"', savepoints: '" + getSavepointPath() +
				"', asynchronous: " + asynchronousSnapshots +
				", fileStateThreshold: " + fileStateThreshold + ")";
	}
}
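  • FsStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface; its public constructor takes checkpointDirectory, defaultSavepointDirectory, fileStateSizeThreshold, and asynchronousSnapshots. checkpointDirectory and asynchronousSnapshots must not be null, and fileStateSizeThreshold must be at least -1 and at most MAX_FILE_STATE_THRESHOLD (1 MiB), where -1 means "use the value from the deployment's configuration"
  • The configure method calls the private constructor, which creates a re-configured copy of the backend from the given Configuration instead of modifying the current instance: an undefined asynchronousSnapshots is resolved from CheckpointingOptions.ASYNC_SNAPSHOTS, and a fileStateThreshold below 0 is replaced by the configured value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD. The resulting value is then validated: if it is at least 0 and at most MAX_FILE_STATE_THRESHOLD it is used; otherwise a warning is logged and CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() is used
  • createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend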
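
The resolution rules used by the private constructor can be written down without any Flink dependency. The sketch below is an illustration under stated assumptions: FsBackendReconfigureSketch is a hypothetical class, ASSUMED_DEFAULT_THRESHOLD (1024 bytes) is an assumed stand-in for FS_SMALL_FILE_THRESHOLD.defaultValue(), and a null Boolean stands in for TernaryBoolean.UNDEFINED.

```java
class FsBackendReconfigureSketch {

    // Same upper bound as in the source above (1 MiB).
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // Assumption for illustration; the real default comes from the Flink config option.
    static final int ASSUMED_DEFAULT_THRESHOLD = 1024;

    // An explicitly set threshold (>= 0) wins; otherwise the configured value is
    // used, and an out-of-range result falls back to the default.
    static int resolveThreshold(int original, int configured) {
        final int candidate = original >= 0 ? original : configured;
        if (candidate >= 0 && candidate <= MAX_FILE_STATE_THRESHOLD) {
            return candidate;
        }
        return ASSUMED_DEFAULT_THRESHOLD;
    }

    // An explicitly set flag wins; an undefined one (null here) takes the configured value.
    static boolean resolveAsync(Boolean original, boolean configured) {
        return original != null ? original : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveThreshold(4096, 512));             // explicit value is kept
        System.out.println(resolveThreshold(-1, 2048));              // configured value is used
        System.out.println(resolveThreshold(-1, 8 * 1024 * 1024));   // out of range, default is used
        System.out.println(resolveAsync(null, true));                // undefined, configured value is used
        System.out.println(resolveAsync(false, true));               // explicit value is kept
    }
}
```

Because the public constructor already rejects thresholds above MAX_FILE_STATE_THRESHOLD, the fallback branch is only reachable through an invalid value in the configuration.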

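Summary

  • FsStateBackend extends AbstractFileStateBackend and implements the configure method of the ConfigurableStateBackend interface; its public constructor requires fileStateSizeThreshold to be at least -1 and at most MAX_FILE_STATE_THRESHOLD
  • FsStateBackend keeps working state in TaskManager memory and writes it to the configured file system when a checkpoint is taken, while the JobManager keeps only the checkpoint metadata in memory; it uses async snapshots by default so that taking a snapshot does not block the processing thread; to avoid writing many small files it has a fileStateThreshold, and state smaller than this threshold is stored in the metadata rather than in separate files
  • createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend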
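
The effect of the threshold can be pictured with a toy model in which small state is kept inline with the metadata and larger state goes to its own file. This is only a sketch of the idea, not Flink's checkpoint stream implementation: ThresholdStorageSketch is hypothetical, the two maps stand in for the metadata and the file system, and the handling of state exactly at the threshold is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class ThresholdStorageSketch {

    // Stand-in for checkpoint metadata: state name -> bytes stored inline.
    final Map<String, byte[]> metadata = new HashMap<>();

    // Stand-in for the checkpoint file system: file name -> file contents.
    final Map<String, byte[]> files = new HashMap<>();

    private final int threshold;

    ThresholdStorageSketch(int threshold) {
        this.threshold = threshold;
    }

    // State below the threshold stays with the metadata, everything else
    // becomes a separate file (boundary handling is an assumption here).
    void store(String name, byte[] state) {
        if (state.length < threshold) {
            metadata.put(name, state.clone());
        } else {
            files.put(name + ".bin", state.clone());
        }
    }

    public static void main(String[] args) {
        ThresholdStorageSketch storage = new ThresholdStorageSketch(1024);
        storage.store("counter", new byte[16]);       // small: kept inline
        storage.store("window-buffer", new byte[8192]); // large: written as a file

        System.out.println("inline entries: " + storage.metadata.keySet());
        System.out.println("files: " + storage.files.keySet());
    }
}
```

With many operators that each hold a few bytes of state, keeping those bytes inline avoids one file per operator per checkpoint, which is the motivation given in the getMinFileSizeThreshold Javadoc.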
