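The previous chapter, <Running the Flink example programs on Windows>, briefly showed how to run an already-packaged example program (jar) on Windows through the Flink web UI. So why should we use Flink at all?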
Flink's features
The official website lists the following features:
1. Everything is a stream (All streaming use cases)
2. Guaranteed correctness
3. Layered APIs
4. Ease of use
5. Scalability
6. High performance
Flink architecture
1. Layered structure
2. Working architecture diagram
Flink in practice
1. Dependency file: pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinkDemo</groupId>
    <artifactId>flinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hbase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.rholder</groupId>
            <artifactId>guava-retrying</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Java program
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
// WordCountData ships with Flink's batch examples (flink-examples-batch),
// which the pom.xml above does not declare; add it or supply your own input.
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCountDemo {

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // create execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
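To see exactly what the Tokenizer's `split("\\W+")` plus the `length() > 0` check produce, here is a plain-Java sketch with no Flink dependency. It applies the same normalization to one line and returns the words that the real Tokenizer would emit as `(word, 1)` pairs. The class name `TokenizerSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free re-creation of the Tokenizer's per-line logic.
class TokenizerSketch {

    // Same normalization as Tokenizer.flatMap: lower-case, then split on runs of non-word chars.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // A line that starts with spaces or punctuation yields a leading empty token; skip it.
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Each word would be emitted as (word,1) by the real Tokenizer.
        for (String w : tokenize("  To be, or not to be!")) {
            System.out.println("(" + w + ",1)");
        }
    }
}
```

The length check matters: without it, the leading empty string produced by `split` for `"  To be..."` would be counted as a word.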
3. Step-by-step debugging analysis
Step 1: Obtain the execution environment (ExecutionEnvironment.java)
/**
 * The ExecutionEnvironment is the context in which a program is executed. A
 * {@link LocalEnvironment} will cause execution in the current JVM, a
 * {@link RemoteEnvironment} will cause execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the parallelism)
 * and to interact with the outside world (data access).
 *
 * <p>Please note that the execution environment needs strong type information for the input and return types
 * of all operations that are executed. This means that the environments needs to know that the return
 * value of an operation is for example a Tuple of String and Integer.
 * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
 * obtain that information using reflection. In certain cases, it may be necessary to manually supply that
 * information to some of the methods.
 *
 * @see LocalEnvironment
 * @see RemoteEnvironment
 */
Creating the local environment
/**
 * Creates a {@link LocalEnvironment} which is used for executing Flink jobs.
 *
 * @param configuration to start the {@link LocalEnvironment} with
 * @param defaultParallelism to initialize the {@link LocalEnvironment} with
 * @return {@link LocalEnvironment}
 */
private static LocalEnvironment createLocalEnvironment(Configuration configuration, int defaultParallelism) {
    final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);

    if (defaultParallelism > 0) {
        localEnvironment.setParallelism(defaultParallelism);
    }

    return localEnvironment;
}
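The `defaultParallelism > 0` guard means a zero or negative value leaves the environment's parallelism untouched, so the configured default applies. Below is a Flink-free sketch of that pattern; `LocalEnvSketch` and the `-1` "unset" marker are assumptions chosen for illustration, not Flink's own classes.

```java
// Hypothetical stand-in for LocalEnvironment; UNSET (-1) marks "not set, fall back to the default".
class LocalEnvSketch {
    static final int UNSET = -1;
    private int parallelism = UNSET;

    void setParallelism(int p) { this.parallelism = p; }
    int getParallelism() { return parallelism; }

    // Mirrors createLocalEnvironment: only override parallelism when a positive value is given.
    static LocalEnvSketch create(int defaultParallelism) {
        LocalEnvSketch env = new LocalEnvSketch();
        if (defaultParallelism > 0) {
            env.setParallelism(defaultParallelism);
        }
        return env;
    }

    public static void main(String[] args) {
        System.out.println(create(4).getParallelism()); // 4
        System.out.println(create(0).getParallelism()); // -1
    }
}
```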
Step 2: Read the external data and create the data set (ExecutionEnvironment.java)
/**
 * Creates a DataSet from the given non-empty collection. Note that this operation will result
 * in a non-parallel data source, i.e. a data source with a parallelism of one.
 *
 * <p>The returned DataSet is typed to the given TypeInformation.
 *
 * @param data The collection of elements to create the data set from.
 * @param type The TypeInformation for the produced data set.
 * @return A DataSet representing the given collection.
 *
 * @see #fromCollection(Collection)
 */
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
    return fromCollection(data, type, Utils.getCallLocationName());
}

private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
    CollectionInputFormat.checkCollection(data, type.getTypeClass());
    return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
}
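Before building the source, `fromCollection` validates the input via `CollectionInputFormat.checkCollection`. The plain-Java sketch below shows the kind of checks involved (null collection, empty collection, null elements, elements of the wrong class). It is an approximation written for illustration, not a copy of Flink's method; the class and the `isValid` helper are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical approximation of the validation done before a collection becomes a data source.
class CollectionCheckSketch {

    static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs) {
        if (elements == null || viewedAs == null) {
            throw new NullPointerException("collection and type must not be null");
        }
        if (elements.isEmpty()) {
            throw new IllegalArgumentException("collection must not be empty");
        }
        for (X elem : elements) {
            if (elem == null) {
                throw new IllegalArgumentException("collection must not contain null elements");
            }
            if (!viewedAs.isAssignableFrom(elem.getClass())) {
                throw new IllegalArgumentException("element " + elem + " is not a " + viewedAs.getName());
            }
        }
    }

    // Returns true if validation passes, false if it throws.
    static <X> boolean isValid(Collection<X> elements, Class<X> viewedAs) {
        try {
            checkCollection(elements, viewedAs);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(Arrays.asList("to be", "or not"), String.class)); // true
        System.out.println(isValid(Collections.<String>emptyList(), String.class));  // false
    }
}
```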
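Inheritance hierarchy of the data set
DataSet is an abstract class representing a collection of elements of the same type. It provides the transformations on that data, such as map, reduce, join, and coGroup.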
/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
 *   <li>{@link DataSet#join(DataSet)}, or</li>
 *   <li>{@link DataSet#coGroup(DataSet)}.</li>
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */
Operator is the abstract base class of all operators in the Java API.
/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
    // ...
}
DataSource is a concrete implementation class.
/**
 * An operation that creates a new data set (data source). The operation acts as the
 * data set on which to apply further transformations. It encapsulates additional
 * configuration parameters, to customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
    // ...
}
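The second type parameter of `Operator` ("the type of the operator, so that we can return it") is a self-referential generic: it lets inherited fluent setters return the concrete subclass without casts at the call site. The miniature below reproduces the DataSet, Operator, DataSource chain in plain Java; all class names are hypothetical, and the `name(...)`/`setParallelism(...)` setters are modeled on the fluent style of Flink's `Operator`, not copied from it.

```java
// Hypothetical miniature of the DataSet -> Operator -> DataSource hierarchy (not Flink's classes).
abstract class MiniDataSet<T> {
}

// O is the concrete operator type, so fluent setters can return it without casts at the call site.
abstract class MiniOperator<OUT, O extends MiniOperator<OUT, O>> extends MiniDataSet<OUT> {
    protected String name = "unnamed";
    protected int parallelism = -1;

    @SuppressWarnings("unchecked")
    public O name(String newName) {
        this.name = newName;
        return (O) this;
    }

    @SuppressWarnings("unchecked")
    public O setParallelism(int p) {
        this.parallelism = p;
        return (O) this;
    }
}

class MiniDataSource<OUT> extends MiniOperator<OUT, MiniDataSource<OUT>> {
    // Subclass-specific method, still reachable after the inherited fluent calls.
    public String describe() {
        return "source '" + name + "' with parallelism " + parallelism;
    }
}

class OperatorHierarchyDemo {
    public static void main(String[] args) {
        // name() and setParallelism() return MiniDataSource<String>, so describe() compiles directly.
        String d = new MiniDataSource<String>().name("words").setParallelism(2).describe();
        System.out.println(d); // source 'words' with parallelism 2
    }
}
```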
Step 3: Transform the input data set
DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);
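Semantically, the flatMap → groupBy(0) → sum(1) chain is a word count. As a local, eager analogue (Flink's DataSet is lazy and distributed, this is neither), the same result can be sketched with `java.util.stream`; the class name `WordCountStreamSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Flink-free analogue of flatMap(new Tokenizer()).groupBy(0).sum(1) using java.util.stream.
class WordCountStreamSketch {

    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                // flatMap: one line -> zero or more words (same split as the Tokenizer)
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(token -> token.length() > 0)
                // groupBy(0) on the word, then sum(1) over an implicit count of 1 per word
                .collect(Collectors.groupingBy(word -> word, TreeMap::new,
                        Collectors.summingInt(word -> 1)));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("To be, or not to be,", "that is the question:");
        // TreeMap keeps keys sorted, so the printed order is deterministic
        System.out.println(wordCount(lines));
    }
}
```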
>> Calling flatMap (DataSet.java)
/**
 * Applies a FlatMap transformation on a {@link DataSet}.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 * Each FlatMapFunction call can return any number of elements including none.
 *
 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
 * @return A FlatMapOperator that represents the transformed DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 * @see FlatMapOperator
 * @see DataSet
 */
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
    if (flatMapper == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }

    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
    return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
}
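The key property in the Javadoc is that each flatMap call "can return any number of elements including none", because results are pushed into a collector rather than returned. The sketch below models that contract in plain Java; the nested `Collector` and `FlatMapFunction` interfaces are hypothetical stand-ins for Flink's types, and `applyFlatMap` runs eagerly on a list instead of building an operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the FlatMapFunction/Collector contract.
class FlatMapSketch {

    interface Collector<T> {
        void collect(T record);
    }

    interface FlatMapFunction<T, R> {
        void flatMap(T value, Collector<R> out);
    }

    // Applies the function to every element; each call may emit zero, one, or many records.
    static <T, R> List<R> applyFlatMap(List<T> input, FlatMapFunction<T, R> fn) {
        if (fn == null) {
            // Fail fast, like DataSet.flatMap does for a null function
            throw new NullPointerException("FlatMap function must not be null.");
        }
        List<R> output = new ArrayList<>();
        Collector<R> collector = output::add;
        for (T value : input) {
            fn.flatMap(value, collector);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = java.util.Arrays.asList("hello flink", "", "bye");
        FlatMapFunction<String, String> splitter = (line, out) -> {
            for (String w : line.split(" ")) {
                if (!w.isEmpty()) {
                    out.collect(w); // the empty line emits nothing at all
                }
            }
        };
        System.out.println(applyFlatMap(lines, splitter)); // [hello, flink, bye]
    }
}
```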
>> Calling groupBy (DataSet.java)
/**
 * Groups a {@link Tuple} {@link DataSet} using field position keys.
 *
 * <p><b>Note: Field position keys only be specified for Tuple DataSets.</b>
 *
 * <p>The field position keys specify the fields of Tuples on which the DataSet is grouped.
 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
 * can be applied.
 * <ul>
 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 * </ul>
 *
 * @param fields One or more field positions on which the DataSet will be grouped.
 * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
 *
 * @see Tuple
 * @see UnsortedGrouping
 * @see AggregateOperator
 * @see ReduceOperator
 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 * @see DataSet
 */
public UnsortedGrouping<T> groupBy(int... fields) {
    return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}
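Grouping by field positions only makes sense when records have numbered fields, which is why Flink restricts it to Tuple data sets. A plain-Java sketch of the idea follows, with `Object[]` rows standing in for tuples and the values at the given positions forming the group key. `GroupByPositionSketch` is hypothetical and, unlike Flink's `UnsortedGrouping`, materializes the groups immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of grouping tuple-like rows by one or more field positions.
class GroupByPositionSketch {

    static Map<List<Object>, List<Object[]>> groupBy(List<Object[]> rows, int... fields) {
        Map<List<Object>, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            // The key is the list of values found at the requested positions
            List<Object> key = new ArrayList<>();
            for (int f : fields) {
                if (f < 0 || f >= row.length) {
                    throw new IndexOutOfBoundsException("no field at position " + f);
                }
                key.add(row[f]);
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Object[]> pairs = Arrays.asList(
                new Object[] {"to", 1}, new Object[] {"be", 1}, new Object[] {"to", 1});
        // Print each key with the number of rows in its group
        groupBy(pairs, 0).forEach((key, group) -> System.out.println(key + " -> " + group.size()));
    }
}
```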
>> Calling sum (UnsortedGrouping.java)
/**
 * Syntactic sugar for aggregate (SUM, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum (int field) {
    return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
}

// private helper that allows to set a different call location name
private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
    return new AggregateOperator<T>(this, agg, field, callLocationName);
}
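`sum(field)` is just `aggregate(SUM, field)`. The sketch below mirrors that delegation over a single group of rows in plain Java: one `aggregate` method switches on an aggregation kind, and `sum` merely fixes the kind. The enum and class are hypothetical stand-ins for Flink's `Aggregations` and `AggregateOperator`, and the sketch assumes the aggregated field holds `Integer` values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: sum(field) as sugar for aggregate(SUM, field), over one group of rows.
class AggregateSketch {

    enum Aggregation { SUM, MIN, MAX }

    // Aggregates the Integer field at position `field` across a non-empty group.
    static int aggregate(List<Object[]> group, Aggregation agg, int field) {
        int result = (Integer) group.get(0)[field];
        for (int i = 1; i < group.size(); i++) {
            int v = (Integer) group.get(i)[field];
            switch (agg) {
                case SUM: result += v; break;
                case MIN: result = Math.min(result, v); break;
                case MAX: result = Math.max(result, v); break;
                default: throw new IllegalStateException("unknown aggregation " + agg);
            }
        }
        return result;
    }

    // Mirrors UnsortedGrouping.sum: delegate to aggregate with SUM.
    static int sum(List<Object[]> group, int field) {
        return aggregate(group, Aggregation.SUM, field);
    }

    public static void main(String[] args) {
        List<Object[]> toGroup = Arrays.asList(new Object[] {"to", 1}, new Object[] {"to", 1});
        System.out.println(sum(toGroup, 1)); // 2
    }
}
```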
Relationship between UnsortedGrouping and DataSet
UnsortedGrouping uses AggregateOperator to perform the aggregation.
Step 4: Emit the transformed results
// emit result
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    // execute program
    env.execute("WordCount Example");
} else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
}
If the --output parameter is not given, the result is printed to the console:
/**
 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
 * the print() method. For programs that are executed in a cluster, this method needs
 * to gather the contents of the DataSet back to the client, to print it there.
 *
 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 *
 * <p>This method immediately triggers the program execution, similar to the
 * {@link #collect()} and {@link #count()} methods.
 *
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    List<T> elements = collect();
    for (T e: elements) {
        System.out.println(e);
    }
}
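If an output path is given, the data set is first turned into a DataSink that writes it as a CSV file. A DataSink is the operation used to store data results: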
/**
 * An operation that allows storing data results.
 * @param <T>
 */
The process is as follows:
/**
 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
 *
 * <p><b>Note: Only a Tuple DataSet can written as a CSV file.</b>
 * For each Tuple field the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the CSV file is written to.
 * @param rowDelimiter The row delimiter to separate Tuples.
 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
 *
 * @see Tuple
 * @see CsvOutputFormat
 * @see DataSet#writeAsText(String) Output files and directories
 */
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
    return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
}

@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
    CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        of.setWriteMode(wm);
    }
    return output((OutputFormat<T>) of);
}

/**
 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
 * or transformations) at the same time.
 *
 * @param outputFormat The OutputFormat to process the DataSet.
 * @return The DataSink that processes the DataSet.
 *
 * @see OutputFormat
 * @see DataSink
 */
public DataSink<T> output(OutputFormat<T> outputFormat) {
    Preconditions.checkNotNull(outputFormat);

    // configure the type if needed
    if (outputFormat instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
    }

    DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
    this.context.registerDataSink(sink);
    return sink;
}
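What ends up in the file is each tuple's fields rendered with `toString()`, separated by the field delimiter, with rows separated by the row delimiter. The plain-Java sketch below renders that layout into a String instead of a file; `CsvSketch` is hypothetical, uses `Object[]` rows in place of Flink tuples, and terminates every row (including the last) with the row delimiter.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free sketch of how tuple records become CSV text with custom delimiters.
class CsvSketch {

    static String toCsv(List<Object[]> tuples, String rowDelimiter, String fieldDelimiter) {
        StringBuilder sb = new StringBuilder();
        for (Object[] tuple : tuples) {
            for (int i = 0; i < tuple.length; i++) {
                if (i > 0) {
                    sb.append(fieldDelimiter);
                }
                // Each field is rendered via toString(), as the writeAsCsv Javadoc describes
                sb.append(tuple[i]);
            }
            sb.append(rowDelimiter);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Object[]> counts = Arrays.asList(new Object[] {"be", 2}, new Object[] {"to", 2});
        // Same delimiters as the WordCount program: "\n" between rows, " " between fields
        System.out.print(toCsv(counts, "\n", " "));
    }
}
```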
Finally, the job is executed:
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    if (executor == null) {
        startNewSession();
    }

    Plan p = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //p.setJobId(jobID);
    //p.setSessionTimeout(sessionTimeout);

    JobExecutionResult result = executor.executePlan(p);

    this.lastJobExecutionResult = result;
    return result;
}
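Taken together, `output(...)` only registers a sink with the environment, and nothing runs until `execute(...)` turns the registered sinks into a plan. The sketch below models that lazy behaviour in plain Java with `Runnable` sinks. `LazyEnvSketch`, its error message, and clearing the sinks after each run are assumptions made for this illustration, not a description of Flink's internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of lazy execution: registering a sink computes nothing; execute() runs them.
class LazyEnvSketch {
    private final List<Runnable> sinks = new ArrayList<>();
    private int jobsRun = 0;

    // Analogous to output(...) -> registerDataSink(sink): only records the sink.
    void registerDataSink(Runnable sink) {
        sinks.add(sink);
    }

    // Analogous to execute(jobName): build a "plan" from the registered sinks and run it.
    int execute(String jobName) {
        if (sinks.isEmpty()) {
            throw new IllegalStateException("No data sinks defined for job " + jobName);
        }
        List<Runnable> plan = new ArrayList<>(sinks);
        sinks.clear(); // in this sketch, each execute() consumes the sinks registered so far
        for (Runnable sink : plan) {
            sink.run();
        }
        jobsRun++;
        return plan.size();
    }

    int getJobsRun() { return jobsRun; }

    public static void main(String[] args) {
        LazyEnvSketch env = new LazyEnvSketch();
        StringBuilder out = new StringBuilder();
        env.registerDataSink(() -> out.append("be 2\n"));
        System.out.println("before execute: '" + out + "'");
        env.execute("WordCount Example");
        System.out.print("after execute: " + out);
    }
}
```

This is also why the WordCount program calls `env.execute(...)` only in the `--output` branch: `print()` triggers execution by itself.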
This stage covers a lot of ground, so it will be explained in the next article.
Summary
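Apache Flink is powerful and supports developing and running many different kinds of applications. Its main features include unified batch and stream processing, sophisticated state management, event-time support, and exactly-once consistency guarantees for state. Flink can run on a variety of resource management frameworks, including YARN, Mesos, and Kubernetes, and can also be deployed standalone on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been shown to scale to thousands of cores and terabytes of state while still delivering high throughput and low latency. Many demanding stream processing applications around the world run on Flink.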
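Its application scenarios are as follows:
1. Event-driven applications
Typical examples of event-driven applications:
Fraud detection
Anomaly detection
Rule-based alerting
Business process monitoring
Web applications (social networks)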
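2. Data analytics applications
Typical examples of data analytics applications:
Quality monitoring of telecom networks
Analysis of product updates and experiment evaluation in mobile applications
Ad-hoc analysis of live data in consumer technology
Large-scale graph analysis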
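3. Data pipeline applications
Typical examples of data pipeline applications:
Real-time search index building in e-commerce
Continuous ETL in e-commerce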
References