Part 1: Start the Kafka and Flink clusters
- Configure the environment variables (note: all three Kafka nodes need this; for Flink, only the master needs it)
[root@master ~]
After the configuration is done, run:
[root@master ~]
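The profile entries themselves were lost above; a minimal sketch of what they typically contain (the install paths /opt/kafka and /opt/flink are assumptions, substitute your own):

```shell
# Hypothetical install locations -- adjust to where you unpacked the tarballs
export KAFKA_HOME=/opt/kafka
export FLINK_HOME=/opt/flink
# Put both bin/ directories on PATH so the *.sh tools used below resolve
export PATH=$PATH:$KAFKA_HOME/bin:$FLINK_HOME/bin
```

After editing the profile, re-source it so the current shell picks up the variables.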
2. Create startup scripts that launch the services
The Kafka-side script starts ZooKeeper as a daemon first, then the Kafka broker:

    zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &

Then create the Flink startup script:

[root@master ~]# vim start_flink.sh
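The body of start_flink.sh was lost above; assuming a standalone Flink installation, it usually just delegates to the bundled cluster start script (the FLINK_HOME variable is the assumed install path from the environment setup):

```shell
#!/bin/bash
# Start the standalone Flink cluster: the JobManager on this host and
# the TaskManagers on the hosts listed in $FLINK_HOME/conf/slaves
$FLINK_HOME/bin/start-cluster.sh
```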
3. Start the Kafka cluster on each node
Because the Kafka cluster depends on the ZooKeeper cluster, Kafka ships scripts that let you start ZooKeeper from the Kafka distribution itself.
[root@master ~]
4. Start the Flink cluster on the master
[root@master ~]
5. Verify: processes and the WebUI
(1) Processes
Check with jps that the ZooKeeper and Kafka processes are running on every Kafka node, and that the Flink JobManager/TaskManager processes are up on the cluster.
(2) WebUI
Open ip:8081 in a browser.
Part 2: Write a Flink program that consumes data from Kafka
1. Setup and configuration before coding
Prerequisites before creating the Maven project in IDEA:
JDK (1.8.0_181)
Scala plugin for IDEA (downloaded inside IDEA)
Maven (3.5.3)
Scala jar (2.11.0)
(1) Open IDEA
(2) Change the fonts (optional)
Navigation bar: File -> Settings -> Appearance & Behavior -> Appearance -> Override default fonts by (not recommended)
Editor: File -> Settings -> Editor -> Colors & Fonts -> Font
Console: File -> Settings -> Editor -> Colors & Fonts -> Font -> Console Font
(3) Install the Scala plugin for IntelliJ IDEA (skip this step if it is already installed)
File -> Settings -> Plugins
Click to download and install the plugin, then restart IntelliJ IDEA.
(4) Use "New Project" to create a Maven project with Scala support
(5) Set the project's groupId and artifactId
(6) Set the project name and location
(7) Switch the download mirror (as needed)
Edit plugins\maven\lib\maven3\conf\settings.xml under the IDEA install directory,
then find the mirrors tag and add the following mirror; jar downloads will then run at full speed:
<mirror>
  <id>aliyun</id> <!-- the id was lost in the original; any unique id works -->
  <name>aliyun maven</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  <mirrorOf>central</mirrorOf>
</mirror>
(8) Configure pom.xml (mainly: add the dependencies, plus the plugin that packages the project into a jar). Dependencies to add:

groupId                  | artifactId                      | version
-------------------------|---------------------------------|---------
org.apache.flink         | flink-core                      | 1.3.2
org.apache.flink         | flink-connector-kafka-0.10_2.11 | 1.3.2
org.apache.kafka         | kafka_2.11                      | 0.10.2.0
org.apache.flink         | flink-streaming-java_2.11       | 1.3.2

Plugin to add:

groupId                  | artifactId            | version
-------------------------|-----------------------|--------
org.apache.maven.plugins | maven-assembly-plugin | 2.4.1

The full configuration is as follows (note: change the maven-assembly-plugin mainClass to the path of your own main class):
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.wugenqiang.flink</groupId>
  <artifactId>flink_kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <!-- version lost in the original -->
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <useUniqueVersions>false</useUniqueVersions>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.wugenqiang.test.ReadingFromKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.wugenqiang.flink.ReadingFromKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
2. Now write the Flink program that consumes data from Kafka
(1) Create a Scala class under the scala folder
(2) Write the Flink code that reads from Kafka
This is a simple receiver for Kafka data: specify the ZooKeeper and Kafka cluster configuration plus the topic name,
and finally print the consumed data directly to stdout.
package com.wugenqiang.flink

import java.util.Properties

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._

object ReadingFromKafka {

  private val ZOOKEEPER_HOST = "master:2181,slave1:2181,slave2:2181"
  private val KAFKA_BROKER = "master:9092,slave1:9092,slave2:9092"
  private val TRANSACTION_GROUP = "com.wugenqiang.flink"

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka consumer configuration
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

    // Attach the Kafka source for topic "mastertest" and print every record.
    // Note: with the flink-connector-kafka-0.10 dependency from the pom, the
    // matching consumer class would be FlinkKafkaConsumer010.
    val transaction = env
      .addSource(new FlinkKafkaConsumer08[String]("mastertest", new SimpleStringSchema(), kafkaProps))

    transaction.print()

    env.execute()
  }
}
(3) Compile and test it
3. Generate the Kafka-to-Flink connector jar
(1) In the Maven Projects panel on the right side of the window, open Lifecycle and run package (to rebuild, run clean first, then package).
(2) On success (exit code 0) the project gets a target directory containing the built jar.
4. Verify that the jar can move Kafka data into Flink
(1) Copy the jar to a chosen directory on the CentOS host (for example /root; the following steps are run there).
(2) Produce data into Kafka
On the command line (adjust the cluster and topic to your setup):
[root@master ~]# kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic mastertest
(3) Run the jar on Flink to consume the Kafka data
(adjust to your setup: com.wugenqiang.test.ReadingFromKafka is the main-class name,
/root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar is the jar path)
[root@master ~]
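The submit command itself was lost above; given the main class and jar path just mentioned, it would typically look like this (a sketch, not verbatim from the original):

```shell
# Submit the packaged job to the running Flink cluster;
# -c selects the entry class inside the fat jar
flink run -c com.wugenqiang.test.ReadingFromKafka \
  /root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar
```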
(4) Open ip:8081 and check that the job started and is running normally
(5) Check Flink's standard output to verify that data is being consumed
Look on a TaskManager node: the previous step tells you which server the task runs on; execute the command on that server:
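The exact command was lost above; assuming a default standalone setup, the records emitted by print() land in the TaskManager's .out file under Flink's log directory, so something like the following shows them (the path pattern is an assumption):

```shell
# Follow the TaskManager stdout file, where print() output appears
tail -f $FLINK_HOME/log/flink-*-taskmanager-*.out
```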
Note: in step (2) you type the data produced into Kafka; in step (5) you see the data consumed by Flink echoed in the log output.
With that, the Kafka-to-Flink data transfer task is complete.