Consuming Kafka data with Flink

Part 1: Start the Kafka cluster and the Flink cluster

1. Configure environment variables (note: required on all three Kafka machines; for Flink, configuring master is enough)
[root@master ~]# vim /etc/profile

After editing, reload the profile:

[root@master ~]# source /etc/profile
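A small check can confirm that `source /etc/profile` took effect. This is a minimal Scala sketch and is not part of the original setup. It makes three assumptions:

- The profile exports `KAFKA_HOME`, which the start script below relies on.
- It also exports a hypothetical `FLINK_HOME`.
- Each `<home>/bin` directory was added to `PATH`.

On the Kafka-only machines, drop `FLINK_HOME` from `requiredHomes`.

```scala
object EnvCheck {
  // Assumed variable names: KAFKA_HOME is used by start_kafka.sh; FLINK_HOME is hypothetical.
  val requiredHomes: Seq[String] = Seq("KAFKA_HOME", "FLINK_HOME")

  /** Returns human-readable problems; an empty result means the environment looks fine. */
  def problems(env: Map[String, String]): Seq[String] = {
    // Linux PATH entries are separated by ':'
    val pathEntries: Set[String] =
      env.getOrElse("PATH", "").split(":").filter(_.nonEmpty).toSet
    requiredHomes.flatMap { name =>
      env.get(name) match {
        case None => Seq(s"$name is not set")
        case Some(home) =>
          val bin = home.stripSuffix("/") + "/bin"
          if (pathEntries.contains(bin)) Seq.empty[String] else Seq(s"$bin is not on PATH")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val found = problems(sys.env)
    if (found.isEmpty) println("environment looks OK") else found.foreach(println)
  }
}
```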

2. Create start scripts for the services

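[root@master ~]# vim start_kafka.sh

Add the following (on all three machines):

    #!/bin/bash
    zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &

[root@master ~]# vim start_flink.sh

Add the following (only needed on master):

    #!/bin/bash
    start-cluster.sh

Make the scripts executable so they can be run as ./start_kafka.sh and ./start_flink.sh:

[root@master ~]# chmod +x start_kafka.sh start_flink.sh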

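3. Start the Kafka cluster on each machine

Because the Kafka cluster depends on the ZooKeeper cluster, Kafka ships its own script for starting ZooKeeper. The start script therefore brings up ZooKeeper first and then the Kafka broker: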

[root@master ~]# ./start_kafka.sh
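Kafka brokers need a reachable ZooKeeper when they start, so it can help to check both ports after running the script. The sketch below is an illustrative Scala probe, not taken from the original article; it simply opens a plain TCP connection. The host `master` and the ports 2181 (ZooKeeper) and 9092 (Kafka) come from the consumer code later in this article; adjust them to your cluster.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortProbe {
  /** True if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
  def isOpen(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts and unknown host names
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  /** Retries isOpen up to maxAttempts times, sleeping sleepMs between failed attempts. */
  def waitFor(host: String, port: Int, maxAttempts: Int = 30, sleepMs: Long = 1000L): Boolean =
    (1 to maxAttempts).exists { attempt =>
      val open = isOpen(host, port)
      if (!open && attempt < maxAttempts) Thread.sleep(sleepMs)
      open
    }

  def main(args: Array[String]): Unit = {
    // Host and ports as used by ReadingFromKafka in this article (assumption: probing master)
    for ((service, port) <- Seq("ZooKeeper" -> 2181, "Kafka" -> 9092)) {
      val state = if (waitFor("master", port)) "reachable" else "NOT reachable"
      println(s"$service on master:$port is $state")
    }
  }
}
```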

4. Start the Flink cluster on master

[root@master ~]# ./start_flink.sh

5. Verify: processes and Web UI

(1) Processes

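    [root@master ~]# jps
    1488 QuorumPeerMain
    2945 Kafka
    1977 SecondaryNameNode
    2505 JobManager
    1900 NameNode
    2653 Jps

Each process maps to a service:

- QuorumPeerMain is ZooKeeper.
- Kafka is the broker.
- JobManager is Flink's master.
- NameNode and SecondaryNameNode belong to an HDFS installation on the same host. They are not required for this setup.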
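Checking the jps list by eye is easy to get wrong across three machines. Below is a hedged Scala sketch that parses jps output and reports which of the expected daemons are missing. The expected set (`QuorumPeerMain`, `Kafka`, `JobManager`) is an assumption based on the master output above. Newer Flink versions report the JobManager under a different main-class name.

```scala
import scala.sys.process._

object JpsCheck {
  // Daemons expected on master in this article's setup (HDFS processes are deliberately ignored)
  val expectedOnMaster: Set[String] = Set("QuorumPeerMain", "Kafka", "JobManager")

  /** Parses `jps` output ("<pid> <main class>" per line) into the set of process names. */
  def processNames(jpsOutput: String): Set[String] =
    jpsOutput.split("\n")
      .map(_.trim.split("\\s+"))
      .collect { case Array(_, name, _*) => name }
      .toSet

  /** Expected daemons that do not appear in the jps output. */
  def missing(jpsOutput: String, expected: Set[String] = expectedOnMaster): Set[String] =
    expected -- processNames(jpsOutput)

  def main(args: Array[String]): Unit = {
    val gaps = missing("jps".!!) // runs the JDK's jps tool, which must be on PATH
    if (gaps.isEmpty) println("all expected daemons are running")
    else println(s"missing: ${gaps.mkString(", ")}")
  }
}
```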

(2) Web UI

In a browser, open ip:8081 (the IP address of master) to reach the Flink JobManager's web interface.
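The same check can be scripted. The sketch below is illustrative and not from the original article: it sends a GET request to the web UI and reports the HTTP status code. The default URL `http://master:8081/` is an assumption built from the host name and port used in this article.

```scala
import java.io.IOException
import java.net.{HttpURLConnection, URI}

object WebUiCheck {
  /** HTTP status of a GET request to url, or None if no connection could be made. */
  def status(url: String, timeoutMs: Int = 2000): Option[Int] =
    try {
      val conn = URI.create(url).toURL.openConnection().asInstanceOf[HttpURLConnection]
      conn.setConnectTimeout(timeoutMs)
      conn.setReadTimeout(timeoutMs)
      try Some(conn.getResponseCode) finally conn.disconnect()
    } catch {
      // Connection refused, timeout or unknown host
      case _: IOException => None
    }

  def main(args: Array[String]): Unit = {
    // Assumed default: JobManager on host "master", Flink's default web port 8081
    val url = args.headOption.getOrElse("http://master:8081/")
    println(s"$url -> ${status(url).map(_.toString).getOrElse("unreachable")}")
  }
}
```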

Part 2: Write a Flink program that consumes Kafka data

1. Preparation and configuration before coding

Before creating the Maven project in IntelliJ IDEA, have the following ready:

- JDK (1.8.0_181)
- Scala plugin for IDEA (installed from within IDEA)
- Maven (3.5.3)
- Scala library jars (2.11.0)

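(1) Open IntelliJ IDEA.

(2) Change the fonts (optional)

- UI and menus: File -> Settings -> Appearance & Behavior -> Appearance -> Override default fonts by (not recommended)
- Editor: File -> Settings -> Editor -> Colors & Fonts -> Font
- Console: File -> Settings -> Editor -> Colors & Fonts -> Console Font

(3) Install the Scala plugin for IntelliJ IDEA (skip this step if it is already installed)

Go to File -> Settings -> Plugins, download and install the plugin, then restart IntelliJ IDEA.

(4) Use "New Project" to create a Maven project with Scala support.

(5) Specify the project's groupId and artifactId.

(6) Specify the project name and location.

(7) Switch the download mirror (if needed)

Under the IDEA installation directory, edit plugins\maven\lib\maven3\conf\settings.xml. Find the <mirrors> element and put the following entry in it. Jar downloads through the Aliyun mirror are usually much faster: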

    <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
    </mirror>

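(8) Configure pom.xml. The main additions are the dependencies and a plugin that packages the project into a jar. Dependencies to add:

    groupId            artifactId                         version
    org.apache.flink   flink-core                         1.3.2
    org.apache.flink   flink-connector-kafka-0.10_2.11    1.3.2
    org.apache.kafka   kafka_2.11                         0.10.2.0
    org.apache.flink   flink-streaming-java_2.11          1.3.2
    org.apache.flink   flink-streaming-scala_2.11         1.3.2

The program below imports org.apache.flink.streaming.api.scala. That package comes from flink-streaming-scala_2.11, so without this dependency the Scala code does not compile.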

Plugin to add:

    groupId                    artifactId               version
    org.apache.maven.plugins   maven-assembly-plugin    2.4.1
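The `_2.11` suffix on artifacts such as `kafka_2.11` is the Scala binary version. Every suffixed artifact must match the project's `scala.version` (2.11.8 in the POM below); otherwise classes from incompatible Scala versions end up on the classpath. Below is a minimal Scala sketch of that consistency check, with the artifact list copied from the dependency table. It is an illustration, not a Maven feature.

```scala
object ScalaSuffixCheck {
  // Matches artifactIds ending in a Scala binary suffix such as "_2.11"
  private val Suffixed = """.*_(\d+\.\d+)""".r

  /** Reduces a full Scala version to its binary version, e.g. "2.11.8" -> "2.11". */
  def binaryVersion(full: String): String = full.split('.').take(2).mkString(".")

  /** Artifacts whose Scala suffix differs from the project's Scala binary version. */
  def mismatches(artifactIds: Seq[String], scalaVersion: String): Seq[String] = {
    val expected = binaryVersion(scalaVersion)
    artifactIds.filter {
      case Suffixed(v) => v != expected
      case _           => false // no suffix: a Java-only artifact such as flink-core
    }
  }

  def main(args: Array[String]): Unit = {
    // artifactIds from the dependency table; scala.version from the POM
    val ids = Seq("flink-core", "flink-connector-kafka-0.10_2.11", "kafka_2.11", "flink-streaming-java_2.11")
    val bad = mismatches(ids, "2.11.8")
    println(if (bad.isEmpty) "Scala suffixes are consistent" else s"mismatched: ${bad.mkString(", ")}")
  }
}
```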

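The full configuration follows. Set the mainClass of maven-assembly-plugin, and of maven-jar-plugin, to the fully qualified name of your own main class: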

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.wugenqiang.flink</groupId>
      <artifactId>flink_kafka</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.11.8</scala.version>
      </properties>

      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>

      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>

      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.4</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>1.3.2</version>
          <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
          <version>1.3.2</version>
          <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>0.10.2.0</version>
          <scope>compile</scope>
        </dependency>

        <!-- flink-streaming jar; 2.11 is the Scala version -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.3.2</version>
          <scope>compile</scope>
        </dependency>

        <!-- Scala DataStream API (org.apache.flink.streaming.api.scala), used by ReadingFromKafka -->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.3.2</version>
          <scope>compile</scope>
        </dependency>
      </dependencies>

      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
              <archive>
                <manifest>
                  <addClasspath>true</addClasspath>
                  <useUniqueVersions>false</useUniqueVersions>
                  <classpathPrefix>lib/</classpathPrefix>
                  <!-- must match the package declared in ReadingFromKafka.scala -->
                  <mainClass>com.wugenqiang.flink.ReadingFromKafka</mainClass>
                </manifest>
              </archive>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <!-- same bytecode level as maven-compiler-plugin; jvm-1.5 is deprecated in Scala 2.11 -->
                <arg>-target:jvm-1.8</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
              <!-- get all project dependencies -->
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <!-- Main-Class in the manifest makes the jar executable -->
              <archive>
                <manifest>
                  <mainClass>com.wugenqiang.flink.ReadingFromKafka</mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <!-- bind to the packaging phase -->
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

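2. Write the Flink program that consumes Kafka data

(1) Create a Scala object under the scala source folder.

(2) Write the code that reads Kafka data with Flink.

The program is a minimal receiver. It configures the ZooKeeper and Kafka cluster addresses and the topic name, then prints every consumed record. Only Flink's Kafka 0.8 consumer actually reads the ZooKeeper address; the 0.9 and 0.10 consumers connect through bootstrap.servers alone.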

    package com.wugenqiang.flink

    import java.util.Properties

    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    // Consumer class for Kafka 0.10, matching the flink-connector-kafka-0.10_2.11 dependency
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    // Implicit TypeInformation needed by the Scala DataStream API
    import org.apache.flink.streaming.api.scala._

    /**
      * Consume Kafka with Flink.
      */
    object ReadingFromKafka {

      private val ZOOKEEPER_HOST = "master:2181,slave1:2181,slave2:2181"
      private val KAFKA_BROKER = "master:9092,slave1:9092,slave2:9092"
      private val TRANSACTION_GROUP = "com.wugenqiang.flink"

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // Take a checkpoint every 1000 ms with exactly-once semantics
        env.enableCheckpointing(1000)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

        // Configure the Kafka consumer
        val kafkaProps = new Properties()
        // Only the 0.8 consumer needs zookeeper.connect; the 0.10 consumer does not use it
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
        kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

        // The topic is "mastertest"; SimpleStringSchema turns each record value into a String
        val transaction = env
          .addSource(
            new FlinkKafkaConsumer010[String]("mastertest", new SimpleStringSchema(), kafkaProps)
          )

        // print() writes to the TaskManager's stdout, not to the client console
        transaction.print()

        env.execute("ReadingFromKafka")
      }
    }
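Both address settings in the consumer are comma-separated `host:port` lists. A typo there usually surfaces only as connection timeouts at runtime. The following sketch is a hypothetical helper, not part of the program above. It builds the same three properties and fails fast on malformed entries, and because it uses only `java.util.Properties` it can be tested without a cluster.

```scala
import java.util.Properties

object ConsumerProps {
  /** Splits "h1:p1,h2:p2" into (host, port) pairs, throwing on malformed entries. */
  def parseHostPorts(list: String): Seq[(String, Int)] =
    list.split(",").toSeq.map(_.trim).map { entry =>
      entry.split(":") match {
        case Array(host, port) if host.nonEmpty && port.nonEmpty && port.forall(_.isDigit) =>
          (host, port.toInt)
        case _ =>
          throw new IllegalArgumentException(s"expected host:port but got '$entry'")
      }
    }

  /** Builds the properties used by ReadingFromKafka after validating both address lists. */
  def build(zookeepers: String, brokers: String, groupId: String): Properties = {
    parseHostPorts(zookeepers) // validation only; the original string is stored below
    parseHostPorts(brokers)
    val props = new Properties()
    props.setProperty("zookeeper.connect", zookeepers)
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("group.id", groupId)
    props
  }

  def main(args: Array[String]): Unit =
    println(build("master:2181,slave1:2181,slave2:2181", "master:9092,slave1:9092,slave2:9092", "com.wugenqiang.flink"))
}
```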

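(3) Compile and test.

3. Build the jar that connects Kafka to Flink

(1) Open the Maven Projects tool window on the right side of IDEA, expand Lifecycle and run package. To rebuild, run clean first, then package.

(2) A successful build finishes with exit code 0 and creates a target directory in the project. It contains the packaged jars, including flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar.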
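When `-c` is omitted, `flink run` falls back to the entry class recorded in the jar's manifest. It is therefore worth confirming that the `Main-Class` written by the build matches the package declared in the source (`com.wugenqiang.flink`). Below is a minimal sketch using `java.util.jar.JarFile`. The default jar path is an assumption based on the artifactId and version in the POM.

```scala
import java.util.jar.JarFile

object ManifestCheck {
  /** Reads the Main-Class attribute from a jar's manifest, if there is one. */
  def mainClass(jarPath: String): Option[String] = {
    val jar = new JarFile(jarPath)
    try Option(jar.getManifest).flatMap(m => Option(m.getMainAttributes.getValue("Main-Class")))
    finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed default: the assembly jar produced by `package` in the target directory
    val path = args.headOption.getOrElse("target/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar")
    println(s"Main-Class: ${mainClass(path).getOrElse("<none>")}")
  }
}
```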

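4. Verify that the jar delivers Kafka data to Flink

(1) Copy the jar to a directory on the CentOS server (for example /root; the following steps run in that directory).

(2) Produce data into Kafka

Run the following on the command line, adjusting the broker list and topic to your cluster: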

[root@master ~]# kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic mastertest

(3) Run the jar with Flink to consume the Kafka data

Adjust two values to your setup: com.wugenqiang.flink.ReadingFromKafka is the main class, and /root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar is the path and name of the jar.

[root@master ~]# flink run -c com.wugenqiang.flink.ReadingFromKafka /root/flink_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

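(4) Open ip:8081 to check that the job has started and is running.

(5) Check Flink's standard output to confirm that records are being consumed

The output of print() goes to the TaskManager, not to the client. Use the Web UI from the previous step to find which server runs the TaskManager, then run the following on that server: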

    [root@slave1 ~]# cd /opt/flink-1.3.2/log/
    [root@slave1 log]# tail -F flink-root-taskmanager-*.out
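The file name contains the user, an index and the host name, so a glob or a small lookup is more robust than a fixed name. This Scala sketch (illustrative only) lists the TaskManager `.out` files in a log directory, newest first. The naming pattern `flink-<user>-taskmanager-<id>-<host>.out` is what Flink's standalone daemon scripts use. The default directory `/opt/flink-1.3.2/log` comes from the command above.

```scala
import java.io.File

object TaskManagerOut {
  // Flink's daemon scripts name stdout files flink-<user>-taskmanager-<id>-<host>.out
  private val OutFile = """flink-.+-taskmanager-.+\.out""".r

  /** TaskManager .out files in logDir, most recently modified first; empty if logDir is missing. */
  def find(logDir: File): Seq[File] =
    Option(logDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
      .filter(f => f.isFile && OutFile.pattern.matcher(f.getName).matches())
      .sortBy(f => -f.lastModified())

  def main(args: Array[String]): Unit = {
    // Default log directory taken from this article's install path
    val dir = new File(args.headOption.getOrElse("/opt/flink-1.3.2/log"))
    find(dir).headOption match {
      case Some(f) => println(s"tail -F ${f.getPath}")
      case None    => println(s"no TaskManager .out file in $dir")
    }
  }
}
```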

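Note: type records into the producer started in step (2). They should appear in the TaskManager output watched in step (5).

At this point, data is flowing from Kafka into Flink.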
