「Flink」使用Java lambda表達式實現Flink WordCount

本篇咱們將使用Java語言來實現Flink的單詞統計。html

代碼開發

環境準備

導入Flink 1.9 pom依賴java

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>

構建Flink流處理環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定義source

每秒生成一行文本apache

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

單詞計算

// 3. 單詞統計
        // 3.1 將文本行切分紅一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換爲一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

參考代碼

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. 構建Flink流式初始化環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 自定義source - 每秒發送一行文本
        DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
            private boolean isCanal = false;
            private String[] words = {
                    "important oracle jdk license update",
                    "the oracle jdk license has changed for releases starting april 16 2019",
                    "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
                    "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
                    "downloading and using this product an faq is available here ",
                    "commercial license and support is available with a low cost java se subscription",
                    "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // 每秒發送一行文本
                while (!isCanal) {
                    int randomIndex = RandomUtils.nextInt(0, words.length);
                    ctx.collect(words[randomIndex]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanal = true;
            }
        });

        // 3. 單詞統計
        // 3.1 將文本行切分紅一個個的單詞
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

        //3.2 將單詞轉換爲一個個的元組
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 3.3 按照單詞進行分組
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

        // 3.4 對每組單詞數量進行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
                .timeWindow(Time.seconds(3))
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

        resultDS.print();

        env.execute("app");
    }
}

Flink對Java Lambda表達式支持狀況

Flink支持Java API全部操做符使用Lambda表達式。可是,但Lambda表達式使用Java泛型時,就須要聲明類型信息。api

咱們來看下上述的這段代碼:oracle

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
            // 切分單詞
            Arrays.stream(line.split(" ")).forEach(word -> {
                ctx.collect(word);
            });
        }).returns(Types.STRING);

之因此這裏將全部的類型信息,由於Flink沒法正確自動推斷出來Collector中帶的泛型。咱們來看一下FlatMapFuntion的源代碼app

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

/**
* The core method of the FlatMapFunction. Takes an element from the input data set and transforms
* it into zero, one, or more elements.
*
* @param value The input value.
* @param out The collector for returning result values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void flatMap(T value, Collector<O> out) throws Exception;
}

咱們發現 flatMap的第二個參數是Collector<O>,是一個帶參數的泛型。Java編譯器編譯該代碼時會進行參數類型擦除,因此Java編譯器會變成成:dom

void flatMap(T value, Collector out)ide

這種狀況,Flink將沒法自動推斷類型信息。若是咱們沒有顯示地提供類型信息,將會出現如下錯誤:this

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.

這種狀況下,必需要顯示指定類型信息,不然輸出將返回值視爲Object類型,這將致使Flink沒法正確序列化。spa

因此,咱們須要顯示地指定Lambda表達式的參數類型信息,並經過returns方法顯示指定輸出的類型信息

咱們再看一段代碼:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

爲何map後面也須要指定類型呢?

由於此處map返回的是Tuple2類型,Tuple2是帶有泛型參數,在編譯的時候一樣會被查出泛型參數信息,致使Flink沒法正確推斷。

更多關於對Java Lambda表達式的支持請參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

相關文章
相關標籤/搜索