Getting Started with Flink

Prerequisites

Make sure the following are installed correctly:

  • Java 8.x
  • Maven 3.0.4 (or higher)

Verify Java:

$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

Verify Maven:

$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T02:41:47+08:00)
Maven home: /Users/dushixiang/Library/apache-maven-3.6.0
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac"

Download and Install Flink

Download Flink from the official Flink downloads page.

You will see quite a few builds there. Since this is only for local development, the build without bundled Hadoop is enough, and if you do not plan to develop in Scala the Scala version hardly matters either. I downloaded flink-1.7.0-bin-scala_2.12.tgz. Because writing Flink streaming jobs in Java gets a little verbose, I will write both Java and Scala versions of the code later for comparison.


Extract the archive:

tar -xvf flink-1.7.0-bin-scala_2.12.tgz

Start Flink in local mode:

cd flink-1.7.0
bin/start-cluster.sh

Verify that it started successfully

Open localhost:8081 in a browser and you should see the Flink web UI.


The First Flink Program

Create a quickstart project with a shell command:

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.7.0

Or create the quickstart project with a Maven command:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.7.0

Once it succeeds, check the directory structure:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

BatchJob.java and StreamingJob.java are just empty skeletons and can be ignored. Change the Scala version in the pom (the scala.binary.version property) to 2.12 to match the downloaded distribution.

Write the WordCount program

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

public class WordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.fromElements(
                "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
                "Here, we explain important aspects of Flink’s architecture."
        );

        // Option 1: a static nested class, Tokenizer, that implements FlatMapFunction and overrides flatMap
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(0) // group by the first tuple field (the word)
                .sum(1); // sum the second tuple field to count how many times each word appears

        // Option 2: an anonymous inner FlatMapFunction that overrides flatMap
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        })
                .keyBy(0)
                .sum(1);*/

        // Option 3: a lambda expression. This variant fails because the lambda cannot provide enough
        // information for automatic type extraction; the official docs recommend the two approaches above.
        // See the sketch after this class for one possible workaround.
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
            Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        })
                .keyBy(0)
                .sum(1);*/


        counts.print();

        env.execute("第一個FLink WordCount程序");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        }
    }
}
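
The comment on the lambda variant above notes that Flink's automatic type extraction fails for lambdas. As a minimal sketch of one possible workaround (not part of the original quickstart code; the class name WordCountLambda is just an illustrative placeholder), the result type can be handed to Flink explicitly with returns() and the Types helper from org.apache.flink.api.common.typeinfo:

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.stream.Stream;

public class WordCountLambda {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.fromElements(
                "Apache Flink is a framework and distributed processing engine for stateful computations.");

        text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) ->
                        Stream.of(line.toLowerCase().split("\\W+"))
                                .filter(token -> token.length() > 0)
                                .forEach(token -> out.collect(new Tuple2<>(token, 1))))
                // returns() supplies the type information the lambda cannot carry on its own
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)   // group by the first tuple field (the word)
                .sum(1)     // sum the second tuple field to count occurrences
                .print();

        env.execute("WordCount with an explicit returns() type hint");
    }
}

The two approaches shown above (a named or anonymous FlatMapFunction) remain the simpler choice; returns() is mainly useful if you want to keep the lambda style.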

Since this runs in local mode, you can compile and run it directly from the IDE.

Next comes the Scala version of WordCount. First, add the following dependency to the pom:

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
</dependency>

Then copy the Scala code below. The wildcard import org.apache.flink.streaming.api.scala._ also brings in the implicit TypeInformation instances the Scala API needs, so the type-extraction problem from the Java lambda variant does not come up here.

package org.myorg.quickstart.scala

import org.apache.flink.streaming.api.scala._

object WordCountScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
      "Here, we explain important aspects of Flink’s architecture."
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+")) // split each line and flatten the tokens into one stream
      .filter(_.nonEmpty) // drop empty strings
      .map((_, 1)) // turn each word into a tuple of (word, 1)
      .keyBy(0) // group by the first tuple field (the word)
      .sum(1) // sum the second tuple field to count how many times each word appears

    counts.print()

    env.execute("First Flink WordCount job, Scala version")
  }
}