package test05

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WriteDataToKafka {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val logData: RDD[String] = sc.textFile("/Users/huiliyang/vwlog/")
    //logData.collect().foreach(println(_))
    writeToKafkaTopic(logData, "192.168.1.112:9092", "huiliyang")
  }

  // Write data to Kafka. The producer is created once per partition on the
  // executors instead of once per record, which avoids opening and closing
  // a broker connection for every single line.
  def writeToKafkaTopic(lines: RDD[String], kafkaServer: String, kafkaTopic: String): Unit = {
    lines.foreachPartition { partition =>
      // Build the Properties inside the closure so nothing non-serializable
      // has to be shipped from the driver to the executors.
      val props = new Properties()
      props.put("bootstrap.servers", kafkaServer)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      try {
        partition.foreach { line =>
          val record = new ProducerRecord[String, String](kafkaTopic, "key", line)
          producer.send(record)
        }
      } finally {
        producer.close() // flushes any buffered records before shutting down
      }
    }
  }
}
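To check that the lines actually landed in the topic, a plain Kafka consumer can read them back. This is a minimal sketch, not part of the job above: it assumes kafka-clients 2.0+ (for the Duration-based poll) and uses a hypothetical consumer group id "huiliyang-check"; the broker address and topic name are the ones from the producer example.

package test05

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConverters._

object ReadDataFromKafka {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.112:9092")
    props.put("group.id", "huiliyang-check") // hypothetical group id for this check
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest") // start from the beginning of the topic

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("huiliyang"))

    // Poll a few batches and print what WriteDataToKafka produced.
    for (_ <- 1 to 5) {
      for (record <- consumer.poll(Duration.ofSeconds(1)).asScala) {
        println(s"offset=${record.offset()} value=${record.value()}")
      }
    }
    consumer.close()
  }
}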