ZK + Kafka + MariaDB + Scala + Flink Integration


I did not plan to write this article at first; the code was meant to live on github.com/git.jd.com, but there have been some errors with that since the move to JDD (JD Finance) this month.
So, in order to keep the code somewhere, I started this article.

Here is the summary of this article:

1. Set up a ZooKeeper cluster on Windows, with three nodes.
2. Set up a Kafka cluster on Windows, also with three brokers.
3. Create a Maven Scala project in the IDE (a sketch of the dependencies it needs follows this list).
4. Create a Flink producer that sinks data to a Kafka topic named ScalaTopic, with a custom source program built by extending the source API.
5. Create a Flink consumer that reads data from Kafka and sinks it to MariaDB, with a custom sink program built by extending the sink API.
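
For step 3, the project mainly needs the Flink streaming Scala API, the universal Kafka connector, the MariaDB JDBC driver, and commons-lang3 (for RandomStringUtils). A sketch of the pom.xml dependencies, assuming Flink 1.7.2 and Scala 2.11 (adjust the versions to your own setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.8.1</version>
</dependency>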

Tips: on how to set up ZooKeeper and Kafka clusters on Windows there is plenty of material on the internet; it is an easy job and will not be covered in detail here again.
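
For reference only, the per-node configuration boils down to a few lines. The ports below are assumptions chosen to match the broker list used later (ZooKeeper clients on 2181-2183, brokers on 9092-9094), and the data directories are placeholders.

zoo.cfg for the first ZooKeeper node (the other two change clientPort and dataDir, and each dataDir must contain a myid file holding that node's id):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=D:/zk/data/1
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

server.properties for the first broker (the other two change broker.id, listeners, and log.dirs):

broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=D:/kafka/logs/0
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183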

Script 1: a batch script for a one-click start of the ZooKeeper and Kafka clusters.
code:

d:
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-1\bin\
start cmd /k "zkServer.cmd"
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-2\bin\
start cmd /k "zkServer.cmd"
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-3\bin\
start cmd /k "zkServer.cmd"

ping -n 15 127.0.0.1>nul
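rem the ping above is just a ~15 second delay so the ZooKeeper nodes are up before the Kafka brokers start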
cd D:\works\JDD\kafka_cluster\kafka-0\bin\windows\
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-0\config\server.properties"
cd D:\works\JDD\kafka_cluster\kafka-1\bin\windows\
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-1\config\server.properties"
cd D:\works\JDD\kafka_cluster\kafka-2\bin\windows\
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-2\config\server.properties"
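
Unless auto.create.topics.enable is left at its default of true, the topic also has to be created before the producer runs. A sketch of the command, run from one of the kafka-x\bin\windows directories and assuming a Kafka version whose tools still accept --zookeeper (newer versions use --bootstrap-server instead):

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic ScalaTopic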

Script 2: KafkaProduce. In this script a Flink streaming environment is created; it wires the custom source to a Kafka producer sink.

package com.ran.xiao.yun.zkkafflink

import com.ran.xiao.yun.jdd.SimpleStringGenerator
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer


//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html

object KafkaProduce {

  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092,localhost:9093,localhost:9094"
    val topic = "ScalaTopic"

    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    val myProducer = new FlinkKafkaProducer[String](
      brokers,         // broker list
      topic,               // target topic
      new SimpleStringSchema)

    myProducer.setWriteTimestampToKafka(true)
    val stream: DataStream[String] = env.addSource(new SimpleStringGenerator())  // define the source and generate stream data for Kafka
    stream.addSink(myProducer) // sink the data to Kafka
    env.execute()

  }

}
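
To check that the producer is really writing, a console consumer can be attached to the topic; a sketch, assuming the brokers started by Script 1:

kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic ScalaTopic --from-beginning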

Script 3: SimpleStringGenerator, the custom source used by the producer above.

package com.ran.xiao.yun.jdd

import scala.util.Random
import org.apache.commons.lang3.RandomStringUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}

/**
  * Custom Flink source. It could read data from MySQL or a log, but here a random generator is used instead.
  * The core is simply a while loop that collects values; cancelling just stops the loop.
  * run and cancel must be overridden;
  * open is optional and is the place for any initialization that may be needed.
  */
class SimpleStringGenerator extends RichParallelSourceFunction[String]{
  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
  private var getRuleDuration : Long = 0
  var isRunning = true

  override def open(parameters: Configuration): Unit = {
    print("starting....")
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    logger.info("try to generate some value for example...")
    while(isRunning){
      val rand = new Random()
      val json = rand.nextInt(10000000) + ":" + RandomStringUtils.randomAlphabetic(3) + ":" + rand.nextInt(1000)
      ctx.collect("element-" + json)   // SourceContext is the handle the runtime passes to run(); collect() emits the record into the stream
      println(json)
      logger.info("generate data from producer is successful...")
      Thread.sleep(6 * 1000)   // one minute is too long for a test, so sleep 6 seconds instead
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

Script 4: Kafka2MySQL, a consumer that reads data from Kafka and writes it to MariaDB.

package com.ran.xiao.yun.zkkafflink

import java.util.Properties

import com.ran.xiao.yun.jdd.MySQLSink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
//define consumer

object Kafka2MySQL {

  def main(args: Array[String]): Unit = {

    val properties = new Properties()
    val prop = Kafka2MySQL.getClass.getClassLoader.getResourceAsStream("Config.properties")
    properties.load(prop)
    val kafkas = properties.getProperty("kafka.hosts")
    val topic = properties.getProperty("kafka.topic")

    properties.setProperty("bootstrap.servers", kafkas)
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(7000)
    val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)   // SimpleStringSchema deserializes each Kafka record's bytes as a UTF-8 string
    //myConsumer.setStartFromEarliest()  // read data from the earliest offset
    myConsumer.setStartFromLatest()
    val stream = env.addSource(myConsumer)  // read data from Kafka
    stream.print()  // print to the console for debugging
    stream.addSink(new MySQLSink())  // write the data to MariaDB
    env.execute("starting")

  }

}
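
The consumer reads its broker list and topic from a Config.properties file on the classpath (typically src/main/resources). A minimal sketch, with the values assumed to match the cluster and topic above:

kafka.hosts=localhost:9092,localhost:9093,localhost:9094
kafka.topic=ScalaTopic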

Script 5: MySQLSink, the custom sink that writes each record to MariaDB.

package com.ran.xiao.yun.jdd

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
/*
 Custom Flink sink:
 invoke is called once for every incoming record, so the logic that writes to the downstream system goes there;
 open initializes the downstream connection/instance;
 close (taking the place of cancel in the source) shuts the downstream connection down.
 */

class MySQLSink extends RichSinkFunction[String]{   // ideally a Tuple3[Int, String, Int] would be passed to the sink instead of a raw String

  private var getRuleDuration : Long = 0
  var isRunning = true
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  // connect to the local MariaDB once in open(), so that invoke() does not have to open and close a connection for every record
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val driver = "org.mariadb.jdbc.Driver"   // note: some examples on the internet misspell the driver class, writing "maria" instead of "mariadb"
    val dburlc = "jdbc:mariadb://localhost:3306/data_platformwarehouse" // jdbc:mysql://localhost:3306/data_platformwarehouse works too
    val usrnam = "root"
    val passwd = "root"

    Class.forName(driver) // load the driver
    connection = DriverManager.getConnection(dburlc, usrnam, passwd) // create the connection
    val sql = "insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)"   // the parameters are filled in by invoke()
    ps = connection.prepareStatement(sql)
  }

  override def invoke(str: String): Unit = {   // typing the input as a Tuple3 did not work out, so the record is parsed from the raw String
    // data looks like: element-id:name:age
    val values = str.split("-")(1).split(":")
    ps.setInt(1, Integer.parseInt(values(0)))
    ps.setString(2, values(1))
    ps.setInt(3, Integer.parseInt(values(2)))
    ps.executeUpdate()
  }

  override def close(): Unit = {
    super.close()
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

}
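
The target table has to exist before the job runs. A minimal sketch of the DDL, inferred from the insert statement and the generated data (the exact column types are assumptions):

CREATE TABLE flink_kafka_zk_scala (
  id   INT,
  name VARCHAR(16),
  age  INT
);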

 

Tips for self
The relationship between ZooKeeper and Kafka:

Kafka uses ZooKeeper's distributed coordination service to tie together producers, consumers, and message storage (the brokers, which store messages and serve reads and writes). With ZooKeeper, Kafka can establish the producer/consumer subscription relationship across all of these otherwise stateless components and balance the load across producers.

1. Broker registration in ZooKeeper

Every Kafka broker (roughly equivalent to one node or one machine) registers itself in ZooKeeper at startup, recording its broker.id under /brokers/ids. When a node fails, ZooKeeper removes that node, which makes it easy to monitor broker changes across the whole cluster and adjust the load balancing in time.

2. Topic registration in ZooKeeper

Kafka can define many topics, and each topic is split into many partitions. Normally each partition lives on a single broker, and the mapping between all of these topics/partitions and the brokers is maintained by ZooKeeper.

3. Consumer registration in ZooKeeper

3.1 Registering a new consumer: when a new consumer registers with ZooKeeper, dedicated nodes are created to hold its information under the path /consumers/{group_id}/ [ids, owners, offsets]. ids records the consumers in the group that are currently consuming, owners records the topics the group consumes, and offsets records the consumed offset for each partition of each topic.

3.2 Watching for changes in a consumer group: by watching the children of /consumers/{group_id}/ids, consumers joining or leaving are detected as soon as they happen, and the consumption load is rebalanced accordingly.
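
These registrations can be inspected directly with the ZooKeeper CLI shipped in the zookeeper-3.4.12 bin directory, for example:

zkCli.cmd -server localhost:2181
ls /brokers/ids
get /brokers/ids/0
ls /brokers/topics

Note that the /consumers paths described above are only populated by the old ZooKeeper-based consumers; newer clients, including the Flink Kafka connector, keep their offsets in Kafka itself.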
