Flink 系列(二)—— Flink 開發環境搭建

1、安裝 Scala 插件

Flink 分別提供了基於 Java 語言和 Scala 語言的 API ,若是想要使用 Scala 語言來開發 Flink 程序,能夠經過在 IDEA 中安裝 Scala 插件來提供語法提示,代碼高亮等功能。打開 IDEA , 依次點擊 File => settings => plugins 打開插件安裝頁面,搜索 Scala 插件並進行安裝,安裝完成後,重啓 IDEA 便可生效。html

https://github.com/heibaiying

2、Flink 項目初始化

2.1 使用官方腳本構建

Flink 官方支持使用 Maven 和 Gradle 兩種構建工具來構建基於 Java 語言的 Flink 項目;支持使用 SBT 和 Maven 兩種構建工具來構建基於 Scala 語言的 Flink 項目。 這裏以 Maven 爲例進行說明,由於其能夠同時支持 Java 語言和 Scala 語言項目的構建。須要注意的是 Flink 1.9 只支持 Maven 3.0.4 以上的版本,Maven 安裝完成後,能夠經過如下兩種方式來構建項目:java

1. 直接基於 Maven Archetype 構建git

直接使用下面的 mvn 語句來進行構建,而後根據交互信息的提示,依次輸入 groupId , artifactId 以及包名等信息後等待初始化的完成:github

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0
複製代碼

注:若是想要建立基於 Scala 語言的項目,只須要將 flink-quickstart-java 換成 flink-quickstart-scala 便可,後文亦同。shell

2. 使用官方腳本快速構建apache

爲了更方便的初始化項目,官方提供了快速構建腳本,能夠直接經過如下命令來進行調用:編程

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
複製代碼

該方式其實也是經過執行 maven archetype 命令來進行初始化,其腳本內容以下:api

PACKAGE=quickstart

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=${1:-1.8.0} \
  -DgroupId=org.myorg.quickstart \
  -DartifactId=$PACKAGE	\
  -Dversion=0.1 \
  -Dpackage=org.myorg.quickstart \
  -DinteractiveMode=false
複製代碼

能夠看到相比於第一種方式,該種方式只是直接指定好了 groupId ,artifactId ,version 等信息而已。bash

2.2 使用 IDEA 構建

若是你使用的是開發工具是 IDEA ,能夠直接在項目建立頁面選擇 Maven Flink Archetype 進行項目初始化:curl

https://github.com/heibaiying

若是你的 IDEA 沒有上述 Archetype, 能夠經過點擊右上角的 ADD ARCHETYPE ,來進行添加,依次填入所需信息,這些信息均可以從上述的 archetype:generate 語句中獲取。點擊 OK 保存後,該 Archetype 就會一直存在於你的 IDEA 中,以後每次建立項目時,只須要直接選擇該 Archetype 便可:

https://github.com/heibaiying

選中 Flink Archetype ,而後點擊 NEXT 按鈕,以後的全部步驟都和正常的 Maven 工程相同。

3、項目結構

3.1 項目結構

建立完成後的自動生成的項目結構以下:

https://github.com/heibaiying

其中 BatchJob 爲批處理的樣例代碼,源碼以下:

import org.apache.flink.api.scala._

object BatchJob {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
      ....
    env.execute("Flink Batch Scala API Skeleton")
  }
}
複製代碼

getExecutionEnvironment 表明獲取批處理的執行環境,若是是本地運行則獲取到的就是本地的執行環境;若是在集羣上運行,獲得的就是集羣的執行環境。若是想要獲取流處理的執行環境,則只須要將 ExecutionEnvironment 替換爲 StreamExecutionEnvironment, 對應的代碼樣例在 StreamingJob 中:

import org.apache.flink.streaming.api.scala._

object StreamingJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
      ...
    env.execute("Flink Streaming Scala API Skeleton")
  }
}

複製代碼

須要注意的是對於流處理項目 env.execute() 這句代碼是必須的,不然流處理程序就不會被執行,可是對於批處理項目則是可選的。

3.2 主要依賴

基於 Maven 骨架建立的項目主要提供瞭如下核心依賴:其中 flink-scala 用於支持開發批處理程序 ;flink-streaming-scala 用於支持開發流處理程序 ;scala-library 用於提供 Scala 語言所須要的類庫。若是在使用 Maven 骨架建立時選擇的是 Java 語言,則默認提供的則是 flink-javaflink-streaming-java 依賴。

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
複製代碼

須要特別注意的以上依賴的 scope 標籤所有被標識爲 provided ,這意味着這些依賴都不會被打入最終的 JAR 包。由於 Flink 的安裝包中已經提供了這些依賴,位於其 lib 目錄下,名爲 flink-dist_*.jar ,它包含了 Flink 的全部核心類和依賴:

https://github.com/heibaiying

scope 標籤被標識爲 provided 會致使你在 IDEA 中啓動項目時會拋出 ClassNotFoundException 異常。基於這個緣由,在使用 IDEA 建立項目時還自動生成了如下 profile 配置:

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
    <profile>
        <id>add-dependencies-for-IDEA</id>

        <activation>
            <property>
                <name>idea.version</name>
            </property>
        </activation>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </profile>
</profiles>
複製代碼

在 id 爲 add-dependencies-for-IDEA 的 profile 中,全部的核心依賴都被標識爲 compile,此時你能夠無需改動任何代碼,只須要在 IDEA 的 Maven 面板中勾選該 profile,便可直接在 IDEA 中運行 Flink 項目:

https://github.com/heibaiying

4、詞頻統計案例

項目建立完成後,能夠先書寫一個簡單的詞頻統計的案例來嘗試運行 Flink 項目,如下以 Scala 語言爲例,分別介紹流處理程序和批處理程序的編程示例:

4.1 批處理示例

import org.apache.flink.api.scala._

object WordCountBatch {

  def main(args: Array[String]): Unit = {
    val benv = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = benv.readTextFile("D:\\wordcount.txt")
    dataSet.flatMap { _.toLowerCase.split(",")}
            .filter (_.nonEmpty)
            .map { (_, 1) }
            .groupBy(0)
            .sum(1)
            .print()
  }
}
複製代碼

其中 wordcount.txt 中的內容以下:

a,a,a,a,a
b,b,b
c,c
d,d
複製代碼

本機不須要配置其餘任何的 Flink 環境,直接運行 Main 方法便可,結果以下:

https://github.com/heibaiying

4.2 流處理示例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCountStreaming {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
    dataStream.flatMap { line => line.toLowerCase.split(",") }
              .filter(_.nonEmpty)
              .map { word => (word, 1) }
              .keyBy(0)
              .timeWindow(Time.seconds(3))
              .sum(1)
              .print()
    senv.execute("Streaming WordCount")
  }
}
複製代碼

這裏以監聽指定端口號上的內容爲例,使用如下命令來開啓端口服務:

nc -lk 9999
複製代碼

以後輸入測試數據便可觀察到流處理程序的處理狀況。

5、使用 Scala Shell

對於平常的 Demo 項目,若是你不想頻繁地啓動 IDEA 來觀察測試結果,能夠像 Spark 同樣,直接使用 Scala Shell 來運行程序,這對於平常的學習來講,效果更加直觀,也更省時。Flink 安裝包的下載地址以下:

https://flink.apache.org/downloads.html
複製代碼

Flink 大多數版本都提供有 Scala 2.11 和 Scala 2.12 兩個版本的安裝包可供下載:

https://github.com/heibaiying

下載完成後進行解壓便可,Scala Shell 位於安裝目錄的 bin 目錄下,直接使用如下命令便可以本地模式啓動:

./start-scala-shell.sh local
複製代碼

命令行啓動完成後,其已經提供了批處理 (benv 和 btenv)和流處理(senv 和 stenv)的運行環境,能夠直接運行 Scala Flink 程序,示例以下:

https://github.com/heibaiying

最後解釋一個常見的異常:這裏我使用的 Flink 版本爲 1.9.1,啓動時會拋出以下異常。這裏由於按照官方的說明,目前全部 Scala 2.12 版本的安裝包暫時都不支持 Scala Shell,因此若是想要使用 Scala Shell,只能選擇 Scala 2.11 版本的安裝包。

[root@hadoop001 bin]# ./start-scala-shell.sh local
錯誤: 找不到或沒法加載主類 org.apache.flink.api.scala.FlinkShell
複製代碼

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索