With big data technology now widely deployed across industries, the demand for real-time processing of massive data volumes keeps growing, and the business logic behind that processing keeps getting more complex. Traditional batch processing and the early stream processing frameworks find it increasingly hard to meet ever stricter requirements on latency, throughput, fault tolerance, and ease of use.
Against this backdrop, the new stream processing framework Flink creatively brings modern large-scale parallel processing techniques to stream processing, greatly improving on the problems of earlier streaming frameworks.
1. Overview
Flink provides the DataSet API for processing batch data. Flink first converts the incoming data into a DataSet, distributed in parallel across the nodes of the cluster; it then applies transformations (map, filter, and so on) to the DataSet, and finally writes the result to an external system through a DataSink operation.
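To make the source → transformation → sink pipeline concrete, here is a minimal WordCount sketch against the DataSet API; the input and output paths are placeholders, not from the original text.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Source: read the input as a DataSet of text lines (placeholder path)
        DataSet<String> lines = env.readTextFile("file:///tmp/input.txt");

        // Transformations: split lines into words, then count per word
        DataSet<Tuple2<String, Integer>> counts = lines
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.toLowerCase().split("\\W+")) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    }
                })
                .groupBy(0)   // group by the word field
                .sum(1);      // sum the counts

        // Sink: write the result out and trigger execution
        counts.writeAsCsv("file:///tmp/wordcount-output", "\n", ",");
        env.execute("DataSet WordCount");
    }
}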
2. Data ingestion
Input: InputFormat
/**
 * The base interface for data sources that produces records.
 *
 * <p>The input format handles the following:
 * <ul>
 *   <li>It describes how the input is split into splits that can be processed in parallel.</li>
 *   <li>It describes how to read records from the input split.</li>
 *   <li>It describes how to gather basic statistics from the input.</li>
 * </ul>
 *
 * <p>The life cycle of an input format is the following:
 * <ol>
 *   <li>After being instantiated (parameterless), it is configured with a {@link Configuration} object.
 *       Basic fields are read from the configuration, such as a file path, if the format describes
 *       files as input.</li>
 *   <li>Optionally: It is called by the compiler to produce basic statistics about the input.</li>
 *   <li>It is called to create the input splits.</li>
 *   <li>Each parallel input task creates an instance, configures it and opens it for a specific split.</li>
 *   <li>All records are read from the input</li>
 *   <li>The input format is closed</li>
 * </ol>
 *
 * <p>IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed.
 * That is due to the fact that the input format is used for potentially multiple splits. After a split is done,
 * the format's close function is invoked and, if another split is available, the open function is invoked
 * afterwards for the next split.
 *
 * @see InputSplit
 * @see BaseStatistics
 *
 * @param <OT> The type of the produced records.
 * @param <T> The type of input split.
 */
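As an illustration of how a concrete InputFormat is plugged into a job, the sketch below uses the built-in TextInputFormat via createInput; the path is a placeholder, and env.readTextFile is the usual shortcut for the same thing.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// TextInputFormat is a concrete InputFormat: it creates the file splits
// and reads each split back as lines of text
DataSet<String> lines = env.createInput(
        new TextInputFormat(new Path("file:///tmp/input.txt")));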
3. Data transformation
DataSet: a collection of elements of the same type. A DataSet can be turned into another DataSet through transformations, for example (a runnable join sketch follows this list):
DataSet#map(org.apache.flink.api.common.functions.MapFunction)
DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)
DataSet#join(DataSet)
DataSet#coGroup(DataSet)
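As a hedged sketch of one of these transformations in use (the data and field layout are made up for illustration), two Tuple DataSets can be joined on a key:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, String>> users = env.fromElements(
        Tuple2.of(1, "alice"), Tuple2.of(2, "bob"));
DataSet<Tuple2<Integer, Integer>> scores = env.fromElements(
        Tuple2.of(1, 90), Tuple2.of(2, 75));

// Join on the first field (the id) of both data sets; the default join
// result pairs up the matching elements
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, Integer>>> joined =
        users.join(scores).where(0).equalTo(0);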
Here, Function is the user-defined business logic; Java 8 lambda expressions are supported, as in the sketch below.
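A minimal sketch of the two Function styles, assuming nothing beyond a small in-memory DataSet (the names are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> words = env.fromElements("flink", "dataset", "api");

// Anonymous-class form of a user-defined Function
DataSet<Integer> lengths = words.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return value.length();
    }
});

// Equivalent Java 8 lambda; the returns() hint restores the result type
// that erasure can hide from lambda type extraction
DataSet<Integer> lengths2 = words.map(s -> s.length()).returns(Types.INT);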
A Function is applied through an Operator; take map as an example:
/**
 * Applies a Map transformation on this DataSet.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element
 * of the DataSet. Each MapFunction call returns exactly one element.
 *
 * @param mapper The MapFunction that is called for each element of the DataSet.
 * @return A MapOperator that represents the transformed DataSet.
 *
 * @see org.apache.flink.api.common.functions.MapFunction
 * @see org.apache.flink.api.common.functions.RichMapFunction
 * @see MapOperator
 */
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
    if (mapper == null) {
        throw new NullPointerException("Map function must not be null.");
    }

    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
    return new MapOperator<>(this, resultType, clean(mapper), callLocation);
}
Here, the Operator (a MapOperator in this case) wraps the cleaned user Function together with the extracted result type, and represents the transformation as a node in the program's dataflow plan.
4. Data output
DataSink: an operation that stores the result data.
Output: OutputFormat
For example, results can be written out as CSV:
/**
 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and
 * line delimiters.
 *
 * <p><b>Note: Only a Tuple DataSet can be written as a CSV file.</b>
 * For each Tuple field the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the CSV file is written to.
 * @param rowDelimiter The row delimiter to separate Tuples.
 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
 *
 * @see Tuple
 * @see CsvOutputFormat
 * @see DataSet#writeAsText(String) Output files and directories
 */
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
    return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
}

@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
    CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        of.setWriteMode(wm);
    }
    return output((OutputFormat<T>) of);
}
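A short usage sketch (placeholder output path); note that writeAsCsv only accepts Tuple DataSets, and the job still has to be triggered with env.execute():

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> counts = env.fromElements(
        Tuple2.of("flink", 1), Tuple2.of("dataset", 2));

// One Tuple per row, fields separated by commas; overwrite existing files
counts.writeAsCsv("file:///tmp/counts", "\n", ",", WriteMode.OVERWRITE);
env.execute("csv sink example");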
5. Summary
1. Flink reads data from various sources through an InputFormat and turns it into a DataSet.
2. Flink provides a rich set of transformations; a DataSet can be transformed into another DataSet, implemented internally by Functions and Operators.
3. Flink wraps an OutputFormat in a DataSink, and finally writes the data out to different storage systems.