最近打算研究研究 Flink,根據官方文檔寫個 Hello,World。入門仍是比較容易的,不須要複雜的安裝環境、配置。這篇文章簡單介紹 Flink 的使用感覺以及入門。java
能夠看出 Flink 致力於爲開發者提供一種方便、易用的編程框架。同時,社區很是注重文檔的詳細程序以及開發者使用的便利性。web
下面的內容是搭建 Flink 環境,並運行 WordCount。apache
Flink 能夠運行在 Linux、Mac OS X 和 Windows 環境。我喜歡在 Windows 下開發,因此在 Windows 運行 Flink。Flink 的最新版本(1.8.0)須要 JDK 的版本爲 1.8 以上。本地啓動 Flink 很是容易,下載 Flink 二進制包,須要選擇 Scala 的版本,若是不用 Scala 開發 Flink 應用程序選哪一個版本無所謂。我下載的是 flink-1.8.0-bin-scala_2.11.tgz。啓動步驟以下:編程
cd flink-1.8.0 #解壓後的目錄
cd bin
start-cluster.bat #啓動本地 Flink複製代碼
啓動後會發現彈出了兩個 Java 程序的窗口。一個是 JobManager,另外一個是 TaskManater。經過 http://localhost:8081 訪問 Flink 的 web 頁面,該站點用於查看運行環境和資源、提交和監控 Flink 做業。api
經過簡單的 WordCount 感覺一下 Flink 應用程序的編寫過程。Flink 已經提供生成 Maven 工程的模板bash
# 使用 Java 的 maven 工程
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0
# 使用 Scala 的 maven 工程
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.8.0複製代碼
若是不想經過命令行的方式生成 maven 工程,能夠經過以下設置在 IDEA 中建立 Flink 應用的模板工程,以 Java 爲例微信
在如上的頁面點擊 「Add Archetype...」,而後再彈出的對話框填寫以下內容框架
選擇咱們添加的 archetype 即可繼續建立 maven 工程。除了 maven 工程還能夠建立 Gradle 和 Sbt 工程。socket
爲了快速運行 Flink 應用,咱們能夠直接將官網 WordCount 例子的代碼拷貝本身的項目。Java 代碼以下maven
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FirstCase {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port = 9000;
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}複製代碼
雖然不太熟悉 Flink 編程模型,但從上面代碼中基本上能推測出每一步的含義。因爲咱們入門 Flink ,剛開始不必太糾結代碼自己。先將 Demo 運行起來,在慢慢深刻學習。如今統計程序已經有了,可是還缺乏數據源。官網的例子使用的是 netcat ,我在 Windows 下安裝了該工具,可是以爲用起來不方便。在 Linux 虛擬機上裝了一個,這樣用法跟官網一致的。個人虛擬機系統爲 Centos 7 64位,安裝命令以下
yum install nmap-ncat.x86_64複製代碼
啓動 netcat 用於發數據
nc -l 9000複製代碼
接下來即是啓動 Flink 應用程序鏈接數據源並進行統計。 啓動以前須要將如下代碼中 ip 和 端口換成本身的
DataStream<String> text = env.socketTextStream("192.168.29.132", port, "\n");複製代碼
啓動 Flink 應用程序有兩種方式,一種是直接直接在 IDEA 中直接運行 Java 程序;另外一種是經過 maven 打一個 jar 包,提交到 Flink 集羣運行。第二種方式的命令以下
$FLINK_HOME\bin\flink run $APP_HOME\flink-ex-1.0-SNAPSHOT.jar
FLINK_HOME 爲 flink 二進制包的目錄
APP_HOME 爲上面建立的 maven 工程的目錄複製代碼
啓動 Flink 應用後,咱們能夠在 netcat 中輸入文本,並觀察 Flink 的統計結果
$ nc -l 9000
a a複製代碼
咱們只發送了一行,內容爲「a a」。若是在 IDEA 中啓動程序能夠直接在 IDEA 控制檯看到輸出結果,若是經過 flink run 方式啓動,須要在 TaskManager 的窗口中查看輸出。輸出內容以下
a : 2
a : 2
a : 2
a : 2
a : 2複製代碼
爲何輸出了 5 次。來看一下咱們的應用程序中有這樣一句
.timeWindow(Time.seconds(5), Time.seconds(1))複製代碼
它表明 Flink 應用程序每次處理的數據窗口爲 5s,處理完後,整個窗口向前滑動 1s 。也就是每次處理的數據爲「最近 5s」的數據。由於最近 5s 數據源中只有「a a」這一條記錄,所以輸出 5 次。
以上即是 Java 版的 WordCount。固然咱們也能夠用 Scala 編寫,且 Scala 的寫法更簡潔,代碼量更少。
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("192.168.29.132", 9000, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}複製代碼
基本上是 Java 一半的代碼量。我的感受 Scala 作大數據統計代碼仍是挺合適的,雖然 Scala 門檻比較高。Scala 程序的運行方式跟 Java 同樣。編寫過程當中若是出現如下錯誤,須要看看是否是 import 語句沒寫對
Error:(29, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
.flatMap { w => w.split("\\s") }複製代碼
解決方法
import org.apache.flink.streaming.api.scala._複製代碼
以上即是 Flink 的簡單入門,後續繼續關注 Flink 框架。
歡迎關注公衆號「渡碼」