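This article takes a look at Flink's RichParallelSourceFunction.

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java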
```java
/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}
```
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
```java
/**
 * A stream data source that is executed in parallel. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>This interface acts only as a marker to tell the system that this source may
 * be executed in parallel. When different parallel instances are required to perform
 * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
 * context, which reveals information like the number of parallel tasks, and which parallel
 * task the current instance is.
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
```
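ParallelSourceFunction is only a marker, so splitting the work between instances is left to the source itself. A RichParallelSourceFunction typically does this with getIndexOfThisSubtask() and getNumberOfParallelSubtasks() from its RuntimeContext. The sketch below is plain Java, not Flink code. It assumes a simple modulo split of the range 0..total-1, which is an illustrative choice rather than anything Flink prescribes, and it simulates the parallel instances with a loop. In a real source the two int arguments would come from getRuntimeContext(), and run() would hand each value to SourceContext.collect.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulates how each parallel instance of a source can pick a disjoint share of the data
 * using only its subtask index and the total parallelism (modulo split, illustrative only).
 */
class SubtaskPartitioning {

    /** Values in [0, total) that the given subtask is responsible for emitting. */
    static List<Long> recordsFor(int subtaskIndex, int parallelism, long total) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("expected 0 <= subtaskIndex < parallelism");
        }
        List<Long> out = new ArrayList<>();
        // start at this subtask's index and stride by the parallelism: no two subtasks overlap
        for (long value = subtaskIndex; value < total; value += parallelism) {
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int index = 0; index < parallelism; index++) {
            System.out.println("subtask " + index + " -> " + recordsFor(index, parallelism, 10));
        }
    }
}
```

With parallelism 3 and total 10, the instances emit [0, 3, 6, 9], [1, 4, 7] and [2, 5, 8]. Every value appears exactly once across all instances.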
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java
```java
/**
 * An abstract stub implementation for rich user-defined functions.
 * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
 * teardown ({@link #close()}), as well as access to their runtime execution context via
 * {@link #getRuntimeContext()}.
 */
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext) this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}
```
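The runtimeContext field is declared transient, so it is not part of the serialized function. The runtime supplies it later through setRuntimeContext, and until that happens getRuntimeContext() throws. The stdlib-only sketch below reproduces that behavior with Java serialization. MiniRichFunction is a hypothetical stand-in for AbstractRichFunction, a String plays the role of the RuntimeContext, and the serialization round trip stands in for shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for AbstractRichFunction's context handling (not Flink code). */
class MiniRichFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by serialization, so it is null again after deserialization
    private transient String runtimeContext;

    void setRuntimeContext(String context) {
        this.runtimeContext = context;
    }

    String getRuntimeContext() {
        if (runtimeContext != null) {
            return runtimeContext;
        }
        throw new IllegalStateException("The runtime context has not been initialized.");
    }

    /** Serializes and deserializes the function, standing in for shipping it to a worker. */
    static MiniRichFunction roundTrip(MiniRichFunction f) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (MiniRichFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MiniRichFunction f = new MiniRichFunction();
        f.setRuntimeContext("client-side context");
        MiniRichFunction shipped = roundTrip(f);
        try {
            shipped.getRuntimeContext();
        } catch (IllegalStateException e) {
            System.out.println("before injection: " + e.getMessage());
        }
        shipped.setRuntimeContext("MySource (1/2)"); // what the runtime would do before open()
        System.out.println("after injection: " + shipped.getRuntimeContext());
    }
}
```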
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java
```java
/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {

    /**
     * Returns the name of the task in which the UDF runs, as assigned during plan construction.
     *
     * @return The name of the task in which the UDF runs.
     */
    String getTaskName();

    /**
     * Returns the metric group for this parallel subtask.
     *
     * @return The metric group for this parallel subtask.
     */
    @PublicEvolving
    MetricGroup getMetricGroup();

    /**
     * Gets the parallelism with which the parallel task runs.
     *
     * @return The parallelism with which the parallel task runs.
     */
    int getNumberOfParallelSubtasks();

    /**
     * Gets the number of max-parallelism with which the parallel task runs.
     *
     * @return The max-parallelism with which the parallel task runs.
     */
    @PublicEvolving
    int getMaxNumberOfParallelSubtasks();

    /**
     * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
     * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
     *
     * @return The index of the parallel subtask.
     */
    int getIndexOfThisSubtask();

    /**
     * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
     *
     * @return Attempt number of the subtask.
     */
    int getAttemptNumber();

    /**
     * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
     * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
     * {@link #getNumberOfParallelSubtasks()}.
     *
     * @return The name of the task, with subtask indicator.
     */
    String getTaskNameWithSubtasks();

    /**
     * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
     * job.
     */
    ExecutionConfig getExecutionConfig();

    //.......
}
```
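The Javadoc of getTaskNameWithSubtasks() shows how the subtask indicator relates to the other two methods. The displayed number is getIndexOfThisSubtask() + 1, and the total is getNumberOfParallelSubtasks(). Below is a tiny stdlib sketch of that formatting rule. The helper name is hypothetical; Flink builds this string itself.

```java
/** Hypothetical helper reproducing the "MyTask (3/6)" format described in the Javadoc. */
class TaskNames {

    static String withSubtasks(String taskName, int indexOfThisSubtask, int numberOfParallelSubtasks) {
        if (indexOfThisSubtask < 0 || indexOfThisSubtask >= numberOfParallelSubtasks) {
            throw new IllegalArgumentException("index must be in [0, parallelism - 1]");
        }
        // the subtask index is 0-based, the displayed number is 1-based
        return String.format("%s (%d/%d)", taskName, indexOfThisSubtask + 1, numberOfParallelSubtasks);
    }

    public static void main(String[] args) {
        System.out.println(withSubtasks("MyTask", 2, 6)); // MyTask (3/6)
    }
}
```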
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java
```java
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();

    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);

    startJobMasterServices();

    log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

    resetAndScheduleExecutionGraph();

    return Acknowledge.get();
}

private void resetAndScheduleExecutionGraph() throws Exception {
    validateRunsInMainThread();

    final CompletableFuture<Void> executionGraphAssignedFuture;

    if (executionGraph.getState() == JobStatus.CREATED) {
        executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
    } else {
        suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
        final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

        executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
            (JobStatus ignored, Throwable throwable) -> {
                assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
                return null;
            },
            getMainThreadExecutor());
    }

    executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
}

private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);

    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}
```
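What matters in startJobExecution is its control flow. A second call with the same JobMasterId is a no-op. Scheduling is chained onto a future that is either already complete (graph still CREATED) or completes only after the old graph has terminated. The sketch below models just that flow with the JDK, under these assumptions:

- A UUID stands in for JobMasterId.
- A counter stands in for scheduleExecutionGraph.
- Flink runs handleAsync on the main-thread executor. Here the callbacks run synchronously on the thread that completes the future.

None of these names are Flink API.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only model of JobMaster#startJobExecution / #resetAndScheduleExecutionGraph (hypothetical). */
class StartJobModel {
    private UUID fencingToken;                  // stands in for the JobMasterId fencing token
    CompletableFuture<Void> oldGraphTermination; // null: graph is still CREATED
    final AtomicInteger scheduleCalls = new AtomicInteger();

    /** Returns false when the job was already started with the same id. */
    boolean startJobExecution(UUID newJobMasterId) {
        Objects.requireNonNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(fencingToken, newJobMasterId)) {
            return false; // same fencing token: nothing to do
        }
        fencingToken = newJobMasterId;

        CompletableFuture<Void> graphAssigned;
        if (oldGraphTermination == null) {
            graphAssigned = CompletableFuture.completedFuture(null);
        } else {
            // like handleAsync: proceed once the old graph terminates, whether it failed or not
            graphAssigned = oldGraphTermination.handle((ignored, throwable) -> null);
        }
        graphAssigned.thenRun(scheduleCalls::incrementAndGet); // stands in for scheduleExecutionGraph
        return true;
    }

    public static void main(String[] args) {
        StartJobModel jm = new StartJobModel();
        UUID id = UUID.randomUUID();
        System.out.println(jm.startJobExecution(id)); // true
        System.out.println(jm.startJobExecution(id)); // false
        System.out.println(jm.scheduleCalls.get());   // 1
    }
}
```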
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
```java
public void scheduleForExecution() throws JobException {

    final long currentGlobalModVersion = globalModVersion;

    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

        final CompletableFuture<Void> newSchedulingFuture;

        switch (scheduleMode) {

            case LAZY_FROM_SOURCES:
                newSchedulingFuture = scheduleLazy(slotProvider);
                break;

            case EAGER:
                newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                break;

            default:
                throw new JobException("Schedule mode is invalid.");
        }

        if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
            schedulingFuture = newSchedulingFuture;

            newSchedulingFuture.whenCompleteAsync(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null && !(throwable instanceof CancellationException)) {
                        // only fail if the scheduling future was not canceled
                        failGlobal(ExceptionUtils.stripCompletionException(throwable));
                    }
                },
                futureExecutor);
        } else {
            newSchedulingFuture.cancel(false);
        }
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}
```
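scheduleForExecution relies on two guards. First, transitionState lets only one caller move the job from CREATED to RUNNING. Second, a scheduling future that ends with a CancellationException is not treated as a job failure. A simplified JDK-only model follows. It reduces the state machine to three states and assumes compareAndSet on an AtomicReference as the transition primitive. The global-modification-version check and the schedule modes are left out.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified JDK-only model of the guards in ExecutionGraph#scheduleForExecution (hypothetical). */
class ScheduleModel {

    enum JobStatus { CREATED, RUNNING, FAILED }

    final AtomicReference<JobStatus> state = new AtomicReference<>(JobStatus.CREATED);
    final AtomicReference<Throwable> globalFailure = new AtomicReference<>();

    void scheduleForExecution(CompletableFuture<Void> schedulingFuture) {
        // only one caller can move the job from CREATED to RUNNING
        if (!state.compareAndSet(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
        schedulingFuture.whenComplete((ignored, throwable) -> {
            // only fail if the scheduling future was not cancelled
            if (throwable != null && !(throwable instanceof CancellationException)) {
                failGlobal(throwable);
            }
        });
    }

    void failGlobal(Throwable t) {
        globalFailure.set(t);
        state.set(JobStatus.FAILED);
    }

    public static void main(String[] args) {
        ScheduleModel graph = new ScheduleModel();
        CompletableFuture<Void> scheduling = new CompletableFuture<>();
        graph.scheduleForExecution(scheduling);
        scheduling.cancel(false);
        System.out.println(graph.state.get()); // RUNNING: cancellation is not a failure
    }
}
```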
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
```java
/**
 *
 *
 * @param slotProvider  The resource provider from which the slots are allocated
 * @param timeout       The maximum time that the deployment may take, before a
 *                      TimeoutException is thrown.
 * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
 * The future can also be completed exceptionally if an error happened.
 */
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
    checkState(state == JobStatus.RUNNING, "job is not running currently");

    // Important: reserve all the space we need up front.
    // that way we do not have any operation that can fail between allocating the slots
    // and adding them to the list. If we had a failure in between there, that would
    // cause the slots to get lost
    final boolean queued = allowQueuedScheduling;

    // collecting all the slots may resize and fail in that operation without slots getting lost
    final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

    // allocate the slots (obtain all their futures
    for (ExecutionJobVertex ejv : getVerticesTopologically()) {
        // these calls are not blocking, they only return futures
        Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
            slotProvider,
            queued,
            LocationPreferenceConstraint.ALL,
            allocationTimeout);

        allAllocationFutures.addAll(allocationFutures);
    }

    // this future is complete once all slot futures are complete.
    // the future fails once one slot future fails.
    final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

    final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
        .thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
        // Generate a more specific failure message for the eager scheduling
        .exceptionally(
            (Throwable throwable) -> {
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                final Throwable resultThrowable;

                if (strippedThrowable instanceof TimeoutException) {
                    int numTotal = allAllocationsFuture.getNumFuturesTotal();
                    int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                    String message = "Could not allocate all requires slots within timeout of " +
                        timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

                    resultThrowable = new NoResourceAvailableException(message);
                } else {
                    resultThrowable = strippedThrowable;
                }

                throw new CompletionException(resultThrowable);
            });

    return currentSchedulingFuture;
}
```
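Per the comment in scheduleEager, FutureUtils.combineAll completes once all slot futures complete and fails as soon as one of them fails. The JDK's CompletableFuture.allOf behaves differently: it waits for every input even after a failure. The sketch below writes such a fail-fast combinator with plain CompletableFuture, then mirrors the shape of scheduleEager:

- On success, deploy everything.
- On a TimeoutException, report required versus allocated slots.

Assumptions: Strings stand in for Execution objects, "deploying" just records them in a list, and IllegalStateException stands in for NoResourceAvailableException. None of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** JDK-only model of a fail-fast combineAll plus the success/timeout handling of scheduleEager. */
class EagerSchedulingModel {

    /** Completes when all inputs complete; fails as soon as any input fails. */
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(new ArrayList<>());
            return result;
        }
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, throwable) -> {
                if (throwable != null) {
                    result.completeExceptionally(throwable); // first failure wins
                } else if (remaining.decrementAndGet() == 0) {
                    // every input succeeded at this point, so join() does not block
                    result.complete(futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
                }
            });
        }
        return result;
    }

    /** Deploys all allocations, or explains a timeout in terms of slot counts. */
    static CompletableFuture<Void> scheduleEager(
            List<CompletableFuture<String>> allocations, List<String> deployed, String timeout) {
        return combineAll(allocations)
            .thenAccept(deployed::addAll) // stand-in for calling execution.deploy() on each
            .exceptionally(throwable -> {
                Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause() : throwable;
                if (cause instanceof TimeoutException) {
                    long allocated = allocations.stream()
                        .filter(f -> f.isDone() && !f.isCompletedExceptionally())
                        .count();
                    // stand-in for Flink's NoResourceAvailableException
                    throw new CompletionException(new IllegalStateException(
                        "Could not allocate all required slots within timeout of " + timeout
                            + ". Slots required: " + allocations.size() + ", slots allocated: " + allocated));
                }
                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> slot1 = new CompletableFuture<>();
        CompletableFuture<String> slot2 = new CompletableFuture<>();
        List<CompletableFuture<String>> allocations = new ArrayList<>();
        allocations.add(slot1);
        allocations.add(slot2);

        CompletableFuture<Void> scheduling = scheduleEager(allocations, new ArrayList<>(), "10000 ms");
        slot1.complete("source (1/2)");
        slot2.completeExceptionally(new TimeoutException());
        try {
            scheduling.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // ... Slots required: 2, slots allocated: 1
        }
    }
}
```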
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
```java
/**
 * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
 * pairs of the slots and execution attempts, to ease correlation between vertices and execution
 * attempts.
 *
 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
 *
 * @param resourceProvider The resource provider from whom the slots are requested.
 * @param queued if the allocation can be queued
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allocationTimeout timeout for allocating the individual slots
 */
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
        SlotProvider resourceProvider,
        boolean queued,
        LocationPreferenceConstraint locationPreferenceConstraint,
        Time allocationTimeout) {
    final ExecutionVertex[] vertices = this.taskVertices;
    final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

    // try to acquire a slot future for each execution.
    // we store the execution with the future just to be on the safe side
    for (int i = 0; i < vertices.length; i++) {
        // allocate the next slot (future)
        final Execution exec = vertices[i].getCurrentExecutionAttempt();
        final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
            resourceProvider,
            queued,
            locationPreferenceConstraint,
            allocationTimeout);
        slots[i] = allocationFuture;
    }

    // all good, we acquired all slots
    return Arrays.asList(slots);
}
```
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java
```java
/**
 * Deploys the execution to the previously assigned resource.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
public void deploy() throws JobException {
    final LogicalSlot slot  = assignedResource;

    checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    //......

    try {

        //......

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskRestore,
            attemptNumber);

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

        submitResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // only respond to the failure case
                if (failure != null) {
                    if (failure instanceof TimeoutException) {
                        String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                        markFailed(new Exception(
                            "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                    } else {
                        markFailed(failure);
                    }
                }
            },
            executor);

    }
    catch (Throwable t) {
        markFailed(t);
        ExceptionUtils.rethrow(t);
    }
}
```
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
```java
private final ExecutionVertex[] taskVertices;

public ExecutionJobVertex(
        ExecutionGraph graph,
        JobVertex jobVertex,
        int defaultParallelism,
        Time timeout,
        long initialGlobalModVersion,
        long createTimestamp) throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    this.graph = graph;
    this.jobVertex = jobVertex;

    int vertexParallelism = jobVertex.getParallelism();
    int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

    final int configuredMaxParallelism = jobVertex.getMaxParallelism();

    this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

    // if no max parallelism was configured by the user, we calculate and set a default
    setMaxParallelismInternal(maxParallelismConfigured ?
        configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

    // verify that our parallelism is not higher than the maximum parallelism
    if (numTaskVertices > maxParallelism) {
        throw new JobException(
            String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                jobVertex.getName(),
                numTaskVertices,
                maxParallelism));
    }

    this.parallelism = numTaskVertices;

    this.serializedTaskInformation = null;

    this.taskVertices = new ExecutionVertex[numTaskVertices];

    //......

    // create all task vertices
    for (int i = 0; i < numTaskVertices; i++) {
        ExecutionVertex vertex = new ExecutionVertex(
            this,
            i,
            producedDataSets,
            timeout,
            initialGlobalModVersion,
            createTimestamp,
            maxPriorAttemptsHistoryLength);

        this.taskVertices[i] = vertex;
    }

    //......
}
```
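The constructor's parallelism logic reduces to a fallback and a bound check. It uses jobVertex.getParallelism() when that value is positive, falls back to defaultParallelism otherwise, and rejects a value above the max parallelism. The sketch below restates this in plain Java, under these assumptions:

- The helper names are hypothetical.
- The max parallelism is passed in directly rather than computed by KeyGroupRangeAssignment.computeDefaultMaxParallelism.
- IllegalArgumentException stands in for JobException.
- We read each loop index i as the subtask index that getIndexOfThisSubtask() later reports. The excerpt above does not show that step, so treat it as an assumption.

```java
import java.util.Arrays;

/** Hypothetical helper mirroring the parallelism logic of the ExecutionJobVertex constructor. */
class VertexParallelism {

    /** jobVertexParallelism <= 0 means "not set"; ExecutionGraph#attachJobGraph passes 1 as the default. */
    static int numTaskVertices(int jobVertexParallelism, int defaultParallelism, int maxParallelism) {
        int numTaskVertices = jobVertexParallelism > 0 ? jobVertexParallelism : defaultParallelism;
        if (numTaskVertices > maxParallelism) {
            // stand-in for Flink's JobException
            throw new IllegalArgumentException(String.format(
                "parallelism (%d) is higher than the max parallelism (%d)", numTaskVertices, maxParallelism));
        }
        return numTaskVertices;
    }

    /** One vertex per index 0..n-1, like the loop that fills taskVertices. */
    static int[] subtaskIndices(int numTaskVertices) {
        int[] indices = new int[numTaskVertices];
        for (int i = 0; i < numTaskVertices; i++) {
            indices[i] = i;
        }
        return indices;
    }

    public static void main(String[] args) {
        int n = numTaskVertices(4, 1, 128);
        System.out.println(n + " vertices with indices " + Arrays.toString(subtaskIndices(n)));
        System.out.println(numTaskVertices(-1, 1, 128)); // 1: falls back to defaultParallelism
    }
}
```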
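To summarize: RichParallelSourceFunction extends AbstractRichFunction and implements ParallelSourceFunction. ParallelSourceFunction is only a marker telling the system that the source may run in parallel. AbstractRichFunction contributes the open/close life-cycle methods (both no-ops by default) and access to the RuntimeContext via getRuntimeContext().

RuntimeContext provides getNumberOfParallelSubtasks() (returns the parallelism of the current task) and getIndexOfThisSubtask() (returns the index of the current parallel subtask). These make it easy to write a ParallelSourceFunction whose instances run in parallel yet each emit different, non-overlapping data.

When JobMaster starts job execution, it calls ExecutionGraph.scheduleForExecution. In EAGER mode, scheduleEager has every ExecutionJobVertex request a slot for each of its ExecutionVertex instances via allocateResourcesForAll. Once all slots are allocated, it calls Execution.deploy on each execution, which submits the task to its TaskManager.

The number of ExecutionVertex instances is decided in the ExecutionJobVertex constructor. If jobVertex.getParallelism() is greater than 0 (which is usually the case), its value is used as numTaskVertices. Otherwise defaultParallelism is used; when ExecutionGraph.attachJobGraph creates an ExecutionJobVertex, it passes 1 as defaultParallelism.

The JobVertex's parallelism in turn comes from the stream node: if streamNode.getParallelism() is greater than 0, that value is used. The stream node's parallelism is determined when the stream is built (see the DataStreamSource constructor, the DataStream.transform method and the DataStreamSink constructor), and DataStreamSource resets the parallelism of a source that is not a parallel source to 1. If the user does not set a parallelism, the StreamExecutionEnvironment's parallelism is used by default; for a LocalEnvironment, that default is Runtime.getRuntime().availableProcessors().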
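The parallelism rules summarized above fit in a few lines. This is a plain-Java model under stated assumptions, not Flink's API:

- The method names are hypothetical.
- "Not set" is encoded as a non-positive int.
- The local-environment default simply calls Runtime.getRuntime().availableProcessors().
- An explicit setParallelism call on a non-parallel source is deliberately not modeled.

```java
/** Hypothetical model of how a source ends up with its parallelism (not Flink API). */
class SourceParallelism {

    /** Default environment parallelism; a local environment uses the number of available cores. */
    static int environmentDefault(boolean localEnvironment, int configuredDefault) {
        return localEnvironment ? Runtime.getRuntime().availableProcessors() : configuredDefault;
    }

    /**
     * Parallelism of a source's stream node. Non-parallel sources are pinned to 1, as the
     * DataStreamSource constructor does; userParallelism <= 0 means "not set by the user".
     */
    static int sourceParallelism(boolean isParallelSource, int userParallelism, int environmentParallelism) {
        if (!isParallelSource) {
            return 1;
        }
        return userParallelism > 0 ? userParallelism : environmentParallelism;
    }

    public static void main(String[] args) {
        int env = environmentDefault(true, 1);
        System.out.println("parallel source, not set by user: " + sourceParallelism(true, -1, env));
        System.out.println("parallel source, set to 4:        " + sourceParallelism(true, 4, env));
        System.out.println("non-parallel source:              " + sourceParallelism(false, -1, env));
    }
}
```

A stream node value greater than 0 produced this way is what the JobVertex, and therefore ExecutionJobVertex, ends up using as numTaskVertices.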