A look at Flink's CsvReader

This article takes a look at Flink's CsvReader.

Example

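import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// csvFilePath, RecordDto and LOGGER are defined elsewhere in the application
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// map the CSV columns, in order, onto these RecordDto fields
DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

csvInput.map(new MapFunction<RecordDto, RecordDto>() {
    @Override
    public RecordDto map(RecordDto value) throws Exception {
        LOGGER.info("execute map:{}", value);
        TimeUnit.SECONDS.sleep(5); // sleep 5 seconds per record
        return value;
    }
}).print(); // print() on a DataSet triggers job execution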

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

/**
	 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
	 * define parameters and field types and will eventually produce the DataSet that corresponds to
	 * the read and parsed CSV input.
	 *
	 * @param filePath The path of the CSV file.
	 * @return A CsvReader that can be used to configure the CSV input.
	 */
	public CsvReader readCsvFile(String filePath) {
		return new CsvReader(filePath, this);
	}
  • A CsvReader is created here from filePath.

CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java

public CsvReader(String filePath, ExecutionEnvironment executionContext) {
		this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
	}

	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
		Preconditions.checkNotNull(filePath, "The file path may not be null.");
		Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

		this.path = filePath;
		this.executionContext = executionContext;
	}

	/**
	 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
	 * must be public or able to set value. The type information for the fields is obtained from the type class.
	 *
	 * @param pojoType The class of the target POJO.
	 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
	 * @return The DataSet representing the parsed CSV data.
	 */
	public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
		Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
		Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

		final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
		if (!(ti instanceof PojoTypeInfo)) {
			throw new IllegalArgumentException(
				"The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
		}
		final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

		CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

		configureInputFormat(inputFormat);

		return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
	}
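  • CsvReader provides the pojoType method. It maps the CSV data onto a Java type and wraps the result in a Flink DataSource. It first uses TypeExtractor.createTypeInfo to check that the class is a POJO, and throws IllegalArgumentException if it is not. It then builds the DataSource from a PojoCsvInputFormat and a PojoTypeInfo.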
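pojoType rejects any class that TypeExtractor does not recognize as a POJO. The sketch below gives a rough idea of what that check looks for. It assumes Flink's documented POJO rules: a public standalone class, a public no-argument constructor, and every non-static, non-transient field either public or reachable through getX/setX. PojoCheck and its demo classes are hypothetical. The real analysis in TypeExtractor is much more thorough; for example, it also walks superclasses and derives type information for every field.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoCheck {

    // Rough approximation of Flink's documented POJO rules (not TypeExtractor itself).
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) {
            return false; // the class must be public
        }
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) {
            return false; // non-static inner classes are not POJOs
        }
        try {
            c.getConstructor(); // getConstructor() only finds public constructors
        } catch (NoSuchMethodException e) {
            return false; // a public no-argument constructor is required
        }
        // simplification: only fields declared on c itself are inspected
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || Modifier.isPublic(m)) {
                continue; // ignored, or directly accessible
            }
            String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            if (!hasMethod(c, "get" + cap) && !hasMethod(c, "is" + cap)) {
                return false; // non-public field without a getter
            }
            if (!hasMethod(c, "set" + cap, f.getType())) {
                return false; // non-public field without a setter
            }
        }
        return true;
    }

    private static boolean hasMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical demo types.
    public static class RecordLike {
        public String playerName;
        private int year;

        public RecordLike() {}

        public int getYear() { return year; }

        public void setYear(int year) { this.year = year; }
    }

    public static class NoDefaultConstructor {
        public String playerName;

        public NoDefaultConstructor(String playerName) { this.playerName = playerName; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(RecordLike.class));           // true
        System.out.println(looksLikePojo(NoDefaultConstructor.class)); // false
    }
}
```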

Task

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method switches the state from DEPLOYING to RUNNING and then calls invokable.invoke(). The invokable here is a DataSourceTask.
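Task.run only calls invokable.invoke() after it has moved from DEPLOYING to RUNNING. If that switch fails, the task was canceled or failed in the meantime, and run aborts. Below is a minimal sketch of that guard. It assumes a compare-and-set on an atomic state field. MiniTask and TaskState are hypothetical names, not Flink classes, and real cancellation in Flink does considerably more work.

```java
import java.util.concurrent.atomic.AtomicReference;

final class MiniTask implements Runnable {

    // Hypothetical, reduced set of states (Flink's ExecutionState has more).
    enum TaskState { DEPLOYING, RUNNING, CANCELING, FINISHED }

    private final AtomicReference<TaskState> state = new AtomicReference<>(TaskState.DEPLOYING);
    private final Runnable invokable; // stands in for the AbstractInvokable

    MiniTask(Runnable invokable) {
        this.invokable = invokable;
    }

    // Succeeds only if the state is still 'expected', so a concurrent cancel() wins.
    private boolean transitionState(TaskState expected, TaskState next) {
        return state.compareAndSet(expected, next);
    }

    void cancel() {
        state.set(TaskState.CANCELING); // simplified cancellation
    }

    TaskState currentState() {
        return state.get();
    }

    @Override
    public void run() {
        if (!transitionState(TaskState.DEPLOYING, TaskState.RUNNING)) {
            return; // Task.run() throws CancelTaskException at this point
        }
        invokable.run(); // corresponds to invokable.invoke(), e.g. DataSourceTask.invoke()
        transitionState(TaskState.RUNNING, TaskState.FINISHED);
    }

    public static void main(String[] args) {
        MiniTask ok = new MiniTask(() -> System.out.println("invoke()"));
        ok.run();                                    // prints "invoke()"
        System.out.println(ok.currentState());       // FINISHED

        MiniTask canceled = new MiniTask(() -> System.out.println("never printed"));
        canceled.cancel();
        canceled.run();
        System.out.println(canceled.currentState()); // CANCELING
    }
}
```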

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java

@Override
	public void invoke() throws Exception {
		// --------------------------------------------------------------------
		// Initialize
		// --------------------------------------------------------------------
		initInputFormat();

		LOG.debug(getLogString("Start registering input and output"));

		try {
			initOutputs(getUserCodeClassLoader());
		} catch (Exception ex) {
			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
					ex.getMessage(), ex);
		}

		LOG.debug(getLogString("Finished registering input and output"));

		// --------------------------------------------------------------------
		// Invoke
		// --------------------------------------------------------------------
		LOG.debug(getLogString("Starting data source operator"));

		RuntimeContext ctx = createRuntimeContext();

		final Counter numRecordsOut;
		{
			Counter tmpNumRecordsOut;
			try {
				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
				ioMetricGroup.reuseInputMetricsForTask();
				if (this.config.getNumberOfChainedStubs() == 0) {
					ioMetricGroup.reuseOutputMetricsForTask();
				}
				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				tmpNumRecordsOut = new SimpleCounter();
			}
			numRecordsOut = tmpNumRecordsOut;
		}
		
		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
			((RichInputFormat) this.format).setRuntimeContext(ctx);
			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
			((RichInputFormat) this.format).openInputFormat();
			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
		}

		ExecutionConfig executionConfig = getExecutionConfig();

		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

		LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
		
		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
		
		try {
			// start all chained tasks
			BatchTask.openChainedTasks(this.chainedTasks, this);
			
			// get input splits to read
			final Iterator<InputSplit> splitIterator = getInputSplits();
			
			// for each assigned input split
			while (!this.taskCanceled && splitIterator.hasNext())
			{
				// get start and end
				final InputSplit split = splitIterator.next();

				LOG.debug(getLogString("Opening input split " + split.toString()));
				
				final InputFormat<OT, InputSplit> format = this.format;
			
				// open input format
				format.open(split);
	
				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
				
				try {
					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

					if (objectReuseEnabled) {
						OT reuse = serializer.createInstance();

						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {

							OT returned;
							if ((returned = format.nextRecord(reuse)) != null) {
								output.collect(returned);
							}
						}
					} else {
						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {
							OT returned;
							if ((returned = format.nextRecord(serializer.createInstance())) != null) {
								output.collect(returned);
							}
						}
					}

					if (LOG.isDebugEnabled() && !this.taskCanceled) {
						LOG.debug(getLogString("Closing input split " + split.toString()));
					}
				} finally {
					// close. We close here such that a regular close throwing an exception marks a task as failed.
					format.close();
				}
				completedSplitsCounter.inc();
			} // end for all input splits

			// close the collector. if it is a chaining task collector, it will close its chained tasks
			this.output.close();

			// close all chained tasks letting them report failure
			BatchTask.closeChainedTasks(this.chainedTasks, this);

		}
		catch (Exception ex) {
			// close the input, but do not report any exceptions, since we already have another root cause
			try {
				this.format.close();
			} catch (Throwable ignored) {}

			BatchTask.cancelChainedTasks(this.chainedTasks);

			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

			if (ex instanceof CancelTaskException) {
				// forward canceling exception
				throw ex;
			}
			else if (!this.taskCanceled) {
				// drop exception, if the task was canceled
				BatchTask.logAndThrowException(ex, this);
			}
		} finally {
			BatchTask.clearWriters(eventualOutputs);
			// --------------------------------------------------------------------
			// Closing
			// --------------------------------------------------------------------
			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
				((RichInputFormat) this.format).closeInputFormat();
				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
			}
		}

		if (!this.taskCanceled) {
			LOG.debug(getLogString("Finished data source operator"));
		}
		else {
			LOG.debug(getLogString("Data source operator cancelled"));
		}
	}
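  • DataSourceTask.invoke keeps pulling records as long as the task is not canceled (taskCanceled) and format.reachedEnd() is false. It does this by calling format.nextRecord(serializer.createInstance()), then calls output.collect(returned) for every non-null result. When object reuse is enabled, it passes a single reused instance instead.
  • The format here is a CsvInputFormat (PojoCsvInputFormat), but its nextRecord and reachedEnd methods are inherited from the parent class DelimitedInputFormat.
  • The inheritance chain is as follows. PojoCsvInputFormat extends the abstract class CsvInputFormat. CsvInputFormat extends the abstract class GenericCsvInputFormat. GenericCsvInputFormat in turn extends the abstract class DelimitedInputFormat.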

DelimitedInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java

/**
	 * The default read buffer size = 1MB.
	 */
	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

	private transient byte[] readBuffer;

	private int bufferSize = -1;

	private void initBuffers() {
		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

		if (this.bufferSize <= this.delimiter.length) {
			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
		}

		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
			this.readBuffer = new byte[this.bufferSize];
		}
		if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
			this.wrapBuffer = new byte[256];
		}

		this.readPos = 0;
		this.limit = 0;
		this.overLimit = false;
		this.end = false;
	}

	/**
	 * Checks whether the current split is at its end.
	 * 
	 * @return True, if the split is at its end, false otherwise.
	 */
	@Override
	public boolean reachedEnd() {
		return this.end;
	}
	
	@Override
	public OT nextRecord(OT record) throws IOException {
		if (readLine()) {
			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
		} else {
			this.end = true;
			return null;
		}
	}

	/**
	 * Fills the read buffer with bytes read from the file starting from an offset.
	 */
	private boolean fillBuffer(int offset) throws IOException {
		int maxReadLength = this.readBuffer.length - offset;
		// special case for reading the whole split.
		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
			if (read == -1) {
				this.stream.close();
				this.stream = null;
				return false;
			} else {
				this.readPos = offset;
				this.limit = read;
				return true;
			}
		}
		
		// else ..
		int toRead;
		if (this.splitLength > 0) {
			// if we have more data, read that
			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
		}
		else {
			// if we have exhausted our split, we need to complete the current record, or read one
			// more across the next split.
			// the reason is that the next split will skip over the beginning until it finds the first
			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
			// previous split.
			toRead = maxReadLength;
			this.overLimit = true;
		}

		int read = this.stream.read(this.readBuffer, offset, toRead);

		if (read == -1) {
			this.stream.close();
			this.stream = null;
			return false;
		} else {
			this.splitLength -= read;
			this.readPos = offset; // position from where to start reading
			this.limit = read + offset; // number of valid bytes in the read buffer
			return true;
		}
	}
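  • DelimitedInputFormat first calls readLine() to read data into currBuffer. If a line was read, it calls readRecord (implemented by the subclass CsvInputFormat) and passes currBuffer, currOffset and currLen. Otherwise it sets end to true, which is what reachedEnd() then reports.
  • readLine() calls fillBuffer. fillBuffer determines toRead from two values: splitLength (the length of the FileInputSplit, per DelimitedInputFormat.getStatistics) and maxReadLength. It then reads up to toRead bytes from the file into readBuffer, starting at offset, and readLine sets currBuffer, currOffset and currLen. Once the split is exhausted (splitLength <= 0), fillBuffer still reads one more buffer and sets overLimit. This completes the record that straddles the split boundary; the next split skips its leading partial record.
  • readBuffer is allocated with bufferSize in initBuffers. bufferSize starts out as -1 and is set to 4 * 1024 in getStatistics. If it is still <= 0 when initBuffers runs, DEFAULT_READ_BUFFER_SIZE (1024 * 1024) is used instead.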
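The comment in fillBuffer describes how records that straddle split boundaries are handled. A split whose start is not 0 discards everything up to its first delimiter. A split that has run out of bytes reads one more record past its end. The sketch below models that rule on an in-memory byte array with a single '\n' delimiter. It assumes a record belongs to the split that contains the delimiter preceding it, and that the first record belongs to the split starting at 0. SplitBoundarySketch is hypothetical and ignores buffering, multi-byte delimiters and line length limits.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SplitBoundarySketch {

    // Records (without '\n') that the split [start, start + length) reads from data.
    static List<String> readSplit(byte[] data, int start, int length) {
        List<String> records = new ArrayList<>();
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // discard up to and including the first delimiter: that (partial)
            // record belongs to the previous split
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++;
        }
        // a record starting at or before 'end' is preceded by a delimiter inside this
        // split, so it is read even if it runs past the split end ("overLimit")
        while (pos <= end && pos < data.length) {
            int lineEnd = pos;
            while (lineEnd < data.length && data[lineEnd] != '\n') {
                lineEnd++;
            }
            records.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = lineEnd + 1;
        }
        return records;
    }

    // Reads the whole input as consecutive splits of splitSize bytes.
    static List<String> readAllSplits(byte[] data, int splitSize) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int length = Math.min(splitSize, data.length - start);
            all.addAll(readSplit(data, start, length));
        }
        return all;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbeta\ngamma\ndelta\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readSplit(data, 0, 8));   // [alpha, beta]
        System.out.println(readSplit(data, 8, 8));   // [gamma]
        System.out.println(readSplit(data, 16, 7));  // [delta]
        System.out.println(readAllSplits(data, 5));  // [alpha, beta, gamma, delta]
    }
}
```

Whatever the split size, every record is read exactly once, because exactly one split contains the delimiter that precedes it.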

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java

@Override
	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
		/*
		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
		 */
		// Found window's end line, so find carriage return before the newline
		if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
			//reduce the number of bytes so that the Carriage return is not taken as data
			numBytes--;
		}

		if (commentPrefix != null && commentPrefix.length <= numBytes) {
			//check record for comments
			boolean isComment = true;
			for (int i = 0; i < commentPrefix.length; i++) {
				if (commentPrefix[i] != bytes[offset + i]) {
					isComment = false;
					break;
				}
			}
			if (isComment) {
				this.commentCount++;
				return null;
			}
		}

		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
			return fillRecord(reuse, parsedValues);
		} else {
			this.invalidLineCount++;
			return null;
		}
	}
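  • CsvInputFormat.readRecord receives the raw bytes of one line and processes them in four steps:
    1. If the line delimiter is a line break, it strips a trailing '\r' (Windows line endings).
    2. It returns null for lines that start with the comment prefix.
    3. parseRecord parses the raw data into parsedValues (Object[]).
    4. It calls the subclass's fillRecord (here PojoCsvInputFormat) to copy parsedValues into the reuse object. That object is the serializer.createInstance() that DataSourceTask passed to format.nextRecord.
  Lines that fail to parse increment invalidLineCount and also yield null.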
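The pre-processing steps of readRecord can be tried out on plain byte arrays. This is a minimal sketch that assumes the line delimiter is '\n'. The carriage-return trim and the comment-prefix check follow the code above. parseRecord is replaced by a naive String.split on commas with no quoting and no type conversion. That split is an assumption of the sketch, not Flink's field parser. ReadRecordSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ReadRecordSketch {

    // Assumes the line delimiter is '\n', so a trailing '\r' is a Windows line ending.
    // Returns the raw fields of one line, or null if the line is a comment.
    static String[] readRecord(byte[] bytes, int offset, int numBytes, byte[] commentPrefix) {
        if (numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            numBytes--; // don't treat the carriage return as data
        }
        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            boolean isComment = true;
            for (int i = 0; i < commentPrefix.length; i++) {
                if (commentPrefix[i] != bytes[offset + i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                return null; // CsvInputFormat also increments commentCount here
            }
        }
        // naive stand-in for parseRecord: no quoting, no type conversion
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8).split(",", -1);
    }

    public static void main(String[] args) {
        byte[] prefix = "#".getBytes(StandardCharsets.UTF_8);
        byte[] line = "x,1,2\r".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(readRecord(line, 0, line.length, prefix))); // [x, 1, 2]
        byte[] comment = "# header".getBytes(StandardCharsets.UTF_8);
        System.out.println(readRecord(comment, 0, comment.length, prefix));            // null
    }
}
```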

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

	//......

	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);

		pojoFields = new Field[pojoFieldNames.length];

		Map<String, Field> allFields = new HashMap<String, Field>();

		findAllFields(pojoTypeClass, allFields);

		for (int i = 0; i < pojoFieldNames.length; i++) {
			pojoFields[i] = allFields.get(pojoFieldNames[i]);

			if (pojoFields[i] != null) {
				pojoFields[i].setAccessible(true);
			} else {
				throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
			}
		}
	}

	@Override
	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
		for (int i = 0; i < parsedValues.length; i++) {
			try {
				pojoFields[i].set(reuse, parsedValues[i]);
			} catch (IllegalAccessException e) {
				throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
			}
		}
		return reuse;
	}

	//......
}
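  • PojoCsvInputFormat's open method is invoked during plan execution (the executor's executePlan), when DataSourceTask calls format.open(split) for each input split. It uses reflection to look up the required Fields in advance and makes them accessible.
  • fillRecord simply uses reflection to set parsedValues[i] on the POJO field pojoFields[i].
  • If the reflective set fails with an IllegalAccessException, that exception is wrapped in a RuntimeException and rethrown.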
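The division of labor between open and fillRecord can be reproduced with plain java.lang.reflect. open resolves the Fields once, and fillRecord sets values for each record. PojoFiller and Medal are hypothetical names. The superclass walk, and its precedence where subclass fields win, are assumptions of this sketch rather than a copy of findAllFields.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

final class PojoFiller<T> {
    private final String[] fieldNames;
    private final Field[] fields;

    // Resolve the Fields once (as PojoCsvInputFormat.open() does) instead of per record.
    PojoFiller(Class<T> type, String... fieldNames) {
        this.fieldNames = fieldNames;
        Map<String, Field> all = new HashMap<>();
        // assumption of this sketch: subclass fields shadow superclass fields of the same name
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                all.putIfAbsent(f.getName(), f);
            }
        }
        this.fields = new Field[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            Field f = all.get(fieldNames[i]);
            if (f == null) {
                throw new RuntimeException("There is no field called \"" + fieldNames[i] + "\" in " + type.getName());
            }
            f.setAccessible(true); // allows writing private fields
            fields[i] = f;
        }
    }

    // parsedValues[i] goes into the i-th named field.
    T fillRecord(T reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                fields[i].set(reuse, parsedValues[i]); // unboxes Integer into an int field
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + fieldNames[i] + "\"", e);
            }
        }
        return reuse;
    }

    // Hypothetical POJO for the demo.
    static final class Medal {
        private String country;
        private int gold;

        String getCountry() { return country; }

        int getGold() { return gold; }
    }

    public static void main(String[] args) {
        PojoFiller<Medal> filler = new PojoFiller<>(Medal.class, "country", "gold");
        Medal m = filler.fillRecord(new Medal(), new Object[] {"X", 3});
        System.out.println(m.getCountry() + " " + m.getGold()); // X 3
    }
}
```

Resolving the Fields up front keeps the per-record work to one Field.set call per column.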

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
	private final Collector<OUT> collector;
	private final Counter numRecordsOut;

	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
		this.collector = collector;
		this.numRecordsOut = numRecordsOut;
	}

	@Override
	public void collect(OUT record) {
		this.numRecordsOut.inc();
		this.collector.collect(record);
	}

	@Override
	public void close() {
		this.collector.close();
	}
}
  • The collector wrapped here is org.apache.flink.runtime.operators.chaining.ChainedMapDriver.

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

@Override
	public void collect(IT record) {
		try {
			this.numRecordsIn.inc();
			this.outputCollector.collect(this.mapper.map(record));
		} catch (Exception ex) {
			throw new ExceptionInChainedStubException(this.taskName, ex);
		}
	}
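  • It first calls the mapper's map method to run the map logic. It then calls outputCollector.collect to pass the result on. Any exception is wrapped in an ExceptionInChainedStubException.
  • Here outputCollector is a CountingCollector, and the collector it wraps is org.apache.flink.runtime.operators.shipping.OutputCollector.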
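The chain described above runs CountingCollector → ChainedMapDriver → CountingCollector → OutputCollector. Each collector in the series does one thing and then delegates to the next. Below is a minimal sketch in which hypothetical MiniCollector, Counting, ChainedMap and ListSink types stand in for the Flink classes. The real ChainedMapDriver additionally counts input records and wraps exceptions. The real OutputCollector hands records to RecordWriters rather than appending them to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class CollectorChainSketch {

    interface MiniCollector<T> {
        void collect(T record);
    }

    // Counts, then delegates (cf. CountingCollector).
    static final class Counting<T> implements MiniCollector<T> {
        private final MiniCollector<T> delegate;
        long count;

        Counting(MiniCollector<T> delegate) { this.delegate = delegate; }

        @Override
        public void collect(T record) {
            count++;
            delegate.collect(record);
        }
    }

    // Runs the user map function and pushes the result downstream (cf. ChainedMapDriver).
    static final class ChainedMap<I, O> implements MiniCollector<I> {
        private final Function<I, O> mapper;
        private final MiniCollector<O> out;

        ChainedMap(Function<I, O> mapper, MiniCollector<O> out) {
            this.mapper = mapper;
            this.out = out;
        }

        @Override
        public void collect(I record) {
            out.collect(mapper.apply(record));
        }
    }

    // Terminal stage (cf. OutputCollector): rejects null, "emits" into a list.
    static final class ListSink<T> implements MiniCollector<T> {
        final List<T> emitted = new ArrayList<>();

        @Override
        public void collect(T record) {
            if (record == null) {
                throw new NullPointerException("The system does not support records that are null.");
            }
            emitted.add(record);
        }
    }

    public static void main(String[] args) {
        ListSink<Integer> sink = new ListSink<>();
        Counting<Integer> afterMap = new Counting<>(sink);
        Counting<String> sourceOut = new Counting<>(new ChainedMap<String, Integer>(String::length, afterMap));
        sourceOut.collect("ab");
        sourceOut.collect("xyz");
        System.out.println(sink.emitted + " " + sourceOut.count + " " + afterMap.count); // [2, 3] 2 2
    }
}
```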

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java

/**
	 * Collects a record and emits it to all writers.
	 */
	@Override
	public void collect(T record)  {
		if (record != null) {
			this.delegate.setInstance(record);
			try {
				for (RecordWriter<SerializationDelegate<T>> writer : writers) {
					writer.emit(this.delegate);
				}
			}
			catch (IOException e) {
				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
			}
			catch (InterruptedException e) {
				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
			}
		}
		else {
			throw new NullPointerException("The system does not support records that are null."
								+ "Null values are only supported as fields inside other objects.");
		}
	}
  • This calls RecordWriter.emit on every writer to emit the record. Null records are rejected with a NullPointerException.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

public void emit(T record) throws IOException, InterruptedException {
		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
			sendToTarget(record, targetChannel);
		}
	}
  • channelSelector.selectChannels returns the target channels to send to. The channelSelector here is an OutputEmitter.

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java

@Override
	public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
		switch (strategy) {
		case FORWARD:
			return forward();
		case PARTITION_RANDOM:
		case PARTITION_FORCED_REBALANCE:
			return robin(numberOfChannels);
		case PARTITION_HASH:
			return hashPartitionDefault(record.getInstance(), numberOfChannels);
		case BROADCAST:
			return broadcast(numberOfChannels);
		case PARTITION_CUSTOM:
			return customPartition(record.getInstance(), numberOfChannels);
		case PARTITION_RANGE:
			return rangePartition(record.getInstance(), numberOfChannels);
		default:
			throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
		}
	}

	private int[] forward() {
		return this.channels;
	}
  • The strategy here is FORWARD, so selectChannels simply returns the emitter's channels array via forward().
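RecordWriter.emit sends a record to every channel that the selector returns. The sketch below imitates a few of OutputEmitter's strategies under simplifying assumptions. FORWARD always returns the single channel 0; this is an assumption that matches forward() returning one fixed array. Round robin cycles through the channels, and broadcast returns all of them. Hash uses Math.floorMod of hashCode(), whereas Flink's hashPartitionDefault hashes through the type comparator. ChannelSelectorSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChannelSelectorSketch {

    enum Strategy { FORWARD, ROUND_ROBIN, BROADCAST, HASH }

    private final Strategy strategy;
    private int nextChannel = 0; // round-robin position

    ChannelSelectorSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    // Which output channels a record goes to (cf. OutputEmitter.selectChannels).
    int[] selectChannels(Object record, int numberOfChannels) {
        switch (strategy) {
            case FORWARD:
                return new int[] {0}; // assumed: forwarding always uses the single channel 0
            case ROUND_ROBIN: {
                int channel = nextChannel;
                nextChannel = (nextChannel + 1) % numberOfChannels;
                return new int[] {channel};
            }
            case BROADCAST: {
                int[] all = new int[numberOfChannels];
                for (int i = 0; i < numberOfChannels; i++) {
                    all[i] = i;
                }
                return all;
            }
            case HASH:
                // simplification: Flink hashes via the type comparator, not hashCode()
                return new int[] {Math.floorMod(record.hashCode(), numberOfChannels)};
            default:
                throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy);
        }
    }

    // Like RecordWriter.emit: one send per selected channel; returns the channels used.
    List<Integer> emit(Object record, int numberOfChannels) {
        List<Integer> sentTo = new ArrayList<>();
        for (int targetChannel : selectChannels(record, numberOfChannels)) {
            sentTo.add(targetChannel); // stands in for sendToTarget(record, targetChannel)
        }
        return sentTo;
    }

    public static void main(String[] args) {
        ChannelSelectorSketch forward = new ChannelSelectorSketch(Strategy.FORWARD);
        ChannelSelectorSketch robin = new ChannelSelectorSketch(Strategy.ROUND_ROBIN);
        ChannelSelectorSketch broadcast = new ChannelSelectorSketch(Strategy.BROADCAST);
        System.out.println(forward.emit("r1", 1));                           // [0]
        System.out.println(robin.emit("r1", 3) + " " + robin.emit("r2", 3)); // [0] [1]
        System.out.println(broadcast.emit("r1", 3));                         // [0, 1, 2]
        System.out.println(Arrays.toString(new ChannelSelectorSketch(Strategy.HASH).selectChannels("key", 4)));
    }
}
```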

Summary

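  • The inputFormat created by CsvReader is a PojoCsvInputFormat. Its key method is fillRecord, which fills the POJO via reflection. The actual reading happens in DelimitedInputFormat.readLine, which calls fillBuffer. fillBuffer determines toRead from splitLength (the length of the FileInputSplit, per DelimitedInputFormat.getStatistics) and maxReadLength. It then reads up to toRead bytes from the file into readBuffer, starting at offset.
  • DataSourceTask.invoke calls format.nextRecord in a loop and passes each result to output.collect, stopping once the task is canceled or format.reachedEnd() returns true. The output is a CountingCollector; through the chain, records eventually reach org.apache.flink.runtime.operators.shipping.OutputCollector.
  • In output.collect, output is a CountingCollector whose delegate collector is ChainedMapDriver. ChainedMapDriver applies the map operation to each record read. It passes the result to another CountingCollector that wraps OutputCollector, and OutputCollector emits the data via RecordWriter.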
