Analyzing Flink Components from flink-example (1): WordCount Batch in Practice and Source Code Analysis

The previous post, <Running Flink example programs on Windows>, briefly covered how to run an already packaged example program (jar) through the Flink web UI on Windows. So why should we use Flink in the first place?

Flink's features

The official website lists the following features:

1. All streaming use cases

  • Event-driven Applications

  • Stream & Batch Analytics

  • Data Pipelines & ETL

2. Guaranteed correctness

  • Exactly-once state consistency
  • Event-time processing
  • Sophisticated late data handling

3. Layered APIs

  • SQL on Stream & Batch Data
  • DataStream API & DataSet API
  • ProcessFunction (Time & State)

4. Ease of use and operations

  • Flexible deployment
  • High-availability setup
  • Savepoints

5. Scalability

  • Scale-out architecture
  • Support for very large state
  • Incremental checkpointing

6. High performance

  • Low latency
  • High throughput
  • In-memory computing

Flink architecture

1. Layered structure

(layered architecture diagram omitted)

2. Job architecture diagram

(job architecture diagram omitted)

Flink in practice

1. The dependency file pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinkDemo</groupId>
    <artifactId>flinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hbase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.rholder</groupId>
            <artifactId>guava-retrying</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. The Java program

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
// WordCountData is the default-data helper bundled with Flink's batch examples
// (flink-examples-batch); if that module is not on the classpath, replace it
// with your own default data set.
import org.apache.flink.examples.java.wordcount.util.WordCountData;

public class WordCountDemo {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // create execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

3. Step-by-step debugging and analysis

Step 1: obtain the execution environment (ExecutionEnvironment.java)

/**
 * The ExecutionEnvironment is the context in which a program is executed. A
 * {@link LocalEnvironment} will cause execution in the current JVM, a
 * {@link RemoteEnvironment} will cause execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the parallelism)
 * and to interact with the outside world (data access).
 *
 * <p>Please note that the execution environment needs strong type information for the input and return types
 * of all operations that are executed. This means that the environments needs to know that the return
 * value of an operation is for example a Tuple of String and Integer.
 * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
 * obtain that information using reflection. In certain cases, it may be necessary to manually supply that
 * information to some of the methods.
 *
 * @see LocalEnvironment
 * @see RemoteEnvironment
 */

Creating a local environment:

    /**
     * Creates a {@link LocalEnvironment} which is used for executing Flink jobs.
     *
     * @param configuration to start the {@link LocalEnvironment} with
     * @param defaultParallelism to initialize the {@link LocalEnvironment} with
     * @return {@link LocalEnvironment}
     */
    private static LocalEnvironment createLocalEnvironment(Configuration configuration, int defaultParallelism) {
        final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);

        if (defaultParallelism > 0) {
            localEnvironment.setParallelism(defaultParallelism);
        }

        return localEnvironment;
    }
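
When the demo runs inside an IDE, getExecutionEnvironment() ends up producing exactly such a LocalEnvironment. A minimal sketch of forcing local execution explicitly (the public createLocalEnvironment overloads funnel into the private method quoted above; the class name and sample data below are invented for illustration):

import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalEnvDemo {
    public static void main(String[] args) throws Exception {
        // Ask for a LocalEnvironment with parallelism 2 instead of relying on
        // the context detection inside getExecutionEnvironment().
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(2);

        // Any action (print, collect, ...) triggers execution inside the current JVM.
        env.fromElements("to be", "or not to be").print();
    }
}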

Step 2: read the external data and create a DataSet (ExecutionEnvironment.java)

    /**
     * Creates a DataSet from the given non-empty collection. Note that this operation will result
     * in a non-parallel data source, i.e. a data source with a parallelism of one.
     *
     * <p>The returned DataSet is typed to the given TypeInformation.
     *
     * @param data The collection of elements to create the data set from.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the given collection.
     *
     * @see #fromCollection(Collection)
     */
    public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
        return fromCollection(data, type, Utils.getCallLocationName());
    }

    private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
        CollectionInputFormat.checkCollection(data, type.getTypeClass());
        return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
    }
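
As a quick illustration of how a collection becomes a DataSource, here is a minimal sketch (class name and sample data are made up for illustration):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FromCollectionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<String> lines = Arrays.asList("hello flink", "hello world");

        // Becomes a non-parallel DataSource (parallelism 1), typed by the
        // explicitly supplied TypeInformation, as the javadoc above describes.
        DataSet<String> text = env.fromCollection(lines, BasicTypeInfo.STRING_TYPE_INFO);

        text.print();
    }
}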

The DataSet inheritance hierarchy:

DataSet is a collection of elements of the same type. It is an abstract class that provides the data transformations, such as map, reduce, join and coGroup:

/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
 *   <li>{@link DataSet#join(DataSet)}, or</li>
 *   <li>{@link DataSet#coGroup(DataSet)}.</li>
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */

Operator is the abstract base class of all operators in the Java API:

/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {

DataSource is the concrete implementation, i.e. the data-source operator:

/**
 * An operation that creates a new data set (data source). The operation acts as the
 * data set on which to apply further transformations. It encapsulates additional
 * configuration parameters, to customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

Step 3: transform the input DataSet

            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    text.flatMap(new Tokenizer())
                            // group by the tuple field "0" and sum up tuple field "1"
                            .groupBy(0)
                            .sum(1);

>> The call to flatMap (DataSet.java)

    /**
     * Applies a FlatMap transformation on a {@link DataSet}.
     *
     * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
     * Each FlatMapFunction call can return any number of elements including none.
     *
     * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
     * @return A FlatMapOperator that represents the transformed DataSet.
     *
     * @see org.apache.flink.api.common.functions.RichFlatMapFunction
     * @see FlatMapOperator
     * @see DataSet
     */
    public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }

        String callLocation = Utils.getCallLocationName();
        TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
        return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
    }
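
The TypeExtractor call above is what recovers Tuple2<String, Integer> from the Tokenizer class, which is exactly the strong type information the ExecutionEnvironment javadoc warned about. If the tokenizer were written as a lambda instead, that generic information would be erased and would have to be supplied with returns(). A hedged sketch (class name and input data are invented for illustration):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LambdaWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("to be or not to be");

        DataSet<Tuple2<String, Integer>> counts = text
                // Lambdas lose their generic types to erasure, so the result
                // type must be declared explicitly with returns(...).
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String token : line.toLowerCase().split("\\W+")) {
                        if (token.length() > 0) {
                            out.collect(new Tuple2<>(token, 1));
                        }
                    }
                })
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
                .groupBy(0)
                .sum(1);

        counts.print();
    }
}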

>> The call to groupBy (DataSet.java)

    /**
     * Groups a {@link Tuple} {@link DataSet} using field position keys.
     *
     * <p><b>Note: Field position keys only be specified for Tuple DataSets.</b>
     *
     * <p>The field position keys specify the fields of Tuples on which the DataSet is grouped.
     * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
     *   can be applied.
     * <ul>
     *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
     *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
     *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
     *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
     * </ul>
     *
     * @param fields One or more field positions on which the DataSet will be grouped.
     * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
     *
     * @see Tuple
     * @see UnsortedGrouping
     * @see AggregateOperator
     * @see ReduceOperator
     * @see org.apache.flink.api.java.operators.GroupReduceOperator
     * @see DataSet
     */
    public UnsortedGrouping<T> groupBy(int... fields) {
        return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
    }

 

>> The call to sum (UnsortedGrouping.java)

    /**
     * Syntactic sugar for aggregate (SUM, field).
     * @param field The index of the Tuple field on which the aggregation function is applied.
     * @return An AggregateOperator that represents the summed DataSet.
     *
     * @see org.apache.flink.api.java.operators.AggregateOperator
     */
    public AggregateOperator<T> sum (int field) {
        return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
    }
    // private helper that allows to set a different call location name
    private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
        return new AggregateOperator<T>(this, agg, field, callLocationName);
    }

The relationship between UnsortedGrouping and DataSet: an UnsortedGrouping performs its aggregation through an AggregateOperator, as the fragment below shows.
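
In other words, sum(1) is just syntactic sugar that delegates to aggregate(Aggregations.SUM, 1) on the UnsortedGrouping, which builds the AggregateOperator. A short fragment with the explicit call, reusing text and Tokenizer from the WordCountDemo above:

import org.apache.flink.api.java.aggregation.Aggregations;

// Equivalent to .groupBy(0).sum(1): sum(field) delegates to aggregate(SUM, field).
DataSet<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1);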

Step 4: emit the transformed result

            // emit result
            if (params.has("output")) {
                counts.writeAsCsv(params.get("output"), "\n", " ");
                // execute program
                env.execute("WordCount Example");
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }

If no --output parameter is given, the result is printed to the console:

    /**
     * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
     * the print() method. For programs that are executed in a cluster, this method needs
     * to gather the contents of the DataSet back to the client, to print it there.
     *
     * <p>The string written for each element is defined by the {@link Object#toString()} method.
     *
     * <p>This method immediately triggers the program execution, similar to the
     * {@link #collect()} and {@link #count()} methods.
     *
     * @see #printToErr()
     * @see #printOnTaskManager(String)
     */
    public void print() throws Exception {
        List<T> elements = collect();
        for (T e: elements) {
            System.out.println(e);
        }
    }
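
Since print() is built on collect(), the same data can also be pulled back to the client directly; note that every such action triggers a separate job execution. A small fragment reusing counts from the demo:

import java.util.List;

// collect() runs the job and ships the DataSet contents back to the client JVM.
List<Tuple2<String, Integer>> local = counts.collect();

// count() triggers another, separate execution of the program.
long distinctWords = counts.count();

System.out.println(distinctWords + " distinct words, e.g. " + local.get(0));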

If an output path is given, the result is first turned into a DataSink that writes CSV files; a DataSink is the operation used to store data results:

/**
 * An operation that allows storing data results.
 * @param <T>
 */

The process is as follows:

    /**
     * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
     *
     * <p><b>Note: Only a Tuple DataSet can written as a CSV file.</b>
      * For each Tuple field the result of {@link Object#toString()} is written.
     *
     * @param filePath The path pointing to the location the CSV file is written to.
     * @param rowDelimiter The row delimiter to separate Tuples.
     * @param fieldDelimiter The field delimiter to separate Tuple fields.
     * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
     *
     * @see Tuple
     * @see CsvOutputFormat
     * @see DataSet#writeAsText(String) Output files and directories
     */
    public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
        return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
    }

    @SuppressWarnings("unchecked")
    private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
        Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
        CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
        if (wm != null) {
            of.setWriteMode(wm);
        }
        return output((OutputFormat<T>) of);
    }
    /**
     * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
     * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
     * or transformations) at the same time.
     *
     * @param outputFormat The OutputFormat to process the DataSet.
     * @return The DataSink that processes the DataSet.
     *
     * @see OutputFormat
     * @see DataSink
     */
    public DataSink<T> output(OutputFormat<T> outputFormat) {
        Preconditions.checkNotNull(outputFormat);

        // configure the type if needed
        if (outputFormat instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
        }

        DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
        this.context.registerDataSink(sink);
        return sink;
    }
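
The demo calls the writeAsCsv variant without a WriteMode, which with default settings will not overwrite an existing path. A hedged fragment using the overload quoted above to overwrite previous output and force a single result file (the output path is just an example), reusing counts and env from the demo:

import org.apache.flink.core.fs.FileSystem.WriteMode;

// Overwrite output from a previous run and write a single file by giving the
// sink a parallelism of 1 (otherwise each parallel task writes its own part file).
counts.writeAsCsv("/tmp/wordcount-result", "\n", " ", WriteMode.OVERWRITE)
        .setParallelism(1);

env.execute("WordCount Example");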

Finally, the job is executed:

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }
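
execute() blocks until the job finishes and hands back a JobExecutionResult, which exposes runtime statistics and accumulator values. A small fragment reusing env from the demo:

import org.apache.flink.api.common.JobExecutionResult;

// execute() returns only after the batch job has finished.
JobExecutionResult result = env.execute("WordCount Example");

// Net runtime of the job in milliseconds.
System.out.println("Job took " + result.getNetRuntime() + " ms");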

There is quite a lot going on in this phase, so it will be covered in the next post.

Summary

  Apache Flink is a powerful framework that supports developing and running many different kinds of applications. Its main features include unified batch and stream processing, sophisticated state management, event-time support, and exactly-once state consistency guarantees. Flink can run on a variety of resource managers, including YARN, Mesos and Kubernetes, and can also be deployed standalone on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been shown to scale to thousands of cores with state on the order of terabytes while still maintaining high throughput and low latency, and many of the world's most demanding stream processing applications run on it.

  Its typical application scenarios are as follows:

1. Event-driven applications
  Typical event-driven applications include:
  • Fraud detection
  • Anomaly detection
  • Rule-based alerting
  • Business process monitoring
  • Web applications (social networks)
2. Data analytics applications
  Typical data analytics applications include:
  • Telecom network quality monitoring
  • Analysis of product updates and experiment evaluation in mobile applications
  • Ad-hoc analysis of live data in consumer technology
  • Large-scale graph analysis
3. Data pipeline applications
  Typical data pipeline applications include:
  • Real-time search index building in e-commerce
  • Continuous ETL in e-commerce

References

【1】https://flink.apache.org/

【2】http://www.javashuo.com/article/p-prczjdnr-mw.html

【3】https://flink.apache.org/zh/usecases.html
