程序會轉成JobGraph提交,JobGraph最終轉爲ExecutionGraph進行處理
ExecutionGraph會拆分成ExecutionJobVertex執行,按(DataSourceTask,BatchTask,DataSinkTask) 進行拆分
0 = jobVertex = {InputFormatVertex@7675} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)"
1 = jobVertex = {JobVertex@7695} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)"
2 = jobVertex = {OutputFormatVertex@6665} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)" ```
ExecutionJobVertex 執行流程CREATED -> DEPLOYING ,轉成對應的Task(CREATED -->DEPLOYING --> RUNNING)
默認作業調度模式爲:LAZY_FROM_SOURCES,只啓動Source任務,下游任務是當上游任務開始給他發送數據時纔開始
有幾個並行度,ExecutionJobVertex 會轉成對應的幾個ExecutionVertex,ExecutionVertex 是會轉化成Task來運行,ExecutionVertex中並行度通過subTaskIndex來區分,第一個subTaskIndex=0 ,第二個subTaskIndex = 1
c a a b c a
package com.opensourceteams.module.bigdata.flink.example.dataset.worldcount

import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * Batch WordCount example on the DataSet API.
 *
 * Reads a text file, splits each line on single spaces, counts the
 * occurrences per word (groupBy word, sum the attached 1s) and prints
 * the result, running locally with parallelism 2.
 */
object WordCountRun {

  def main(args: Array[String]): Unit = {
    // local environment; the configuration enables debug-friendly (long timeout) settings
    val environment: ExecutionEnvironment =
      ExecutionEnvironment.createLocalEnvironment(ConfigurationUtil.getConfiguration(true))
    environment.setParallelism(2)

    val lines = environment.readTextFile("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt")

    // brings the implicit TypeInformation instances into scope
    import org.apache.flink.streaming.api.scala._

    val wordCounts = lines
      .flatMap(line => line.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    wordCounts.print()
  }
}
new JobMaster()
把JobGraph 轉換爲ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
/**
 * Creates the JobMaster for the given {@link JobGraph}.
 *
 * <p>Note: the constructor already converts the job graph into the
 * {@code ExecutionGraph} (via {@code createAndRestoreExecutionGraph}) before
 * any scheduling takes place.
 *
 * @param rpcService rpc service this endpoint registers with
 * @param jobMasterConfiguration configuration of this job master (also provides the rpc timeout)
 * @param resourceId resource id of the process running this job master
 * @param jobGraph the job graph to execute
 * @param highAvailabilityService HA services (resource manager leader retrieval etc.)
 * @param slotPoolFactory factory for the slot pool managing this job's slots
 * @param jobManagerSharedServices services shared across jobs (executor, restart strategy factory, back pressure tracker)
 * @param heartbeatServices factory for the task manager / resource manager heartbeat managers
 * @param blobServer blob server for job artifacts
 * @param jobMetricGroupFactory factory for this job's metric group
 * @param jobCompletionActions callbacks invoked when the job reaches completion
 * @param fatalErrorHandler handler for unrecoverable errors
 * @param userCodeLoader class loader for user code; also used to deserialize the execution config
 * @throws Exception if the execution graph cannot be created or restored
 */
public JobMaster(
        RpcService rpcService,
        JobMasterConfiguration jobMasterConfiguration,
        ResourceID resourceId,
        JobGraph jobGraph,
        HighAvailabilityServices highAvailabilityService,
        SlotPoolFactory slotPoolFactory,
        JobManagerSharedServices jobManagerSharedServices,
        HeartbeatServices heartbeatServices,
        BlobServer blobServer,
        JobManagerJobMetricGroupFactory jobMetricGroupFactory,
        OnCompletionActions jobCompletionActions,
        FatalErrorHandler fatalErrorHandler,
        ClassLoader userCodeLoader) throws Exception {

    // register this rpc endpoint under a random actor name
    super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));

    final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);

    this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
    this.resourceId = checkNotNull(resourceId);
    this.jobGraph = checkNotNull(jobGraph);
    this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
    this.highAvailabilityServices = checkNotNull(highAvailabilityService);
    this.blobServer = checkNotNull(blobServer);
    this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
    this.jobCompletionActions = checkNotNull(jobCompletionActions);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.userCodeLoader = checkNotNull(userCodeLoader);
    this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);

    // heartbeats towards task managers are initiated from this side (sender)
    this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(selfGateway),
        rpcService.getScheduledExecutor(),
        log);

    // heartbeats with the resource manager (receiver side)
    this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        rpcService.getScheduledExecutor(),
        log);

    final String jobName = jobGraph.getName();
    final JobID jid = jobGraph.getJobID();

    log.info("Initializing job {} ({}).", jobName, jid);

    // the restart strategy travels inside the user's serialized ExecutionConfig,
    // hence the deserialization with the user code class loader
    final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
        jobGraph.getSerializedExecutionConfig()
            .deserializeValue(userCodeLoader)
            .getRestartStrategy();

    this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
        jobManagerSharedServices.getRestartStrategyFactory(),
        jobGraph.isCheckpointingEnabled());

    log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);

    resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();

    this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
    this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

    this.registeredTaskManagers = new HashMap<>(4);

    this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
    this.lastInternalSavepoint = null;

    this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
    // JobGraph -> ExecutionGraph conversion happens here
    this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
    this.jobStatusListener = null;

    // the resource manager connection is established later, not in the constructor
    this.resourceManagerConnection = null;
    this.establishedResourceManagerConnection = null;
}
transitionState(JobStatus.CREATED, JobStatus.RUNNING) - INFO級別日誌
Job Flink Java Job at Mon Mar 11 18:57:37 CST 2019 (f24b82ed1ec3e1c90455c342a9dfc21e) switched from state CREATED to RUNNING. ```
默認的作業調度模式 LAZY_FROM_SOURCES,
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; ```
/**
 * Starts scheduling of this execution graph: transitions the job from CREATED to
 * RUNNING and kicks off deployment according to the configured schedule mode
 * (LAZY_FROM_SOURCES schedules only the source vertices; EAGER schedules all).
 *
 * @throws JobException if the configured schedule mode is unknown
 * @throws IllegalStateException if the job is not in state CREATED
 */
public void scheduleForExecution() throws JobException {

    // remember the mod version to detect concurrent global modifications (e.g. restarts)
    final long currentGlobalModVersion = globalModVersion;

    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

        final CompletableFuture<Void> newSchedulingFuture;

        switch (scheduleMode) {

            case LAZY_FROM_SOURCES:
                newSchedulingFuture = scheduleLazy(slotProvider);
                break;

            case EAGER:
                newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                break;

            default:
                throw new JobException("Schedule mode is invalid.");
        }

        // only install the future if no restart/failover happened in the meantime
        if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
            schedulingFuture = newSchedulingFuture;

            newSchedulingFuture.whenCompleteAsync(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null && !(throwable instanceof CancellationException)) {
                        // only fail if the scheduling future was not canceled
                        failGlobal(ExceptionUtils.stripCompletionException(throwable));
                    }
                },
                futureExecutor);
        } else {
            // state changed concurrently; abandon this scheduling attempt
            newSchedulingFuture.cancel(false);
        }
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}
/** * Possible states of a job once it has been accepted by the job manager. */ public enum JobStatus { /** Job is newly created, no task has started to run. */ CREATED(TerminalState.NON_TERMINAL), /** Some tasks are scheduled or running, some may be pending, some may be finished. */ RUNNING(TerminalState.NON_TERMINAL), /** The job has failed and is currently waiting for the cleanup to complete */ FAILING(TerminalState.NON_TERMINAL), /** The job has failed with a non-recoverable task failure */ FAILED(TerminalState.GLOBALLY), /** Job is being cancelled */ CANCELLING(TerminalState.NON_TERMINAL), /** Job has been cancelled */ CANCELED(TerminalState.GLOBALLY), /** All of the job's tasks have successfully finished. */ FINISHED(TerminalState.GLOBALLY), /** The job is currently undergoing a reset and total restart */ RESTARTING(TerminalState.NON_TERMINAL), /** The job has been suspended and is currently waiting for the cleanup to complete */ SUSPENDING(TerminalState.NON_TERMINAL), /** * The job has been suspended which means that it has been stopped but not been removed from a * potential HA job store. */ SUSPENDED(TerminalState.LOCALLY), /** The job is currently reconciling and waits for task execution report to recover state. */ RECONCILING(TerminalState.NON_TERMINAL); // ----------------------------
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.jobgraph;

/**
 * Determines how the tasks of an execution graph are brought up.
 */
public enum ScheduleMode {

    /** Only source tasks are scheduled up front; each downstream task starts once its input data is ready. */
    LAZY_FROM_SOURCES,

    /** Every task of the graph is scheduled immediately. */
    EAGER;

    /**
     * Tells whether consumer tasks may be deployed lazily, i.e. after their producers.
     */
    public boolean allowLazyDeployment() {
        switch (this) {
            case LAZY_FROM_SOURCES:
                return true;
            default:
                return false;
        }
    }
}
程序會轉成JobGraph提交,JobGraph最終轉爲ExecutionGraph進行處理
ExecutionGraph會拆分成ExecutionJobVertex執行,按(DataSourceTask,BatchTask,DataSinkTask) 進行拆分
0 = jobVertex = {InputFormatVertex@7675} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)"
1 = jobVertex = {JobVertex@7695} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)"
2 = jobVertex = {OutputFormatVertex@6665} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)" ```
ExecutionJobVertex (執行流程:CREATED -> DEPLOYING ),轉成對應的Task(執行流程:CREATED -->DEPLOYING --> RUNNING)
verticesInCreationOrder = {ArrayList@6145} size = 3
0 = {ExecutionJobVertex@6484} stateMonitor = {Object@6608} graph = {ExecutionGraph@5602} jobVertex = {InputFormatVertex@6609} "CHAIN DataSource (at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:19) (org.apache.flink.api.java.io.TextInp) -> FlatMap (FlatMap at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Map (Map at com.opensourceteams.module.bigdata.flink.example.dataset.worldcount.WordCountRun$.main(WordCountRun.scala:23)) -> Combine (SUM(1)) (org.apache.flink.runtime.operators.DataSourceTask)" operatorIDs = {Collections$UnmodifiableRandomAccessList@6610} size = 1 userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6611} size = 1 taskVertices = {ExecutionVertex[2]@6612} producedDataSets = {IntermediateResult[1]@6614} inputs = {ArrayList@6616} size = 0 parallelism = 2 slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]" coLocationGroup = null inputSplits = {FileInputSplit[2]@6618} maxParallelismConfigured = true maxParallelism = 2 serializedTaskInformation = null taskInformationBlobKey = null taskInformationOrBlobKey = null splitAssigner = {LocatableInputSplitAssigner@6620} 1 = {ExecutionJobVertex@6606} stateMonitor = {Object@6653} graph = {ExecutionGraph@5602} jobVertex = {JobVertex@6654} "Reduce (SUM(1)) (org.apache.flink.runtime.operators.BatchTask)" operatorIDs = {Collections$UnmodifiableRandomAccessList@6655} size = 1 userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6656} size = 1 taskVertices = {ExecutionVertex[2]@6658} producedDataSets = {IntermediateResult[1]@6659} inputs = {ArrayList@6660} size = 1 parallelism = 2 slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]" 
coLocationGroup = null inputSplits = null maxParallelismConfigured = true maxParallelism = 2 serializedTaskInformation = null taskInformationBlobKey = null taskInformationOrBlobKey = null splitAssigner = null 2 = {ExecutionJobVertex@6607} stateMonitor = {Object@6664} graph = {ExecutionGraph@5602} jobVertex = {OutputFormatVertex@6665} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)" operatorIDs = {Collections$UnmodifiableRandomAccessList@6666} size = 1 userDefinedOperatorIds = {Collections$UnmodifiableRandomAccessList@6667} size = 1 taskVertices = {ExecutionVertex[2]@6668} producedDataSets = {IntermediateResult[0]@6669} inputs = {ArrayList@6670} size = 1 parallelism = 2 slotSharingGroup = {SlotSharingGroup@6617} "SlotSharingGroup [9eb9f93248a641c71df925ec6245124a, b2e911c37d2ae462430e12812fb033ad, a2139d4cdf059b384d883251604a5f2e]" coLocationGroup = null inputSplits = null maxParallelismConfigured = true maxParallelism = 2 serializedTaskInformation = null taskInformationBlobKey = null taskInformationOrBlobKey = null splitAssigner = null ```
private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) { final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal); // simply take the vertices without inputs. for (ExecutionJobVertex ejv : verticesInCreationOrder) { if (ejv.getJobVertex().isInputVertex()) { final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll( slotProvider, allowQueuedScheduling, LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty Collections.emptySet()); schedulingFutures.add(schedulingJobVertexFuture); } } return FutureUtils.waitForAll(schedulingFutures); }
//--------------------------------------------------------------------------------------------- // Actions //--------------------------------------------------------------------------------------------- /** * Schedules all execution vertices of this ExecutionJobVertex. * * @param slotProvider to allocate the slots from * @param queued if the allocations can be queued * @param locationPreferenceConstraint constraint for the location preferences * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph. * Can be empty if the allocation ids are not required for scheduling. * @return Future which is completed once all {@link Execution} could be deployed */ public CompletableFuture<Void> scheduleAll( SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) { final ExecutionVertex[] vertices = this.taskVertices; final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length); // kick off the tasks for (ExecutionVertex ev : vertices) { scheduleFutures.add(ev.scheduleForExecution( slotProvider, queued, locationPreferenceConstraint, allPreviousExecutionGraphAllocationIds)); } return FutureUtils.waitForAll(scheduleFutures); }
/**
 * Schedules the current execution attempt of this ExecutionVertex.
 *
 * @param slotProvider to allocate the slots from
 * @param queued if the allocation can be queued
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                               Can be empty if the allocation ids are not required for scheduling.
 * @return Future which is completed once the execution is deployed. The future
 *         can also complete exceptionally.
 */
public CompletableFuture<Void> scheduleForExecution(
        SlotProvider slotProvider,
        boolean queued,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

    // delegate to the currently active execution attempt of this subtask
    final Execution executionAttempt = this.currentExecution;
    return executionAttempt.scheduleForExecution(
        slotProvider,
        queued,
        locationPreferenceConstraint,
        allPreviousExecutionGraphAllocationIds);
}
分配Slot給Execution
final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution( slotProvider, queued, locationPreferenceConstraint, allPreviousExecutionGraphAllocationIds, allocationTimeout);
調用Execution.deploy()函數,部署Execution到分給的slot中
/**
 * Allocates a slot for this execution and deploys it once the slot is assigned.
 *
 * <p>NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
 * to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
 * error sets the vertex state to failed and triggers the recovery logic.
 *
 * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 *               immediately deploy it.
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                               Can be empty if the allocation ids are not required for scheduling.
 * @return Future which is completed once the Execution has been deployed
 */
public CompletableFuture<Void> scheduleForExecution(
        SlotProvider slotProvider,
        boolean queued,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

    final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
    try {
        // step 1: ask the slot provider for a slot for this execution
        final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
            slotProvider,
            queued,
            locationPreferenceConstraint,
            allPreviousExecutionGraphAllocationIds,
            allocationTimeout);

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        // step 2: deploy to the assigned slot as soon as allocation completes
        final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept(
            (FutureConsumerWithException<Execution, Exception>) value -> deploy());

        // translate an allocation timeout into a "not enough slots" failure; any other
        // failure is propagated as-is via markFailed
        deploymentFuture.whenComplete(
            (Void ignored, Throwable failure) -> {
                if (failure != null) {
                    final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure);
                    final Throwable schedulingFailureCause;

                    if (stripCompletionException instanceof TimeoutException) {
                        schedulingFailureCause = new NoResourceAvailableException(
                            "Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " +
                                "Please make sure that the cluster has enough resources.");
                    } else {
                        schedulingFailureCause = stripCompletionException;
                    }

                    markFailed(schedulingFailureCause);
                }
            });

        // if tasks have to scheduled immediately check that the task has been deployed
        if (!queued && !deploymentFuture.isDone()) {
            deploymentFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
        }

        return deploymentFuture;
    } catch (IllegalExecutionStateException e) {
        // scheduled from an illegal state; surface as an exceptionally completed future
        return FutureUtils.completedExceptionally(e);
    }
}
Execution 狀態從SCHEDULED到DEPLOYING
構建部署對象
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( attemptId, slot, taskRestore, attemptNumber);
調用TaskExecutor.submitTask
/**
 * Deploys the execution to the previously assigned resource.
 *
 * <p>Transitions this execution from SCHEDULED (or CREATED, for tests) to DEPLOYING,
 * builds the {@code TaskDeploymentDescriptor} and ships it to the slot's TaskManager
 * via {@code submitTask}.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
public void deploy() throws JobException {
    final LogicalSlot slot = assignedResource;

    checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }
    }
    else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
    }

    // the slot must carry this execution as its payload
    if (this != slot.getPayload()) {
        throw new IllegalStateException(
            String.format("The execution %s has not been assigned to the assigned slot.", this));
    }

    try {

        // race double check, did we fail/cancel and do we need to release the slot?
        if (this.state != DEPLOYING) {
            slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
            return;
        }

        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                attemptNumber, getAssignedResourceLocation().getHostname()));
        }

        // build the descriptor that carries everything the TaskManager needs to run this task
        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskRestore,
            attemptNumber);

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // ship the descriptor to the TaskManager (TaskExecutor.submitTask)
        final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

        submitResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // only respond to the failure case
                if (failure != null) {
                    if (failure instanceof TimeoutException) {
                        String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                        markFailed(new Exception(
                            "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                    } else {
                        markFailed(failure);
                    }
                }
            },
            executor);
    }
    catch (Throwable t) {
        markFailed(t);
        ExceptionUtils.rethrow(t);
    }
}
package org.apache.flink.runtime.execution;

/**
 * All states a task can be in during its execution.
 *
 * <p>A task normally starts in {@code CREATED} and moves through the states
 * according to this diagram:
 * <pre>{@code
 *
 *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
 *        |            |            |          |
 *        |            |            |   +------+
 *        |            |            V   V
 *        |            |         CANCELLING -----+----> CANCELED
 *        |            |                         |
 *        |            +-------------------------+
 *        |
 *        |                                   ... -> FAILED
 *        V
 *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
 *
 * }</pre>
 *
 * <p>{@code RECONCILING} can be entered from {@code CREATED} when the job
 * manager fails over, and can switch into any existing task state.
 *
 * <p>{@code FAILED} can be entered from any other state.
 *
 * <p>{@code FINISHED}, {@code CANCELED} and {@code FAILED} are the terminal
 * states.
 */
public enum ExecutionState {

    CREATED,

    SCHEDULED,

    DEPLOYING,

    RUNNING,

    /**
     * Marks "successfully completed". Reachable only when the program hits the
     * "end of its input" — either because a bounded input (fixed set of files,
     * bounded query, ...) is exhausted, or because the program is stopped (not
     * cancelled!), which makes the input appear to end at a specific point.
     */
    FINISHED,

    CANCELING,

    CANCELED,

    FAILED,

    RECONCILING;

    /**
     * Whether this state is one of the three terminal states.
     */
    public boolean isTerminal() {
        switch (this) {
            case FINISHED:
            case CANCELED:
            case FAILED:
                return true;
            default:
                return false;
        }
    }
}
構建Task,Task 默認的狀態爲CREATED
Task task = new Task( jobInformation, taskInformation, tdd.getExecutionAttemptId(), tdd.getAllocationId(), tdd.getSubtaskIndex(), tdd.getAttemptNumber(), tdd.getProducedPartitions(), tdd.getInputGates(), tdd.getTargetSlotNumber(), taskExecutorServices.getMemoryManager(), taskExecutorServices.getIOManager(), taskExecutorServices.getNetworkEnvironment(), taskExecutorServices.getBroadcastVariableManager(), taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, blobCacheService, libraryCache, fileCache, taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, getRpcService().getExecutor());
調用task.startTaskThread(); //調用task線程的run()函數
// ----------------------------------------------------------------------
// Task lifecycle RPCs
// ----------------------------------------------------------------------

/**
 * Submits the task described by the given {@code TaskDeploymentDescriptor} to
 * this TaskExecutor: validates the submission (known job manager connection,
 * matching leader id, active slot), re-integrates offloaded data, deserializes
 * the job/task information, builds the {@code Task} (initially in state CREATED)
 * and starts its executing thread.
 *
 * @param tdd descriptor carrying everything needed to instantiate the task
 * @param jobMasterId expected leader id of the submitting job master (fencing)
 * @param timeout rpc timeout of this call
 * @return acknowledged future on success; exceptionally completed future with a
 *         {@code TaskSubmissionException} when the submission is rejected
 */
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Time timeout) {

    try {
        final JobID jobId = tdd.getJobId();
        final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

        // reject tasks of jobs we have no job manager connection for
        if (jobManagerConnection == null) {
            final String message = "Could not submit task because there is no JobManager " +
                "associated for the job " + jobId + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        // fencing: only the current job manager leader may submit tasks
        if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
            final String message = "Rejecting the task submission because the job manager leader id " +
                jobMasterId + " does not match the expected job manager leader id " +
                jobManagerConnection.getJobMasterId() + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        // the slot for this allocation must exist and become active
        if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
            final String message = "No task slot allocated for job ID " + jobId +
                " and allocation ID " + tdd.getAllocationId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        // re-integrate offloaded data:
        try {
            tdd.loadBigData(blobCacheService.getPermanentBlobService());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
        }

        // deserialize the pre-serialized information
        final JobInformation jobInformation;
        final TaskInformation taskInformation;
        try {
            jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
            taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
        }

        // sanity check: descriptor and deserialized job information must agree on the job id
        if (!jobId.equals(jobInformation.getJobId())) {
            throw new TaskSubmissionException(
                "Inconsistent job ID information inside TaskDeploymentDescriptor (" +
                    tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
        }

        TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
            jobInformation.getJobId(),
            jobInformation.getJobName(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskInformation.getTaskName(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber());

        // input splits are fetched lazily from the job manager over rpc while the task runs
        InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
            jobManagerConnection.getJobManagerGateway(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskManagerConfiguration.getTimeout());

        TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
        CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();

        LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

        final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
            jobId,
            tdd.getAllocationId(),
            taskInformation.getJobVertexId(),
            tdd.getSubtaskIndex());

        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

        final TaskStateManager taskStateManager = new TaskStateManagerImpl(
            jobId,
            tdd.getExecutionAttemptId(),
            localStateStore,
            taskRestore,
            checkpointResponder);

        // the Task starts its life in state CREATED
        Task task = new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            tdd.getTargetSlotNumber(),
            taskExecutorServices.getMemoryManager(),
            taskExecutorServices.getIOManager(),
            taskExecutorServices.getNetworkEnvironment(),
            taskExecutorServices.getBroadcastVariableManager(),
            taskStateManager,
            taskManagerActions,
            inputSplitProvider,
            checkpointResponder,
            blobCacheService,
            libraryCache,
            fileCache,
            taskManagerConfiguration,
            taskMetricGroup,
            resultPartitionConsumableNotifier,
            partitionStateChecker,
            getRpcService().getExecutor());

        log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

        boolean taskAdded;

        try {
            taskAdded = taskSlotTable.addTask(task);
        } catch (SlotNotFoundException | SlotNotActiveException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }

        if (taskAdded) {
            // spawns the thread that executes Task.run()
            task.startTaskThread();

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager already contains a task for id " +
                task.getExecutionId() + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }
    } catch (TaskSubmissionException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
這纔開始處理Task,任務的狀態用的是ExecutionState中的狀態值
更新Task狀態從CREATED 到 DEPLOYING
加載這個Task的jar文件
// first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes LOG.info("Loading JAR files for task {}.", this); userCodeClassLoader = createUserCodeClassloader(); final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
構建任務運行環境
Environment env = new RuntimeEnvironment( jobId, vertexId, executionId, executionConfig, taskInfo, jobConfiguration, taskConfiguration, userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, taskStateManager, accumulatorRegistry, kvStateRegistry, inputSplitProvider, distributedCacheEntries, producedPartitions, inputGates, network.getTaskEventDispatcher(), checkpointResponder, taskManagerConfig, metrics, this);
更新當前任務狀態從 DEPLOYING 到 RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)
調用DataSourceTask.invoke(),會根據具體的任務,調用具體任務的函數
/**
 * The core work method that bootstraps the task and executes its code.
 *
 * <p>Overall flow as visible here: (1) spin until the state has moved
 * CREATED -> DEPLOYING, bailing out early if the task was already FAILED or
 * CANCELING; (2) bootstrap resources (user-code classloader, network
 * registration, distributed-cache files); (3) instantiate the invokable,
 * switch DEPLOYING -> RUNNING and call {@code invoke()}; (4) on success finish
 * the produced partitions and transition RUNNING -> FINISHED, on any Throwable
 * transition to FAILED or CANCELED; (5) in the finally block, undo every
 * resource acquisition and report the final state.
 */
@Override
public void run() {

    // ----------------------------
    //  Initial State transition
    // ----------------------------
    // CAS-style loop: transitionState() may lose a race against concurrent
    // cancel()/failExternally() calls, so retry until a terminal decision.
    while (true) {
        ExecutionState current = this.executionState;
        if (current == ExecutionState.CREATED) {
            if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                // success, we can start our work
                break;
            }
        }
        else if (current == ExecutionState.FAILED) {
            // we were immediately failed. tell the TaskManager that we reached our final state
            notifyFinalState();
            if (metrics != null) {
                metrics.close();
            }
            return;
        }
        else if (current == ExecutionState.CANCELING) {
            if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                // we were immediately canceled. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
        }
        else {
            if (metrics != null) {
                metrics.close();
            }
            throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
        }
    }

    // all resource acquisitions and registrations from here on
    // need to be undone in the end
    Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();

    AbstractInvokable invokable = null;

    try {
        // ----------------------------
        //  Task Bootstrap - We periodically
        //  check for canceling as a shortcut
        // ----------------------------

        // activate safety net for task thread
        LOG.info("Creating FileSystem stream leak safety net for task {}", this);
        FileSystemSafetyNet.initializeSafetyNetForThread();

        blobService.getPermanentBlobService().registerJob(jobId);

        // first of all, get a user-code classloader
        // this may involve downloading the job's JAR files and/or classes
        LOG.info("Loading JAR files for task {}.", this);

        userCodeClassLoader = createUserCodeClassloader();
        final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

        if (executionConfig.getTaskCancellationInterval() >= 0) {
            // override task cancellation interval from Flink config if set in ExecutionConfig
            taskCancellationInterval = executionConfig.getTaskCancellationInterval();
        }

        if (executionConfig.getTaskCancellationTimeout() >= 0) {
            // override task cancellation timeout from Flink config if set in ExecutionConfig
            taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        // register the task with the network stack
        // this operation may fail if the system does not have enough
        // memory to run the necessary data exchanges
        // the registration must also strictly be undone
        // ----------------------------------------------------------------

        LOG.info("Registering task at network: {}.", this);

        network.registerTask(this);

        // add metrics for buffers
        this.metrics.getIOMetricGroup().initializeBufferMetrics(this);

        // register detailed network metrics, if configured
        if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
            // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
            MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
            MetricGroup outputGroup = networkGroup.addGroup("Output");
            MetricGroup inputGroup = networkGroup.addGroup("Input");

            // output metrics, one sub-group per produced result partition
            for (int i = 0; i < producedPartitions.length; i++) {
                ResultPartitionMetrics.registerQueueLengthMetrics(
                    outputGroup.addGroup(i), producedPartitions[i]);
            }

            // input metrics, one sub-group per input gate
            for (int i = 0; i < inputGates.length; i++) {
                InputGateMetrics.registerQueueLengthMetrics(
                    inputGroup.addGroup(i), inputGates[i]);
            }
        }

        // next, kick off the background copying of files for the distributed cache
        try {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
                LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
                distributedCacheEntries.put(entry.getKey(), cp);
            }
        }
        catch (Exception e) {
            throw new Exception(
                String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), e);
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  call the user code initialization methods
        // ----------------------------------------------------------------

        TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

        Environment env = new RuntimeEnvironment(
            jobId, vertexId, executionId, executionConfig, taskInfo,
            jobConfiguration, taskConfiguration, userCodeClassLoader,
            memoryManager, ioManager, broadcastVariableManager, taskStateManager,
            accumulatorRegistry, kvStateRegistry, inputSplitProvider,
            distributedCacheEntries, producedPartitions, inputGates,
            network.getTaskEventDispatcher(), checkpointResponder, taskManagerConfig,
            metrics, this);

        // now load and instantiate the task's invokable code
        // (e.g. DataSourceTask / BatchTask / DataSinkTask, per nameOfInvokableClass)
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable -- this blocks until the user code is done
        invokable.invoke();

        // make sure, we enter the catch block if the task leaves the invoke() method due
        // to the fact that it has been canceled
        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  finalization of a successful execution
        // ----------------------------------------------------------------

        // finish the produced partitions. if this fails, we consider the execution failed.
        for (ResultPartition partition : producedPartitions) {
            if (partition != null) {
                partition.finish();
            }
        }

        // try to mark the task as finished
        // if that fails, the task was canceled/failed in the meantime
        if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
            throw new CancelTaskException();
        }
    }
    catch (Throwable t) {

        // unwrap wrapped exceptions to make stack traces more compact
        if (t instanceof WrappingRuntimeException) {
            t = ((WrappingRuntimeException) t).unwrap();
        }

        // ----------------------------------------------------------------
        // the execution failed. either the invokable code properly failed, or
        // an exception was thrown as a side effect of cancelling
        // ----------------------------------------------------------------

        try {
            // check if the exception is unrecoverable
            if (ExceptionUtils.isJvmFatalError(t) ||
                    (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                // terminate the JVM immediately
                // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                try {
                    LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                } finally {
                    Runtime.getRuntime().halt(-1);
                }
            }

            // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
            // loop for multiple retries during concurrent state changes via calls to cancel() or
            // to failExternally()
            while (true) {
                ExecutionState current = this.executionState;

                if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (t instanceof CancelTaskException) {
                        if (transitionState(current, ExecutionState.CANCELED)) {
                            cancelInvokable(invokable);
                            break;
                        }
                    }
                    else {
                        if (transitionState(current, ExecutionState.FAILED, t)) {
                            // proper failure of the task. record the exception as the root cause
                            failureCause = t;
                            cancelInvokable(invokable);
                            break;
                        }
                    }
                }
                else if (current == ExecutionState.CANCELING) {
                    if (transitionState(current, ExecutionState.CANCELED)) {
                        break;
                    }
                }
                else if (current == ExecutionState.FAILED) {
                    // in state failed already, no transition necessary any more
                    break;
                }
                // unexpected state, go to failed
                else if (transitionState(current, ExecutionState.FAILED, t)) {
                    LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                    break;
                }
                // else fall through the loop and retry
            }
        }
        catch (Throwable tt) {
            // failure while handling the failure: nothing left to do but escalate
            String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
            LOG.error(message, tt);
            notifyFatalError(message, tt);
        }
    }
    finally {
        try {
            LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);

            // clear the reference to the invokable. this helps guard against holding references
            // to the invokable and its structures in cases where this Task object is still referenced
            this.invokable = null;

            // stop the async dispatcher.
            // copy dispatcher reference to stack, against concurrent release
            ExecutorService dispatcher = this.asyncCallDispatcher;
            if (dispatcher != null && !dispatcher.isShutdown()) {
                dispatcher.shutdownNow();
            }

            // free the network resources (undo network.registerTask above)
            network.unregisterTask(this);

            // free memory resources (uses the local 'invokable', which stays set
            // even though the field was nulled out above)
            if (invokable != null) {
                memoryManager.releaseAll(invokable);
            }

            // remove all of the tasks library resources
            libraryCache.unregisterTask(jobId, executionId);
            fileCache.releaseJob(jobId, executionId);
            blobService.getPermanentBlobService().releaseJob(jobId);

            // close and de-activate safety net for task thread
            LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();

            notifyFinalState();
        }
        catch (Throwable t) {
            // an error in the resource cleanup is fatal
            String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
            LOG.error(message, t);
            notifyFatalError(message, t);
        }

        // un-register the metrics at the end so that the task may already be
        // counted as finished when this happens
        // errors here will only be logged
        try {
            metrics.close();
        }
        catch (Throwable t) {
            LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
        }
    }
}
Transformation chain
// start all chained tasks BatchTask.openChainedTasks(this.chainedTasks, this);
this.chainedTasks = {ArrayList@7851} size = 3
0 = {ChainedFlatMapDriver@7850} 1 = {ChainedMapDriver@7988} 2 = {SynchronousChainedCombineDriver@7989}
// get input splits to read final Iterator<InputSplit> splitIterator = getInputSplits();
獲得文件位置信息
file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/src/main/resources/data/line.txt:0+6
循環讀取分片信息,讀到的數據是按行的
while (!this.taskCanceled && !format.reachedEnd()) { OT returned; if ((returned = format.nextRecord(serializer.createInstance())) != null) { output.collect(returned); } }
/**
 * Create an Invokable task and set its environment.
 *
 * @param environment The environment assigned to this invokable.
 */
public DataSourceTask(Environment environment) {
    super(environment);
}

/**
 * Drives the data source: initializes the input format and the (possibly
 * chained) outputs, then iterates over the assigned input splits, reading
 * records from each split and pushing them into the output collector.
 *
 * <p>Record reading honors the object-reuse setting of the ExecutionConfig:
 * with reuse enabled a single record instance is recycled across
 * {@code nextRecord} calls, otherwise a fresh instance is created per record.
 *
 * @throws Exception on any failure of the source or a chained task; the
 *         exception is unwrapped from chained-stub wrappers and canceling
 *         exceptions are forwarded as-is.
 */
@Override
public void invoke() throws Exception {
    // --------------------------------------------------------------------
    // Initialize
    // --------------------------------------------------------------------
    initInputFormat();

    LOG.debug(getLogString("Start registering input and output"));

    try {
        initOutputs(getUserCodeClassLoader());
    } catch (Exception ex) {
        throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
            ex.getMessage(), ex);
    }

    LOG.debug(getLogString("Finished registering input and output"));

    // --------------------------------------------------------------------
    // Invoke
    // --------------------------------------------------------------------
    LOG.debug(getLogString("Starting data source operator"));

    RuntimeContext ctx = createRuntimeContext();

    // set up the numRecordsOut counter; fall back to a throwaway SimpleCounter
    // if the metrics system fails, so metrics problems never fail the task
    final Counter numRecordsOut;
    {
        Counter tmpNumRecordsOut;
        try {
            OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            if (this.config.getNumberOfChainedStubs() == 0) {
                ioMetricGroup.reuseOutputMetricsForTask();
            }
            tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsOut = new SimpleCounter();
        }
        numRecordsOut = tmpNumRecordsOut;
    }

    Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

    // RichInputFormat gets the runtime context and an openInputFormat() call
    // before any split is processed
    if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
        ((RichInputFormat) this.format).setRuntimeContext(ctx);
        LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
        ((RichInputFormat) this.format).openInputFormat();
        LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
    }

    ExecutionConfig executionConfig = getExecutionConfig();

    boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");

    final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();

    try {
        // start all chained tasks
        BatchTask.openChainedTasks(this.chainedTasks, this);

        // get input splits to read
        final Iterator<InputSplit> splitIterator = getInputSplits();

        // for each assigned input split
        while (!this.taskCanceled && splitIterator.hasNext()) {
            // get start and end
            final InputSplit split = splitIterator.next();

            LOG.debug(getLogString("Opening input split " + split.toString()));

            final InputFormat<OT, InputSplit> format = this.format;

            // open input format
            format.open(split);

            LOG.debug(getLogString("Starting to read input from split " + split.toString()));

            try {
                final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

                if (objectReuseEnabled) {
                    // single mutable record instance, recycled for every read
                    OT reuse = serializer.createInstance();

                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {

                        OT returned;
                        if ((returned = format.nextRecord(reuse)) != null) {
                            output.collect(returned);
                        }
                    }
                } else {
                    // fresh record instance per read
                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {

                        OT returned;
                        if ((returned = format.nextRecord(serializer.createInstance())) != null) {
                            output.collect(returned);
                        }
                    }
                }

                if (LOG.isDebugEnabled() && !this.taskCanceled) {
                    LOG.debug(getLogString("Closing input split " + split.toString()));
                }
            } finally {
                // close. We close here such that a regular close throwing an exception marks a task as failed.
                format.close();
            }
            completedSplitsCounter.inc();
        } // end for all input splits

        // close the collector. if it is a chaining task collector, it will close its chained tasks
        this.output.close();

        // close all chained tasks letting them report failure
        BatchTask.closeChainedTasks(this.chainedTasks, this);
    }
    catch (Exception ex) {
        // close the input, but do not report any exceptions, since we already have another root cause
        try {
            this.format.close();
        } catch (Throwable ignored) {}

        BatchTask.cancelChainedTasks(this.chainedTasks);

        ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

        if (ex instanceof CancelTaskException) {
            // forward canceling exception
            throw ex;
        }
        else if (!this.taskCanceled) {
            // drop exception, if the task was canceled
            BatchTask.logAndThrowException(ex, this);
        }
    }
    finally {
        BatchTask.clearWriters(eventualOutputs);
        // --------------------------------------------------------------------
        // Closing
        // --------------------------------------------------------------------
        if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat) this.format).closeInputFormat();
            LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
        }
    }

    if (!this.taskCanceled) {
        LOG.debug(getLogString("Finished data source operator"));
    }
    else {
        LOG.debug(getLogString("Data source operator cancelled"));
    }
}
public OT nextRecord(OT record) throws IOException { if (readLine()) { return readRecord(record, this.currBuffer, this.currOffset, this.currLen); } else { this.end = true; return null; } }
/**
 * Scans forward from the current read position for the next delimiter and
 * publishes the bytes of the enclosed record via {@code setResult(...)}.
 *
 * <p>Records that span multiple fills of {@code readBuffer} are accumulated
 * in {@code wrapBuffer} (grown as needed); a delimiter that is itself split
 * across a buffer boundary is handled by carrying the count of already-matched
 * delimiter bytes ({@code delimPos}) into {@code fillBuffer}.
 *
 * @return true if a record was found and set as the result, false if the end
 *         of the input was reached with no bytes left to emit
 * @throws IOException if the record exceeds {@code lineLengthLimit}
 */
protected final boolean readLine() throws IOException {
    // no stream open, or the split limit was already passed: nothing to read
    if (this.stream == null || this.overLimit) {
        return false;
    }

    // number of record bytes carried over in wrapBuffer from previous buffer fills
    int countInWrapBuffer = 0;

    // position of matching positions in the delimiter byte array
    int delimPos = 0;

    while (true) {
        if (this.readPos >= this.limit) {
            // readBuffer is completely consumed. Fill it again but keep partially read delimiter bytes.
            if (!fillBuffer(delimPos)) {
                // end of input: flush whatever partial record remains
                int countInReadBuffer = delimPos;
                if (countInWrapBuffer + countInReadBuffer > 0) {
                    // we have bytes left to emit
                    if (countInReadBuffer > 0) {
                        // we have bytes left in the readBuffer. Move them into the wrapBuffer
                        if (this.wrapBuffer.length - countInWrapBuffer < countInReadBuffer) {
                            // reallocate
                            byte[] tmp = new byte[countInWrapBuffer + countInReadBuffer];
                            System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
                            this.wrapBuffer = tmp;
                        }

                        // copy readBuffer bytes to wrapBuffer
                        System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, countInReadBuffer);
                        countInWrapBuffer += countInReadBuffer;
                    }

                    this.offset += countInWrapBuffer;
                    setResult(this.wrapBuffer, 0, countInWrapBuffer);
                    return true;
                } else {
                    return false;
                }
            }
        }

        // start of the (remaining) record within readBuffer; backs up over any
        // delimiter bytes matched at the end of the previous buffer
        int startPos = this.readPos - delimPos;
        int count;

        // Search for next occurrence of delimiter in read buffer.
        while (this.readPos < this.limit && delimPos < this.delimiter.length) {
            if ((this.readBuffer[this.readPos]) == this.delimiter[delimPos]) {
                // Found the expected delimiter character. Continue looking for the next character of delimiter.
                delimPos++;
            } else {
                // Delimiter does not match.
                // We have to reset the read position to the character after the first matching character
                // and search for the whole delimiter again.
                readPos -= delimPos;
                delimPos = 0;
            }
            readPos++;
        }

        // check why we dropped out
        if (delimPos == this.delimiter.length) {
            // we found a delimiter
            int readBufferBytesRead = this.readPos - startPos;
            this.offset += countInWrapBuffer + readBufferBytesRead;
            // record length excludes the delimiter itself
            count = readBufferBytesRead - this.delimiter.length;

            // copy to byte array
            if (countInWrapBuffer > 0) {
                // record started in a previous buffer: stitch it together in wrapBuffer
                // check wrap buffer size
                if (this.wrapBuffer.length < countInWrapBuffer + count) {
                    final byte[] nb = new byte[countInWrapBuffer + count];
                    System.arraycopy(this.wrapBuffer, 0, nb, 0, countInWrapBuffer);
                    this.wrapBuffer = nb;
                }
                if (count >= 0) {
                    System.arraycopy(this.readBuffer, 0, this.wrapBuffer, countInWrapBuffer, count);
                }
                setResult(this.wrapBuffer, 0, countInWrapBuffer + count);
                return true;
            } else {
                // record fully contained in readBuffer: hand it out in place, no copy
                setResult(this.readBuffer, startPos, count);
                return true;
            }
        } else {
            // we reached the end of the readBuffer
            count = this.limit - startPos;

            // check against the maximum record length
            if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
                throw new IOException("The record length exceeded the maximum record length (" +
                    this.lineLengthLimit + ").");
            }

            // Compute number of bytes to move to wrapBuffer
            // Chars of partially read delimiter must remain in the readBuffer. We might need to go back.
            int bytesToMove = count - delimPos;
            // ensure wrapBuffer is large enough
            if (this.wrapBuffer.length - countInWrapBuffer < bytesToMove) {
                // reallocate (at least double, to amortize growth)
                byte[] tmp = new byte[Math.max(this.wrapBuffer.length * 2, countInWrapBuffer + bytesToMove)];
                System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
                this.wrapBuffer = tmp;
            }

            // copy readBuffer to wrapBuffer (except delimiter chars)
            System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, bytesToMove);
            countInWrapBuffer += bytesToMove;
            // move delimiter chars to the beginning of the readBuffer
            System.arraycopy(this.readBuffer, this.readPos - delimPos, this.readBuffer, 0, delimPos);
        }
    }
}