將數據寫到 Kafka 的 topic

package test05

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that reads text files line by line and publishes every line as a
 * record to a Kafka topic.
 */
object WriteDataToKafka {

  /**
   * Entry point: builds a local SparkContext, loads the input as an RDD of
   * lines and hands it to [[writeToKafkaTopic]].
   *
   * @param args optional positional overrides; any that are missing fall back
   *             to the built-in defaults:
   *             - `args(0)`: input path passed to `SparkContext.textFile`
   *             - `args(1)`: Kafka `bootstrap.servers` value
   *             - `args(2)`: destination Kafka topic
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.lift(0).getOrElse("/Users/huiliyang/vwlog/")
    val kafkaServer = args.lift(1).getOrElse("192.168.1.112:9092")
    val kafkaTopic = args.lift(2).getOrElse("huiliyang")

    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val logData: RDD[String] = sc.textFile(inputPath)
      writeToKafkaTopic(logData, kafkaServer, kafkaTopic)
    } finally {
      // Always release the SparkContext, even when the write fails.
      sc.stop()
    }
  }

  /**
   * Writes every element of `lines` to Kafka as a record with the fixed key
   * `"key"` and the line as its value.
   *
   * One producer is created per non-empty partition instead of one per record,
   * and it is closed in a `finally` so it is released even if a send throws.
   *
   * @param lines       the lines to publish
   * @param kafkaServer value for the producer's `bootstrap.servers` setting
   * @param kafkaTopic  name of the topic the records are sent to
   */
  def writeToKafkaTopic(lines: RDD[String], kafkaServer: String, kafkaTopic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaServer)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    lines.foreachPartition { partition =>
      // Skip empty partitions so no producer (and broker connection) is created for nothing.
      if (partition.nonEmpty) {
        // The producer is built inside the closure: only `props` and the topic
        // name are captured, and the producer itself is never serialized.
        val producer = new KafkaProducer[String, String](props)
        try {
          partition.foreach { line =>
            producer.send(new ProducerRecord[String, String](kafkaTopic, "key", line))
          }
        } finally {
          // close() waits for previously sent records to complete before returning.
          producer.close()
        }
      }
    }
  }
}
相關文章
相關標籤/搜索