本文主要研究一下flink的CsvReaderhtml
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath) .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total"); csvInput.map(new MapFunction<RecordDto, RecordDto>() { @Override public RecordDto map(RecordDto value) throws Exception { LOGGER.info("execute map:{}",value); TimeUnit.SECONDS.sleep(5); return value; } }).print();
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.javajava
/** * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to * define parameters and field types and will eventually produce the DataSet that corresponds to * the read and parsed CSV input. * * @param filePath The path of the CSV file. * @return A CsvReader that can be used to configure the CSV input. */ public CsvReader readCsvFile(String filePath) { return new CsvReader(filePath, this); }
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.javaapache
public CsvReader(String filePath, ExecutionEnvironment executionContext) { this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext); } public CsvReader(Path filePath, ExecutionEnvironment executionContext) { Preconditions.checkNotNull(filePath, "The file path may not be null."); Preconditions.checkNotNull(executionContext, "The execution context may not be null."); this.path = filePath; this.executionContext = executionContext; } /** * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type * must be public or able to set value. The type information for the fields is obtained from the type class. * * @param pojoType The class of the target POJO. * @param pojoFields The fields of the POJO which are mapped to CSV fields. * @return The DataSet representing the parsed CSV data. */ public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) { Preconditions.checkNotNull(pojoType, "The POJO type class must not be null."); Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO."); final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType); if (!(ti instanceof PojoTypeInfo)) { throw new IllegalArgumentException( "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti); } final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti; CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask); configureInputFormat(inputFormat); return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName()); }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.javabootstrap
/** * The Task represents one execution of a parallel subtask on a TaskManager. * A Task wraps a Flink operator (which may be a user function) and * runs it, providing all services necessary for example to consume input data, * produce its results (intermediate result partitions) and communicate * with the JobManager. * * <p>The Flink operators (implemented as subclasses of * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks. * The task connects those to the network stack and actor messages, and tracks the state * of the execution and handles exceptions. * * <p>Tasks have no knowledge about how they relate to other tasks, or whether they * are the first attempt to execute the task, or a repeated attempt. All of that * is only known to the JobManager. All the task knows are its own runnable code, * the task's configuration, and the IDs of the intermediate results to consume and * produce (if any). * * <p>Each Task is run by one dedicated thread. */ public class Task implements Runnable, TaskActions, CheckpointListener { //...... /** * The core work method that bootstraps the task and executes its code. */ @Override public void run() { //...... // now load and instantiate the task's invokable code invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env); // ---------------------------------------------------------------- // actual task core work // ---------------------------------------------------------------- // we must make strictly sure that the invokable is accessible to the cancel() call // by the time we switched to running. this.invokable = invokable; // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } // notify everyone that we switched to running notifyObservers(ExecutionState.RUNNING, null); taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); // make sure the user code classloader is accessible thread-locally executingThread.setContextClassLoader(userCodeClassLoader); // run the invokable invokable.invoke(); //...... } }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.javawindows
@Override public void invoke() throws Exception { // -------------------------------------------------------------------- // Initialize // -------------------------------------------------------------------- initInputFormat(); LOG.debug(getLogString("Start registering input and output")); try { initOutputs(getUserCodeClassLoader()); } catch (Exception ex) { throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + ex.getMessage(), ex); } LOG.debug(getLogString("Finished registering input and output")); // -------------------------------------------------------------------- // Invoke // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data source operator")); RuntimeContext ctx = createRuntimeContext(); final Counter numRecordsOut; { Counter tmpNumRecordsOut; try { OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup(); ioMetricGroup.reuseInputMetricsForTask(); if (this.config.getNumberOfChainedStubs() == 0) { ioMetricGroup.reuseOutputMetricsForTask(); } tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter(); } catch (Exception e) { LOG.warn("An exception occurred during the metrics setup.", e); tmpNumRecordsOut = new SimpleCounter(); } numRecordsOut = tmpNumRecordsOut; } Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); LOG.debug(getLogString("Rich Source detected. Initializing runtime context.")); ((RichInputFormat) this.format).openInputFormat(); LOG.debug(getLogString("Rich Source detected. Opening the InputFormat.")); } ExecutionConfig executionConfig = getExecutionConfig(); boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer(); try { // start all chained tasks BatchTask.openChainedTasks(this.chainedTasks, this); // get input splits to read final Iterator<InputSplit> splitIterator = getInputSplits(); // for each assigned input split while (!this.taskCanceled && splitIterator.hasNext()) { // get start and end final InputSplit split = splitIterator.next(); LOG.debug(getLogString("Opening input split " + split.toString())); final InputFormat<OT, InputSplit> format = this.format; // open input format format.open(split); LOG.debug(getLogString("Starting to read input from split " + split.toString())); try { final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut); if (objectReuseEnabled) { OT reuse = serializer.createInstance(); // as long as there is data to read while (!this.taskCanceled && !format.reachedEnd()) { OT returned; if ((returned = format.nextRecord(reuse)) != null) { output.collect(returned); } } } else { // as long as there is data to read while (!this.taskCanceled && !format.reachedEnd()) { OT returned; if ((returned = format.nextRecord(serializer.createInstance())) != null) { output.collect(returned); } } } if (LOG.isDebugEnabled() && !this.taskCanceled) { LOG.debug(getLogString("Closing input split " + split.toString())); } } finally { // close. We close here such that a regular close throwing an exception marks a task as failed. format.close(); } completedSplitsCounter.inc(); } // end for all input splits // close the collector. if it is a chaining task collector, it will close its chained tasks this.output.close(); // close all chained tasks letting them report failure BatchTask.closeChainedTasks(this.chainedTasks, this); } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause try { this.format.close(); } catch (Throwable ignored) {} BatchTask.cancelChainedTasks(this.chainedTasks); ex = ExceptionInChainedStubException.exceptionUnwrap(ex); if (ex instanceof CancelTaskException) { // forward canceling exception throw ex; } else if (!this.taskCanceled) { // drop exception, if the task was canceled BatchTask.logAndThrowException(ex, this); } } finally { BatchTask.clearWriters(eventualOutputs); // -------------------------------------------------------------------- // Closing // -------------------------------------------------------------------- if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).closeInputFormat(); LOG.debug(getLogString("Rich Source detected. Closing the InputFormat.")); } } if (!this.taskCanceled) { LOG.debug(getLogString("Finished data source operator")); } else { LOG.debug(getLogString("Data source operator cancelled")); } }
PojoCsvInputFormat
),不過nextRecord以及reachedEnd方法它是調用的父類DelimitedInputFormatflink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.javaapi
/** * The default read buffer size = 1MB. */ private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024; private transient byte[] readBuffer; private int bufferSize = -1; private void initBuffers() { this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize; if (this.bufferSize <= this.delimiter.length) { throw new IllegalArgumentException("Buffer size must be greater than length of delimiter."); } if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) { this.readBuffer = new byte[this.bufferSize]; } if (this.wrapBuffer == null || this.wrapBuffer.length < 256) { this.wrapBuffer = new byte[256]; } this.readPos = 0; this.limit = 0; this.overLimit = false; this.end = false; } /** * Checks whether the current split is at its end. * * @return True, if the split is at its end, false otherwise. */ @Override public boolean reachedEnd() { return this.end; } @Override public OT nextRecord(OT record) throws IOException { if (readLine()) { return readRecord(record, this.currBuffer, this.currOffset, this.currLen); } else { this.end = true; return null; } } /** * Fills the read buffer with bytes read from the file starting from an offset. */ private boolean fillBuffer(int offset) throws IOException { int maxReadLength = this.readBuffer.length - offset; // special case for reading the whole split. if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) { int read = this.stream.read(this.readBuffer, offset, maxReadLength); if (read == -1) { this.stream.close(); this.stream = null; return false; } else { this.readPos = offset; this.limit = read; return true; } } // else .. int toRead; if (this.splitLength > 0) { // if we have more data, read that toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength; } else { // if we have exhausted our split, we need to complete the current record, or read one // more across the next split. // the reason is that the next split will skip over the beginning until it finds the first // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the // previous split. toRead = maxReadLength; this.overLimit = true; } int read = this.stream.read(this.readBuffer, offset, toRead); if (read == -1) { this.stream.close(); this.stream = null; return false; } else { this.splitLength -= read; this.readPos = offset; // position from where to start reading this.limit = read + offset; // number of valid bytes in the read buffer return true; } }
DelimitedInputFormat.getStatistics方法裏頭FileInputSplit的length
)及maxReadLength來肯定toRead,以後從offset開始到toRead從文件讀取數據到readBuffer中,而後設置currBuffer、currOffset、currLenflink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.javaapp
@Override public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException { /* * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n */ // Found window's end line, so find carriage return before the newline if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') { //reduce the number of bytes so that the Carriage return is not taken as data numBytes--; } if (commentPrefix != null && commentPrefix.length <= numBytes) { //check record for comments boolean isComment = true; for (int i = 0; i < commentPrefix.length; i++) { if (commentPrefix[i] != bytes[offset + i]) { isComment = false; break; } } if (isComment) { this.commentCount++; return null; } } if (parseRecord(parsedValues, bytes, offset, numBytes)) { return fillRecord(reuse, parsedValues); } else { this.invalidLineCount++; return null; } }
Object[]
),以後調用子類的fillRecord方法(這裏是PojoCsvInputFormat
)將parsedValues填充到reuse對象(該對象是DataSourceTask在調用format.nextRecord時傳入的serializer.createInstance()
)flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.javaide
/** * Input format that reads csv into POJOs. * @param <OUT> resulting POJO type */ @Internal public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> { //...... @Override public void open(FileInputSplit split) throws IOException { super.open(split); pojoFields = new Field[pojoFieldNames.length]; Map<String, Field> allFields = new HashMap<String, Field>(); findAllFields(pojoTypeClass, allFields); for (int i = 0; i < pojoFieldNames.length; i++) { pojoFields[i] = allFields.get(pojoFieldNames[i]); if (pojoFields[i] != null) { pojoFields[i].setAccessible(true); } else { throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName()); } } } @Override public OUT fillRecord(OUT reuse, Object[] parsedValues) { for (int i = 0; i < parsedValues.length; i++) { try { pojoFields[i].set(reuse, parsedValues[i]); } catch (IllegalAccessException e) { throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e); } } return reuse; } //...... }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.javaui
public class CountingCollector<OUT> implements Collector<OUT> { private final Collector<OUT> collector; private final Counter numRecordsOut; public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) { this.collector = collector; this.numRecordsOut = numRecordsOut; } @Override public void collect(OUT record) { this.numRecordsOut.inc(); this.collector.collect(record); } @Override public void close() { this.collector.close(); } }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.javathis
@Override public void collect(IT record) { try { this.numRecordsIn.inc(); this.outputCollector.collect(this.mapper.map(record)); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); } }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java
/** * Collects a record and emits it to all writers. */ @Override public void collect(T record) { if (record != null) { this.delegate.setInstance(record); try { for (RecordWriter<SerializationDelegate<T>> writer : writers) { writer.emit(this.delegate); } } catch (IOException e) { throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e); } catch (InterruptedException e) { throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e); } } else { throw new NullPointerException("The system does not support records that are null." + "Null values are only supported as fields inside other objects."); } }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
public void emit(T record) throws IOException, InterruptedException { for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { sendToTarget(record, targetChannel); } }
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@Override public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) { switch (strategy) { case FORWARD: return forward(); case PARTITION_RANDOM: case PARTITION_FORCED_REBALANCE: return robin(numberOfChannels); case PARTITION_HASH: return hashPartitionDefault(record.getInstance(), numberOfChannels); case BROADCAST: return broadcast(numberOfChannels); case PARTITION_CUSTOM: return customPartition(record.getInstance(), numberOfChannels); case PARTITION_RANGE: return rangePartition(record.getInstance(), numberOfChannels); default: throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name()); } } private int[] forward() { return this.channels; }
DelimitedInputFormat.getStatistics方法裏頭FileInputSplit的length
)及maxReadLength來肯定toRead,以後從offset開始到toRead從文件讀取數據到readBuffer中包裝了org.apache.flink.runtime.operators.shipping.OutputCollector的CountingCollector
),直到taskCanceled或者format.reachedEnd()