本文主要研究一下flink的InputFormatSourceFunction
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IteratorInputFormat iteratorInputFormat = new IteratorInputFormat<String>(new WordIterator()); env //TypeInformation.of(new TypeHint<String>() {} .createInput(iteratorInputFormat,TypeExtractor.createTypeInfo(String.class)) .setParallelism(1) .print();
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
/**
 * Creates a data stream source that reads its elements through the given {@link InputFormat}.
 *
 * <p>A {@link FileInputFormat} is handed to {@code createFileInput} using
 * {@code FileProcessingMode.PROCESS_ONCE}; any other format goes through the
 * private three-argument overload under the name "Custom Source".
 *
 * @param inputFormat the format used to read the data
 * @param typeInfo type information for the produced elements
 * @param <OUT> type of the produced elements
 * @return the data stream source built on top of the format
 */
@PublicEvolving
public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
    // Non-file formats need no special handling: wrap them directly.
    if (!(inputFormat instanceof FileInputFormat)) {
        return createInput(inputFormat, typeInfo, "Custom Source");
    }

    @SuppressWarnings("unchecked")
    FileInputFormat<OUT> fileFormat = (FileInputFormat<OUT>) inputFormat;
    return createFileInput(fileFormat, typeInfo, "Custom File source", FileProcessingMode.PROCESS_ONCE, -1);
}

/**
 * Wraps the format in an {@link InputFormatSourceFunction} and registers it as a source.
 *
 * @param inputFormat the format used to read the data
 * @param typeInfo type information for the produced elements
 * @param sourceName name given to the registered source
 * @param <OUT> type of the produced elements
 * @return the data stream source returned by {@code addSource}
 */
private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) {
    final InputFormatSourceFunction<OUT> sourceFunction = new InputFormatSourceFunction<>(inputFormat, typeInfo);
    return addSource(sourceFunction, sourceName, typeInfo);
}
/**
 * A {@link SourceFunction} that reads data using an {@link InputFormat}.
 *
 * <p>Input splits are pulled one at a time from the {@link InputSplitProvider} of the
 * runtime context; each split is opened with the format, read record by record, and
 * every non-null record is handed to the {@code SourceContext}.
 */
@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;

    private TypeInformation<OUT> typeInfo;
    // Created in open() from typeInfo; used in run() to create the reusable record instance.
    private transient TypeSerializer<OUT> serializer;

    private InputFormat<OUT, InputSplit> format;

    // Both are initialised in open().
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;

    // Loop flag: cleared by cancel(), at the end of run(), and when no further split is available.
    private volatile boolean isRunning = true;

    /**
     * Creates the source function.
     *
     * @param format the input format to read with (cast unchecked to {@code InputFormat<OUT, InputSplit>})
     * @param typeInfo type information used to create the serializer in {@link #open(Configuration)}
     */
    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    /**
     * Prepares the format and the split iterator.
     *
     * @param parameters configuration passed on to {@code format.configure}
     */
    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

        // Rich formats additionally receive the runtime context.
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);

        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        // hasNext() already asks the provider for the first split; with no split, run() skips its loop.
        isRunning = splitIterator.hasNext();
    }

    /**
     * Reads all splits assigned to this task and emits their records through {@code ctx.collect}.
     *
     * @param ctx the source context that receives the records
     */
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            // openInputFormat() is called once, before the first split, and only for rich formats.
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            // Instance passed to nextRecord(...) and replaced by its return value.
            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        // A null record ends the reading of the current split.
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                // Continue only while not cancelled and the provider still hands out splits.
                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            // Clean-up runs on every exit path, including exceptions; the format is closed here once more.
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }

    /** Requests termination by clearing the loop flag checked in {@link #run(SourceContext)}. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Closes the format and, for rich formats, also calls {@code closeInputFormat()}. */
    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    /**
     * Builds an iterator that fetches splits on demand from {@code provider}.
     *
     * <p>{@code hasNext()} requests a split and caches it in {@code nextSplit}; a {@code null}
     * answer from the provider marks the iterator as exhausted. {@code remove()} is unsupported.
     */
    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            // Split fetched by hasNext() but not yet returned by next().
            private InputSplit nextSplit;

            // Set once the provider has returned null; no further requests are made afterwards.
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                // A split is already cached; do not request another one.
                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;

                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    // Iterator.hasNext() cannot throw checked exceptions, so the failure is wrapped.
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                // Hand out the cached split and clear the cache so the next call fetches a new one.
                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
splitIterator
),nextSplit是調用InputSplitProvider.getNextInputSplit來獲取

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {

    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or <code>null</code> if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
public class RpcInputSplitProvider implements InputSplitProvider { private final JobMasterGateway jobMasterGateway; private final JobVertexID jobVertexID; private final ExecutionAttemptID executionAttemptID; private final Time timeout; public RpcInputSplitProvider( JobMasterGateway jobMasterGateway, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, Time timeout) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.jobVertexID = Preconditions.checkNotNull(jobVertexID); this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID); this.timeout = Preconditions.checkNotNull(timeout); } @Override public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit( jobVertexID, executionAttemptID); try { SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit()); if (serializedInputSplit.isEmpty()) { return null; } else { return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader); } } catch (Exception e) { throw new InputSplitProviderException("Requesting the next input split failed.", e); } } }
本實例的splitProvider爲RpcInputSplitProvider
)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
/** * Implementation using {@link ActorGateway} to forward the messages. */ public class TaskInputSplitProvider implements InputSplitProvider { private final ActorGateway jobManager; private final JobID jobID; private final JobVertexID vertexID; private final ExecutionAttemptID executionID; private final FiniteDuration timeout; public TaskInputSplitProvider( ActorGateway jobManager, JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionID, FiniteDuration timeout) { this.jobManager = Preconditions.checkNotNull(jobManager); this.jobID = Preconditions.checkNotNull(jobID); this.vertexID = Preconditions.checkNotNull(vertexID); this.executionID = Preconditions.checkNotNull(executionID); this.timeout = Preconditions.checkNotNull(timeout); } @Override public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); final Future<Object> response = jobManager.ask( new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), timeout); final Object result; try { result = Await.result(response, timeout); } catch (Exception e) { throw new InputSplitProviderException("Did not receive next input split from JobManager.", e); } if(result instanceof JobManagerMessages.NextInputSplit){ final JobManagerMessages.NextInputSplit nextInputSplit = (JobManagerMessages.NextInputSplit) result; byte[] serializedData = nextInputSplit.splitData(); if(serializedData == null) { return null; } else { final Object deserialized; try { deserialized = InstantiationUtil.deserializeObject(serializedData, userCodeClassLoader); } catch (Exception e) { throw new InputSplitProviderException("Could not deserialize the serialized input split.", e); } return (InputSplit) deserialized; } } else { throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " + "NextInputSplit. Instead response is of type " + result.getClass() + '.'); } } }
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/InputSplit.java
/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 *
 * <p>Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.</p>
 */
@Public
public interface InputSplit extends Serializable {

    /**
     * Returns the number of this input split.
     *
     * @return the number of this input split
     */
    int getSplitNumber();
}
flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java
/** * A generic input split that has only a partition number. */ @Public public class GenericInputSplit implements InputSplit, java.io.Serializable { private static final long serialVersionUID = 1L; /** The number of this split. */ private final int partitionNumber; /** The total number of partitions */ private final int totalNumberOfPartitions; // -------------------------------------------------------------------------------------------- /** * Creates a generic input split with the given split number. * * @param partitionNumber The number of the split's partition. * @param totalNumberOfPartitions The total number of the splits (partitions). */ public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) { this.partitionNumber = partitionNumber; this.totalNumberOfPartitions = totalNumberOfPartitions; } //...... public String toString() { return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')'; } }
本實例的InputSplit爲GenericInputSplit類型
)

flink-core-1.6.2-sources.jar!/org/apache/flink/core/io/LocatableInputSplit.java
/** * A locatable input split is an input split referring to input data which is located on one or more hosts. */ @Public public class LocatableInputSplit implements InputSplit, java.io.Serializable { private static final long serialVersionUID = 1L; private static final String[] EMPTY_ARR = new String[0]; /** The number of the split. */ private final int splitNumber; /** The names of the hosts storing the data this input split refers to. */ private final String[] hostnames; // -------------------------------------------------------------------------------------------- /** * Creates a new locatable input split that refers to a multiple host as its data location. * * @param splitNumber The number of the split * @param hostnames The names of the hosts storing the data this input split refers to. */ public LocatableInputSplit(int splitNumber, String[] hostnames) { this.splitNumber = splitNumber; this.hostnames = hostnames == null ? EMPTY_ARR : hostnames; } /** * Creates a new locatable input split that refers to a single host as its data location. * * @param splitNumber The number of the split. * @param hostname The names of the host storing the data this input split refers to. */ public LocatableInputSplit(int splitNumber, String hostname) { this.splitNumber = splitNumber; this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname }; } //...... @Override public String toString() { return "Locatable Split (" + splitNumber + ") at " + Arrays.toString(this.hostnames); } }
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/IteratorInputFormat.java
/** * An input format that returns objects from an iterator. */ @PublicEvolving public class IteratorInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput { private static final long serialVersionUID = 1L; private Iterator<T> iterator; // input data as serializable iterator public IteratorInputFormat(Iterator<T> iterator) { if (!(iterator instanceof Serializable)) { throw new IllegalArgumentException("The data source iterator must be serializable."); } this.iterator = iterator; } @Override public boolean reachedEnd() { return !this.iterator.hasNext(); } @Override public T nextRecord(T record) { return this.iterator.next(); } }
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java
/** * Generic base class for all Rich inputs that are not based on files. */ @Public public abstract class GenericInputFormat<OT> extends RichInputFormat<OT, GenericInputSplit> { private static final long serialVersionUID = 1L; /** * The partition of this split. */ protected int partitionNumber; // -------------------------------------------------------------------------------------------- @Override public void configure(Configuration parameters) { // nothing by default } @Override public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { // no statistics available, by default. return cachedStatistics; } @Override public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { if (numSplits < 1) { throw new IllegalArgumentException("Number of input splits has to be at least 1."); } numSplits = (this instanceof NonParallelInput) ? 1 : numSplits; GenericInputSplit[] splits = new GenericInputSplit[numSplits]; for (int i = 0; i < splits.length; i++) { splits[i] = new GenericInputSplit(i, numSplits); } return splits; } @Override public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) { return new DefaultInputSplitAssigner(splits); } // -------------------------------------------------------------------------------------------- @Override public void open(GenericInputSplit split) throws IOException { this.partitionNumber = split.getSplitNumber(); } @Override public void close() throws IOException {} }
從vertex.getSplitAssigner()獲取
)根據numTaskVertices來分割
)及getInputSplitAssigner方法均爲父類GenericInputFormat提供

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
/** * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner * simply returns all input splits of an input vertex in the order they were originally computed. */ @Internal public class DefaultInputSplitAssigner implements InputSplitAssigner { /** The logging object used to report information and errors. */ private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class); /** The list of all splits */ private final List<InputSplit> splits = new ArrayList<InputSplit>(); public DefaultInputSplitAssigner(InputSplit[] splits) { Collections.addAll(this.splits, splits); } public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) { this.splits.addAll(splits); } @Override public InputSplit getNextInputSplit(String host, int taskId) { InputSplit next = null; // keep the synchronized part short synchronized (this.splits) { if (this.splits.size() > 0) { next = this.splits.remove(this.splits.size() - 1); } } if (LOG.isDebugEnabled()) { if (next == null) { LOG.debug("No more input splits available"); } else { LOG.debug("Assigning split " + next + " to " + host); } } return next; } }
這裏是根據numTaskVertices來分割
),一個是getInputSplitAssigner(這裏建立的是DefaultInputSplitAssigner,即按順序返回分割好的InputSplit
)能夠看到整個大的邏輯就是GenericInputFormat提供將input分割爲InputSplit的方法,同時提供InputSplitAssigner,而後InputFormatSourceFunction就是挨個遍歷分割好的屬於本身(Task
)的InputSplit(經過InputSplitAssigner獲取
),然後通過InputFormat讀取InputSplit來挨個獲取這個InputSplit的每一個元素,然後通過SourceContext的collect方法發射出去