zk kafka mariadb scala flink integration
At first I did not plan to write this post; I wanted to push the code to github.com / git.jd.com instead, but I have been getting errors there since I moved to JDD (JD Finance) this month.
So, in order to keep the code somewhere, I started this post.
Here is a summary of this post:
1. Set up a three-node zk cluster on Windows.
2. Set up a three-node Kafka cluster on Windows.
3. Create a Maven Scala project in the IDE.
4. Create a Flink producer that sinks data to a Kafka topic named ScalaTopic, using a custom source written by extending the source API.
5. Create a Flink consumer that reads data from Kafka and sinks it to MariaDB, using a custom sink written by extending the sink API.
Tip: there is plenty of material on the internet about how to set up zk and Kafka clusters on Windows; it is an easy job, so it is not shown here again.
Script 1: a batch script that starts the zk and Kafka clusters with one click.
Code:
rem One-click start for the local three-node ZooKeeper ensemble and the
rem three-broker Kafka cluster. Every server runs in its own console window.

rem Switch to the D: drive so the following cd commands take effect.
d:

rem Start the three ZooKeeper instances (zookeeper-3.4.12-1 .. -3).
for %%i in (1 2 3) do (
    cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-%%i\bin\
    start cmd /k "zkServer.cmd"
)

rem Pause (15 pings to localhost) before the Kafka brokers are launched.
ping -n 15 127.0.0.1>nul

rem Start the three Kafka brokers (kafka-0 .. kafka-2), each with its own server.properties.
for %%i in (0 1 2) do (
    cd D:\works\JDD\kafka_cluster\kafka-%%i\bin\windows\
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-%%i\config\server.properties"
)
Script 2: KafkaProduce. This script creates a Flink execution environment and sinks the data generated by the custom source to Kafka.
package com.ran.xiao.yun.zkkafflink

import com.ran.xiao.yun.jdd.SimpleStringGenerator
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html

/**
 * Flink job that publishes the strings emitted by [[SimpleStringGenerator]]
 * to the Kafka topic `ScalaTopic` on the local three-broker cluster.
 */
object KafkaProduce {

  /**
   * Builds the source -> Kafka sink pipeline and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092,localhost:9093,localhost:9094"
    val topic = "ScalaTopic"

    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    val myProducer = new FlinkKafkaProducer[String](
      brokers, // broker list
      topic, // target topic
      new SimpleStringSchema)
    // Attach each record's timestamp to the Kafka message that is written.
    myProducer.setWriteTimestampToKafka(true)

    // Define the source and generate the stream that is sent to Kafka.
    val stream: DataStream[String] = env.addSource(new SimpleStringGenerator())
    stream.addSink(myProducer) // sink data to kafka

    // The original called env.clean() with no argument after execute();
    // StreamExecutionEnvironment.clean takes the function to clean as a
    // parameter, so that call did not compile and has been removed.
    env.execute()
  }
}
Script 3: SimpleStringGenerator, the custom source used by KafkaProduce
package com.ran.xiao.yun.jdd

import scala.util.Random

import org.apache.commons.lang3.RandomStringUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}

/**
 * Custom Flink source. The original note describes a source that reads from
 * MySQL, but this implementation generates random values instead of reading
 * from a log or a database.
 *
 * The core is a while loop that calls `collect` on every pass; cancelling the
 * source makes the loop stop. `run` and `cancel` must be overridden; `open` is
 * optional and is the place for any one-time initialisation.
 */
class SimpleStringGenerator extends RichParallelSourceFunction[String] {

  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)

  // cancel() is invoked from a different thread than run(), so the flag must
  // be volatile for the loop in run() to observe the update.
  @volatile var isRunning = true

  /** Prints a start-up marker; no other initialisation is performed. */
  override def open(parameters: Configuration): Unit = {
    print("starting....")
  }

  /**
   * Emits one record of the form `element-<id>:<3 letters>:<age>` every six
   * seconds until [[cancel]] is called.
   *
   * @param ctx context used to hand the generated records to Flink
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    logger.info("try to generate some value for example...")
    // One generator for the whole run instead of a new one per iteration.
    val rand = new Random()
    while (isRunning) {
      val payload = rand.nextInt(10000000) + ":" + RandomStringUtils.randomAlphabetic(3) + ":" + rand.nextInt(1000)
      ctx.collect("element-" + payload)
      println(payload)
      logger.info("generate data from producer is successful...")
      Thread.sleep(6 * 1000) // one minute is too long for testing, so wait 6 seconds
    }
  }

  /** Asks the loop in [[run]] to stop after the current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
Script 4: define a consumer that reads data from Kafka and writes it to MariaDB
package com.ran.xiao.yun.zkkafflink

import java.util.Properties

import com.ran.xiao.yun.jdd.MySQLSink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector

/**
 * Flink job that consumes strings from a Kafka topic, prints them and writes
 * them to the database through [[MySQLSink]].
 *
 * The broker list (`kafka.hosts`) and topic (`kafka.topic`) are read from
 * `Config.properties` on the classpath.
 */
object Kafka2MySQL {

  private val ConfigResource = "Config.properties"

  /**
   * Builds the Kafka -> print / MySQLSink pipeline and runs it.
   *
   * @param args command-line arguments (not used)
   * @throws IllegalStateException if the configuration resource or one of the
   *                               required keys is missing
   */
  def main(args: Array[String]): Unit = {
    val properties = loadConfig(ConfigResource)
    val kafkas = requiredProperty(properties, "kafka.hosts")
    val toppic = requiredProperty(properties, "kafka.topic")

    // The loaded Properties object is also handed to the Kafka consumer.
    properties.setProperty("bootstrap.servers", kafkas)
    properties.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(7000)

    // SimpleStringSchema turns each Kafka record into a String.
    val myConsumer = new FlinkKafkaConsumer[String](toppic, new SimpleStringSchema(), properties)
    //myConsumer.setStartFromEarliest() // read data from the earliest offset
    myConsumer.setStartFromLatest()

    val stream = env.addSource(myConsumer) // read data from kafka
    stream.print()
    stream.addSink(new MySQLSink()) // write data to the database

    env.execute("starting")
  }

  /** Loads a classpath properties file, failing with a clear message if it is absent. */
  private def loadConfig(resource: String): Properties = {
    val in = Option(Kafka2MySQL.getClass.getClassLoader.getResourceAsStream(resource))
      .getOrElse(throw new IllegalStateException(s"$resource not found on the classpath"))
    try {
      val loaded = new Properties()
      loaded.load(in)
      loaded
    } finally {
      in.close()
    }
  }

  /** Returns the value for `key`, failing with a clear message if it is missing or blank. */
  private def requiredProperty(props: Properties, key: String): String =
    Option(props.getProperty(key))
      .filter(_.trim.nonEmpty)
      .getOrElse(throw new IllegalStateException(s"Missing required property '$key' in $ConfigResource"))
}
Script 5: MySQLSink, the custom sink used by Kafka2MySQL
package com.ran.xiao.yun.jdd

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
 * Custom Flink sink that inserts each incoming record into the
 * `flink_kafka_zk_scala` table.
 *
 *  - `open` initialises the handle to the downstream system (JDBC connection
 *    and prepared statement);
 *  - `invoke` is called for every incoming record and holds the logic that
 *    writes to the downstream system;
 *  - `close` (the sink counterpart of a source's `cancel`) releases the
 *    downstream handle.
 */
class MySQLSink extends RichSinkFunction[String] {

  // Not used by the sink itself; kept because it is a public member.
  var isRunning = true

  private var connection: Connection = null
  private var ps: PreparedStatement = null

  /**
   * Opens the database connection once, so that `invoke` does not have to
   * connect and disconnect for every record.
   */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val driver = "org.mariadb.jdbc.Driver" // some examples online wrongly write "maria" instead of "mariadb"
    val dburlc = "jdbc:mariadb://localhost:3306/data_platformwarehouse" // jdbc:mysql://localhost:3306/data_platformwarehouse works too
    // NOTE(review): credentials are hard-coded; move them to configuration before real use.
    val usrnam = "root"
    val passwd = "root"
    Class.forName(driver) // load the driver
    connection = DriverManager.getConnection(dburlc, usrnam, passwd)
    // The parameters are filled in by invoke.
    val sql = "insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)"
    ps = connection.prepareStatement(sql)
  }

  /**
   * Parses a record of the form `element-<id>:<name>:<age>` and inserts it.
   *
   * @param str the record to write
   * @throws IllegalArgumentException if the record does not have that shape
   *                                  (a non-numeric id/age surfaces as
   *                                  NumberFormatException)
   */
  override def invoke(str: String): Unit = {
    // Split on the first '-' only, so a '-' inside the payload does not truncate it.
    val payload = str.split("-", 2) match {
      case Array(_, rest) => rest
      case _ => throw new IllegalArgumentException(s"Record has no '-' separator: $str")
    }
    payload.split(":") match {
      // Fields after the third are ignored, as before.
      case Array(id, name, age, _*) =>
        ps.setInt(1, id.toInt)
        ps.setString(2, name)
        ps.setInt(3, age.toInt)
        ps.executeUpdate()
      case _ =>
        throw new IllegalArgumentException(s"Expected id:name:age after the prefix, got: $str")
    }
  }

  /**
   * Releases the JDBC resources: statement first, then the connection, each
   * attempted even if an earlier step throws.
   */
  override def close(): Unit = {
    try {
      super.close()
    } finally {
      try {
        if (ps != null) {
          ps.close()
        }
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }
}
Notes to self
The relationship between zk and Kafka:
Kafka使用zk的分佈式協調服務,將生產者、消費者、消息儲存(broker,用於存儲信息、消息讀寫等)結合在一起。同時藉助zk,kafka可以將生產者、消費者和broker在內的所有組件在無狀態的條件下建立起生產者和消費者的訂閱關係,實現生產者的負載均衡。
1. broker在zk中註冊
kafka的每一個broker(相當於一個節點,相當於一台機器)在啓動時,都會在zk中註冊,告訴zk其broker.id;在整個集羣中,每個broker對應/brokers/ids下的一個節點。當節點失效時,zk就會刪除該節點,這樣就能很方便地監控整個集羣broker的變化,及時調整負載均衡。
2. topic在zk中註冊
在kafka中可以定義很多個topic,每個topic又被分爲很多個分區。一般情況下,每個分區獨立存在於一個broker上,所有這些topic和broker的對應關係都由zk進行維護。
3. consumer(消費者)在zk中註冊
3.1 註冊新的消費者:當有新的消費者註冊到zk中,zk會建立專用的節點來保存相關信息,路徑爲 ls /consumers/{group_id}/ [ids,owners,offsets]。ids:記錄該消費分組有幾個正在消費的消費者;owners:記錄該消費分組消費的topic信息;offsets:記錄topic每個分區中的每個offset。
3.2 監聽消費者分組中消費者的變化:監聽/consumers/{group_id}/ids的子節點的變化,一旦發現消費者新增或者減少,及時調整消費者的負載均衡。 ---------------------