Please make sure the following environment is installed correctly.
Verify Java
$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
Verify Maven
$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T02:41:47+08:00)
Maven home: /Users/dushixiang/Library/apache-maven-3.6.0
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.14.1", arch: "x86_64", family: "mac"
Flink official download page
You can see there are quite a few builds here. Since this is only for local development, the build without Hadoop is fine, and if you do not plan to develop in Scala, the Scala version does not matter either. I downloaded flink-1.7.0-bin-scala_2.12.tgz. Because writing Flink stream processing in Java is a bit verbose, I will write both Java and Scala versions of the code later for comparison.
Extract the archive
tar -xvf flink-1.7.0-bin-scala_2.12.tgz
Start in local mode
cd flink-1.7.0
bin/start-cluster.sh
Verify that it started successfully
Open localhost:8081 in a browser; you should see the Flink web UI.
Create a quickstart project with a shell command
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.7.0
Create a quickstart project with a Maven command
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.0
After it succeeds, inspect the directory structure
$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties
BatchJob.java and StreamingJob.java are just empty skeletons and can be ignored for now. Change the Scala version in the pom to 2.12.
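The pom generated by the 1.7.0 quickstart archetype normally controls the Scala artifact suffix through a `scala.binary.version` property. Assuming your pom follows that layout (the property names below come from the archetype; verify against your own file), the change looks roughly like:

```xml
<properties>
    <flink.version>1.7.0</flink.version>
    <java.version>1.8</java.version>
    <!-- switch from the default 2.11 to 2.12 -->
    <scala.binary.version>2.12</scala.binary.version>
</properties>
```

After changing it, re-import the Maven project so the `_2.12` artifacts are resolved.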
Write the WordCount program
package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

public class WordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.fromElements(
                "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
                "Here, we explain important aspects of Flink’s architecture."
        );

        // Option 1: a static inner class Tokenizer that implements FlatMapFunction#flatMap
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(0) // group by the first element of the tuple (the word)
                .sum(1);  // sum the second element of the tuple to count occurrences

        // Option 2: an anonymous inner FlatMapFunction implementing flatMap
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        Stream.of(line.toLowerCase().split("\\W+"))
                                .filter(token -> token.length() > 0)
                                .forEach(token -> out.collect(new Tuple2<>(token, 1)));
                    }
                })
                .keyBy(0)
                .sum(1);*/

        // Option 3: a lambda expression. This variant fails at runtime: a lambda does not
        // provide enough information for automatic type extraction, so the official docs
        // recommend the two approaches above.
        /*SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
                    Stream.of(line.toLowerCase().split("\\W+"))
                            .filter(token -> token.length() > 0)
                            .forEach(token -> out.collect(new Tuple2<>(token, 1)));
                })
                .keyBy(0)
                .sum(1);*/

        counts.print();

        env.execute("My first Flink WordCount job");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            Stream.of(line.toLowerCase().split("\\W+"))
                    .filter(token -> token.length() > 0)
                    .forEach(token -> out.collect(new Tuple2<>(token, 1)));
        }
    }
}
Since this is local mode, simply compile and run the job directly in the IDE.
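To make the pipeline's semantics concrete, here is a minimal plain-Java sketch (no Flink involved; `WordCountSketch` and `count` are hypothetical names for illustration) of what flatMap → keyBy(0) → sum(1) computes over a bounded input:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {

    // Plain-Java equivalent of the Flink pipeline's result:
    // flatMap (tokenize) -> keyBy(0) (group by word) -> sum(1) (per-word count).
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.isEmpty()) continue;        // same as filter(token.length() > 0)
                counts.merge(token, 1, Integer::sum); // same effect as sum(1) per key
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c = count(
                "Apache Flink is a framework",
                "Flink has been designed to run");
        System.out.println(c.get("flink")); // prints 2
    }
}
```

The difference in Flink is that keyBy partitions the stream so the per-key sums run in parallel and are emitted incrementally, rather than returned as one final map.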
Next comes the Scala version of WordCount. First add the dependency to the pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Copy the following Scala code.
package org.myorg.quickstart.scala

import org.apache.flink.streaming.api.scala._

object WordCountScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.",
      "Here, we explain important aspects of Flink’s architecture."
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+")) // split each line and flatten the tokens into one stream
      .filter(_.nonEmpty)                   // drop empty strings
      .map((_, 1))                          // turn each word into a tuple: the word first, the number 1 second
      .keyBy(0)                             // group by the first element of the tuple (the word)
      .sum(1)                               // sum the second element of the tuple to count occurrences

    counts.print()

    env.execute("My first Flink WordCount job, Scala edition")
  }
}