A look at Flink's KvStateRegistryGateway

This article takes a look at Flink's KvStateRegistryGateway.

KvStateRegistryGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java

public interface KvStateRegistryGateway {

	/**
	 * Notifies that queryable state has been registered.
	 *
	 * @param jobId	identifying the job for which to register a key value state
	 * @param jobVertexId JobVertexID the KvState instance belongs to.
	 * @param keyGroupRange Key group range the KvState instance belongs to.
	 * @param registrationName Name under which the KvState has been registered.
	 * @param kvStateId ID of the registered KvState instance.
	 * @param kvStateServerAddress Server address where to find the KvState instance.
	 * @return Future acknowledge if the key-value state has been registered
	 */
	CompletableFuture<Acknowledge> notifyKvStateRegistered(
		final JobID jobId,
		final JobVertexID jobVertexId,
		final KeyGroupRange keyGroupRange,
		final String registrationName,
		final KvStateID kvStateId,
		final InetSocketAddress kvStateServerAddress);

	/**
	 * Notifies that queryable state has been unregistered.
	 *
	 * @param jobId	identifying the job for which to unregister a key value state
	 * @param jobVertexId JobVertexID the KvState instance belongs to.
	 * @param keyGroupRange Key group index the KvState instance belongs to.
	 * @param registrationName Name under which the KvState has been registered.
	 * @return Future acknowledge if the key-value state has been unregistered
	 */
	CompletableFuture<Acknowledge> notifyKvStateUnregistered(
		final JobID jobId,
		final JobVertexID jobVertexId,
		final KeyGroupRange keyGroupRange,
		final String registrationName);
}
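  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered, both of which return a CompletableFuture<Acknowledge>; JobMaster implements both of them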
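Because both methods return a future, the caller does not block until the JobMaster has processed the notification; it reacts to the outcome instead. The following is a minimal sketch of that contract, assuming a hypothetical, simplified SimpleKvStateGateway interface (plain Strings in place of JobID, JobVertexID and KvStateID, Void in place of Acknowledge); it is not Flink's API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for KvStateRegistryGateway (not Flink's API):
// Strings replace Flink's ID types and Void replaces Acknowledge.
interface SimpleKvStateGateway {
    CompletableFuture<Void> notifyRegistered(String jobId, String registrationName, String serverAddress);

    CompletableFuture<Void> notifyUnregistered(String jobId, String registrationName);
}

// In-memory implementation that acknowledges each notification with an already completed future.
class InMemoryKvStateGateway implements SimpleKvStateGateway {
    final Map<String, String> addresses = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Void> notifyRegistered(String jobId, String registrationName, String serverAddress) {
        addresses.put(registrationName, serverAddress);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> notifyUnregistered(String jobId, String registrationName) {
        addresses.remove(registrationName);
        return CompletableFuture.completedFuture(null);
    }
}

public class GatewayDemo {
    public static void main(String[] args) {
        SimpleKvStateGateway gateway = new InMemoryKvStateGateway();
        // The caller attaches a callback instead of waiting for the acknowledgement.
        gateway.notifyRegistered("job-1", "word-count", "10.0.0.5:9069")
            .whenComplete((ack, error) -> System.out.println(error == null ? "registered" : "failed: " + error));
        // join() is used here only to keep the demo sequential.
        gateway.notifyUnregistered("job-1", "word-count").join();
    }
}
```

Running main prints "registered"; in a real RPC setting the future would complete later, once the remote side answers.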

JobMaster

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

	/** Default names for Flink's distributed components. */
	public static final String JOB_MANAGER_NAME = "jobmanager";
	public static final String ARCHIVE_NAME = "archive";

	// ------------------------------------------------------------------------

	private final JobMasterConfiguration jobMasterConfiguration;

	private final ResourceID resourceId;

	private final JobGraph jobGraph;

	private final Time rpcTimeout;

	private final HighAvailabilityServices highAvailabilityServices;

	private final BlobServer blobServer;

	private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

	private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

	private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

	private final ScheduledExecutorService scheduledExecutorService;

	private final OnCompletionActions jobCompletionActions;

	private final FatalErrorHandler fatalErrorHandler;

	private final ClassLoader userCodeLoader;

	private final SlotPool slotPool;

	private final SlotPoolGateway slotPoolGateway;

	private final RestartStrategy restartStrategy;

	// --------- BackPressure --------

	private final BackPressureStatsTracker backPressureStatsTracker;

	// --------- ResourceManager --------

	private final LeaderRetrievalService resourceManagerLeaderRetriever;

	// --------- TaskManagers --------

	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

	// -------- Mutable fields ---------

	private ExecutionGraph executionGraph;

	@Nullable
	private JobManagerJobStatusListener jobStatusListener;

	@Nullable
	private JobManagerJobMetricGroup jobManagerJobMetricGroup;

	@Nullable
	private String lastInternalSavepoint;

	@Nullable
	private ResourceManagerAddress resourceManagerAddress;

	@Nullable
	private ResourceManagerConnection resourceManagerConnection;

	@Nullable
	private EstablishedResourceManagerConnection establishedResourceManagerConnection;

	//......

	@Override
	public CompletableFuture<Acknowledge> notifyKvStateRegistered(
			final JobID jobId,
			final JobVertexID jobVertexId,
			final KeyGroupRange keyGroupRange,
			final String registrationName,
			final KvStateID kvStateId,
			final InetSocketAddress kvStateServerAddress) {
		if (jobGraph.getJobID().equals(jobId)) {
			if (log.isDebugEnabled()) {
				log.debug("Key value state registered for job {} under name {}.",
					jobGraph.getJobID(), registrationName);
			}

			try {
				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

				return CompletableFuture.completedFuture(Acknowledge.get());
			} catch (Exception e) {
				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
				return FutureUtils.completedExceptionally(e);
			}
		} else {
			if (log.isDebugEnabled()) {
				log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
			}
			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
		}
	}

	@Override
	public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
			JobID jobId,
			JobVertexID jobVertexId,
			KeyGroupRange keyGroupRange,
			String registrationName) {
		if (jobGraph.getJobID().equals(jobId)) {
			if (log.isDebugEnabled()) {
				log.debug("Key value state unregistered for job {} under name {}.",
					jobGraph.getJobID(), registrationName);
			}

			try {
				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
					jobVertexId, keyGroupRange, registrationName);

				return CompletableFuture.completedFuture(Acknowledge.get());
			} catch (Exception e) {
				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
				return FutureUtils.completedExceptionally(e);
			}
		} else {
			if (log.isDebugEnabled()) {
				log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
			}
			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
		}
	}

	//......
}
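  • JobMaster's notifyKvStateRegistered first checks that the given jobId matches the ID of its own JobGraph and then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered likewise delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. If the registry throws, the returned future is completed exceptionally with that exception; if the jobId is unknown, it is completed exceptionally with a FlinkJobNotFoundException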
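The control flow shared by both JobMaster methods can be sketched with plain JDK types. The helper name guarded and the use of IllegalArgumentException in place of FlinkJobNotFoundException are assumptions made for illustration; the failed helper mimics what Flink's FutureUtils.completedExceptionally does.

```java
import java.util.concurrent.CompletableFuture;

public class JobGuardSketch {

    // Hypothetical helper with the shape of JobMaster#notifyKvStateRegistered/Unregistered:
    // reject foreign job IDs, run the registry call, and report the outcome through a future.
    static CompletableFuture<Void> guarded(String ownJobId, String requestedJobId, Runnable registryCall) {
        if (!ownJobId.equals(requestedJobId)) {
            // Flink uses FlinkJobNotFoundException; IllegalArgumentException stands in for it here.
            return failed(new IllegalArgumentException("Could not find job " + requestedJobId));
        }
        try {
            registryCall.run();
            return CompletableFuture.completedFuture(null); // Flink completes with Acknowledge.get()
        } catch (RuntimeException e) {
            // Registry failures are not thrown at the RPC caller; they travel inside the future.
            return failed(e);
        }
    }

    // Java 8 compatible counterpart of FutureUtils.completedExceptionally.
    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = guarded("job-1", "job-1", () -> { });
        CompletableFuture<Void> wrongJob = guarded("job-1", "job-2", () -> { });
        CompletableFuture<Void> clash = guarded("job-1", "job-1", () -> {
            throw new IllegalStateException("Registration name clash");
        });
        System.out.println(ok.isCompletedExceptionally());       // false
        System.out.println(wrongJob.isCompletedExceptionally()); // true
        System.out.println(clash.isCompletedExceptionally());    // true
    }
}
```

Returning a failed future rather than throwing keeps the error on the asynchronous path, so the remote caller observes registry errors and unknown jobs the same way it observes a successful acknowledgement.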

KvStateLocationRegistry

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java

public class KvStateLocationRegistry {

	/** JobID this coordinator belongs to. */
	private final JobID jobId;

	/** Job vertices for determining parallelism per key. */
	private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

	/**
	 * Location info keyed by registration name. The name needs to be unique
	 * per JobID, i.e. two operators cannot register KvState with the same
	 * name.
	 */
	private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

	/**
	 * Creates the registry for the job.
	 *
	 * @param jobId       JobID this coordinator belongs to.
	 * @param jobVertices Job vertices map of all vertices of this job.
	 */
	public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
		this.jobId = Preconditions.checkNotNull(jobId, "JobID");
		this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
	}

	/**
	 * Returns the {@link KvStateLocation} for the registered KvState instance
	 * or <code>null</code> if no location information is available.
	 *
	 * @param registrationName Name under which the KvState instance is registered.
	 * @return Location information or <code>null</code>.
	 */
	public KvStateLocation getKvStateLocation(String registrationName) {
		return lookupTable.get(registrationName);
	}

	/**
	 * Notifies the registry about a registered KvState instance.
	 *
	 * @param jobVertexId JobVertexID the KvState instance belongs to
	 * @param keyGroupRange Key group range the KvState instance belongs to
	 * @param registrationName Name under which the KvState has been registered
	 * @param kvStateId ID of the registered KvState instance
	 * @param kvStateServerAddress Server address where to find the KvState instance
	 *
	 * @throws IllegalArgumentException If JobVertexID does not belong to job
	 * @throws IllegalArgumentException If state has been registered with same
	 * name by another operator.
	 * @throws IndexOutOfBoundsException If key group index is out of bounds.
	 */
	public void notifyKvStateRegistered(
			JobVertexID jobVertexId,
			KeyGroupRange keyGroupRange,
			String registrationName,
			KvStateID kvStateId,
			InetSocketAddress kvStateServerAddress) {

		KvStateLocation location = lookupTable.get(registrationName);

		if (location == null) {
			// First registration for this operator, create the location info
			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

			if (vertex != null) {
				int parallelism = vertex.getMaxParallelism();
				location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
				lookupTable.put(registrationName, location);
			} else {
				throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
			}
		}

		// Duplicated name if vertex IDs don't match
		if (!location.getJobVertexId().equals(jobVertexId)) {
			IllegalStateException duplicate = new IllegalStateException(
					"Registration name clash. KvState with name '" + registrationName +
							"' has already been registered by another operator (" +
							location.getJobVertexId() + ").");

			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
			if (vertex != null) {
				vertex.fail(new SuppressRestartsException(duplicate));
			}

			throw duplicate;
		}
		location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
	}

	/**
	 * Notifies the registry about an unregistered KvState instance.
	 *
	 * @param jobVertexId JobVertexID the KvState instance belongs to
	 * @param keyGroupRange Key group index the KvState instance belongs to
	 * @param registrationName Name under which the KvState has been registered
	 * @throws IllegalArgumentException If another operator registered the state instance
	 * @throws IllegalArgumentException If the registration name is not known
	 */
	public void notifyKvStateUnregistered(
			JobVertexID jobVertexId,
			KeyGroupRange keyGroupRange,
			String registrationName) {

		KvStateLocation location = lookupTable.get(registrationName);

		if (location != null) {
			// Duplicate name if vertex IDs don't match
			if (!location.getJobVertexId().equals(jobVertexId)) {
				throw new IllegalArgumentException("Another operator (" +
						location.getJobVertexId() + ") registered the KvState " +
						"under '" + registrationName + "'.");
			}

			location.unregisterKvState(keyGroupRange);

			if (location.getNumRegisteredKeyGroups() == 0) {
				lookupTable.remove(registrationName);
			}
		} else {
			throw new IllegalArgumentException("Unknown registration name '" +
					registrationName + "'. " + "Probably registration/unregistration race.");
		}
	}

}
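  • KvStateLocationRegistry's constructor takes a jobId and the jobVertices map; its lookupTable field maps each registrationName to the corresponding KvStateLocation
  • When lookupTable has no KvStateLocation for the registrationName yet, notifyKvStateRegistered creates one (sized by the vertex's max parallelism) and puts it into lookupTable, throwing an IllegalArgumentException if the JobVertexID is unknown; if the existing location belongs to a different JobVertexID, it fails the registering vertex with a SuppressRestartsException and throws an IllegalStateException for the name clash; otherwise it finally calls location.registerKvState
  • notifyKvStateUnregistered looks up the KvStateLocation in lookupTable and calls location.unregisterKvState; the location is removed from lookupTable only once it has no registered key groups left. An unknown registrationName, or a location registered by a different JobVertexID, results in an IllegalArgumentException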
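The lookupTable bookkeeping can be modeled in a small self-contained sketch. RegistrySketch and its nested Location class are hypothetical stand-ins for KvStateLocationRegistry and KvStateLocation, with Strings for IDs and an inclusive [start, end] pair for a KeyGroupRange. The per-key-group counter is an assumption about how KvStateLocation backs getNumRegisteredKeyGroups(), since its source is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of KvStateLocationRegistry and KvStateLocation (not Flink's code).
public class RegistrySketch {

    // Per-name location: the owning vertex and the server address registered for each key group.
    static final class Location {
        final String vertexId;
        final String[] serverByKeyGroup; // one slot per key group (max parallelism)
        int numRegisteredKeyGroups;

        Location(String vertexId, int numKeyGroups) {
            this.vertexId = vertexId;
            this.serverByKeyGroup = new String[numKeyGroups];
        }

        void register(int start, int end, String server) {
            if (start < 0 || end >= serverByKeyGroup.length) {
                throw new IndexOutOfBoundsException("Key group index");
            }
            for (int kg = start; kg <= end; kg++) {
                if (serverByKeyGroup[kg] == null) {
                    numRegisteredKeyGroups++; // count each key group once
                }
                serverByKeyGroup[kg] = server;
            }
        }

        void unregister(int start, int end) {
            for (int kg = start; kg <= end; kg++) {
                if (serverByKeyGroup[kg] != null) {
                    numRegisteredKeyGroups--;
                }
                serverByKeyGroup[kg] = null;
            }
        }
    }

    private final Map<String, Integer> maxParallelismByVertex;
    final Map<String, Location> lookupTable = new HashMap<>();

    RegistrySketch(Map<String, Integer> maxParallelismByVertex) {
        this.maxParallelismByVertex = maxParallelismByVertex;
    }

    void notifyRegistered(String vertexId, int start, int end, String name, String server) {
        Location location = lookupTable.get(name);
        if (location == null) {
            Integer maxParallelism = maxParallelismByVertex.get(vertexId);
            if (maxParallelism == null) {
                throw new IllegalArgumentException("Unknown JobVertexID " + vertexId);
            }
            location = new Location(vertexId, maxParallelism);
            lookupTable.put(name, location);
        }
        if (!location.vertexId.equals(vertexId)) {
            // Flink additionally fails the registering vertex with a SuppressRestartsException here.
            throw new IllegalStateException("Registration name clash for '" + name + "'");
        }
        location.register(start, end, server);
    }

    void notifyUnregistered(String vertexId, int start, int end, String name) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException("Another operator registered '" + name + "'");
        }
        location.unregister(start, end);
        if (location.numRegisteredKeyGroups == 0) {
            lookupTable.remove(name); // dropped only once every key group is gone
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> vertices = new HashMap<>();
        vertices.put("v1", 4); // 4 key groups: 0..3
        RegistrySketch registry = new RegistrySketch(vertices);

        registry.notifyRegistered("v1", 0, 1, "counts", "tm-a:9069");
        registry.notifyRegistered("v1", 2, 3, "counts", "tm-b:9069");
        registry.notifyUnregistered("v1", 0, 1, "counts");
        System.out.println(registry.lookupTable.containsKey("counts")); // true
        registry.notifyUnregistered("v1", 2, 3, "counts");
        System.out.println(registry.lookupTable.containsKey("counts")); // false
    }
}
```

The first unregistration leaves the entry in place because key groups 2..3 are still registered; only the second one brings the count to zero and removes the name from lookupTable.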

Summary

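  • The KvStateRegistryGateway interface defines two methods, notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both of them
  • After checking that the jobId belongs to its own job, JobMaster's notifyKvStateRegistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered; failures are reported through an exceptionally completed future
  • KvStateLocationRegistry's constructor takes a jobId and the jobVertices map; its lookupTable field maps each registrationName to a KvStateLocation. notifyKvStateRegistered creates a KvStateLocation and puts it into lookupTable when none exists for the registrationName, rejects a name already registered by another operator, and finally calls location.registerKvState; notifyKvStateUnregistered calls location.unregisterKvState on the matching KvStateLocation and removes it from lookupTable once no key groups remain registered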
