Flink 分別提供了基於 Java 語言和 Scala 語言的 API ,若是想要使用 Scala 語言來開發 Flink 程序,能夠經過在 IDEA 中安裝 Scala 插件來提供語法提示,代碼高亮等功能。打開 IDEA , 依次點擊 File => settings => plugins
打開插件安裝頁面,搜索 Scala 插件並進行安裝,安裝完成後,重啓 IDEA 便可生效。html
Flink 官方支持使用 Maven 和 Gradle 兩種構建工具來構建基於 Java 語言的 Flink 項目;支持使用 SBT 和 Maven 兩種構建工具來構建基於 Scala 語言的 Flink 項目。 這裏以 Maven 爲例進行說明,由於其能夠同時支持 Java 語言和 Scala 語言項目的構建。須要注意的是 Flink 1.9 只支持 Maven 3.0.4 以上的版本,Maven 安裝完成後,能夠經過如下兩種方式來構建項目:java
1. 直接基於 Maven Archetype 構建git
直接使用下面的 mvn 語句來進行構建,而後根據交互信息的提示,依次輸入 groupId , artifactId 以及包名等信息後等待初始化的完成:github
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
複製代碼
注:若是想要建立基於 Scala 語言的項目,只須要將 flink-quickstart-java 換成 flink-quickstart-scala 便可,後文亦同。shell
2. 使用官方腳本快速構建apache
爲了更方便的初始化項目,官方提供了快速構建腳本,能夠直接經過如下命令來進行調用:編程
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
複製代碼
該方式其實也是經過執行 maven archetype 命令來進行初始化,其腳本內容以下:api
PACKAGE=quickstart
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false
複製代碼
能夠看到相比於第一種方式,該種方式只是直接指定好了 groupId ,artifactId ,version 等信息而已。bash
若是你使用的是開發工具是 IDEA ,能夠直接在項目建立頁面選擇 Maven Flink Archetype 進行項目初始化:curl
若是你的 IDEA 沒有上述 Archetype, 能夠經過點擊右上角的 ADD ARCHETYPE
,來進行添加,依次填入所需信息,這些信息均可以從上述的 archetype:generate
語句中獲取。點擊 OK
保存後,該 Archetype 就會一直存在於你的 IDEA 中,以後每次建立項目時,只須要直接選擇該 Archetype 便可:
選中 Flink Archetype ,而後點擊 NEXT
按鈕,以後的全部步驟都和正常的 Maven 工程相同。
建立完成後的自動生成的項目結構以下:
其中 BatchJob 爲批處理的樣例代碼,源碼以下:
import org.apache.flink.api.scala._
object BatchJob {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
....
env.execute("Flink Batch Scala API Skeleton")
}
}
複製代碼
getExecutionEnvironment 表明獲取批處理的執行環境,若是是本地運行則獲取到的就是本地的執行環境;若是在集羣上運行,獲得的就是集羣的執行環境。若是想要獲取流處理的執行環境,則只須要將 ExecutionEnvironment
替換爲 StreamExecutionEnvironment
, 對應的代碼樣例在 StreamingJob 中:
import org.apache.flink.streaming.api.scala._
object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
env.execute("Flink Streaming Scala API Skeleton")
}
}
複製代碼
須要注意的是對於流處理項目 env.execute()
這句代碼是必須的,不然流處理程序就不會被執行,可是對於批處理項目則是可選的。
基於 Maven 骨架建立的項目主要提供瞭如下核心依賴:其中 flink-scala
用於支持開發批處理程序 ;flink-streaming-scala
用於支持開發流處理程序 ;scala-library
用於提供 Scala 語言所須要的類庫。若是在使用 Maven 骨架建立時選擇的是 Java 語言,則默認提供的則是 flink-java
和 flink-streaming-java
依賴。
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
複製代碼
須要特別注意的以上依賴的 scope
標籤所有被標識爲 provided ,這意味着這些依賴都不會被打入最終的 JAR 包。由於 Flink 的安裝包中已經提供了這些依賴,位於其 lib 目錄下,名爲 flink-dist_*.jar
,它包含了 Flink 的全部核心類和依賴:
scope
標籤被標識爲 provided 會致使你在 IDEA 中啓動項目時會拋出 ClassNotFoundException 異常。基於這個緣由,在使用 IDEA 建立項目時還自動生成了如下 profile 配置:
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
複製代碼
在 id 爲 add-dependencies-for-IDEA
的 profile 中,全部的核心依賴都被標識爲 compile,此時你能夠無需改動任何代碼,只須要在 IDEA 的 Maven 面板中勾選該 profile,便可直接在 IDEA 中運行 Flink 項目:
項目建立完成後,能夠先書寫一個簡單的詞頻統計的案例來嘗試運行 Flink 項目,如下以 Scala 語言爲例,分別介紹流處理程序和批處理程序的編程示例:
import org.apache.flink.api.scala._
object WordCountBatch {
def main(args: Array[String]): Unit = {
val benv = ExecutionEnvironment.getExecutionEnvironment
val dataSet = benv.readTextFile("D:\\wordcount.txt")
dataSet.flatMap { _.toLowerCase.split(",")}
.filter (_.nonEmpty)
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}
複製代碼
其中 wordcount.txt
中的內容以下:
a,a,a,a,a
b,b,b
c,c
d,d
複製代碼
本機不須要配置其餘任何的 Flink 環境,直接運行 Main 方法便可,結果以下:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WordCountStreaming {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
dataStream.flatMap { line => line.toLowerCase.split(",") }
.filter(_.nonEmpty)
.map { word => (word, 1) }
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
senv.execute("Streaming WordCount")
}
}
複製代碼
這裏以監聽指定端口號上的內容爲例,使用如下命令來開啓端口服務:
nc -lk 9999
複製代碼
以後輸入測試數據便可觀察到流處理程序的處理狀況。
對於平常的 Demo 項目,若是你不想頻繁地啓動 IDEA 來觀察測試結果,能夠像 Spark 同樣,直接使用 Scala Shell 來運行程序,這對於平常的學習來講,效果更加直觀,也更省時。Flink 安裝包的下載地址以下:
https://flink.apache.org/downloads.html
複製代碼
Flink 大多數版本都提供有 Scala 2.11 和 Scala 2.12 兩個版本的安裝包可供下載:
下載完成後進行解壓便可,Scala Shell 位於安裝目錄的 bin 目錄下,直接使用如下命令便可以本地模式啓動:
./start-scala-shell.sh local
複製代碼
命令行啓動完成後,其已經提供了批處理 (benv 和 btenv)和流處理(senv 和 stenv)的運行環境,能夠直接運行 Scala Flink 程序,示例以下:
最後解釋一個常見的異常:這裏我使用的 Flink 版本爲 1.9.1,啓動時會拋出以下異常。這裏由於按照官方的說明,目前全部 Scala 2.12 版本的安裝包暫時都不支持 Scala Shell,因此若是想要使用 Scala Shell,只能選擇 Scala 2.11 版本的安裝包。
[root@hadoop001 bin]# ./start-scala-shell.sh local
錯誤: 找不到或沒法加載主類 org.apache.flink.api.scala.FlinkShell
複製代碼
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南