Kafka 生產者讀取本地文件並逐行發送消息

package com.qf.utils;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;import java.io.*;import java.util.Properties;public class CollectLog {   public static void main(String[] args){     Properties properties = new Properties();     properties.setProperty("metadata.broker.list",             "mini4:9092,mini5:9092,mini6:9092");     //消息傳遞到broker時的序列化方式      properties.setProperty("serializer.class",StringEncoder.class.getName());      //zk的地址      properties.setProperty("zookeeper.connect",              "mini4:2181,mini5:2181,mini6:2181");      //是否反饋消息 0是不反饋消息 1是反饋消息      properties.setProperty("request.required.acks","1");      ProducerConfig producerConfig = new ProducerConfig(properties);      Producer<String,String> producer = new Producer<String,String>(producerConfig);      try {         BufferedReader bf = new BufferedReader(                 new FileReader(                         new File(                                 "D:\\qf大數據\\spark\\day13_項目\\考試i\\JsonTest.json")));         String line = null;         while((line=bf.readLine())!=null){             KeyedMessage<String,String> keyedMessage = new KeyedMessage<String,String>("JsonData3",line);           Thread.sleep(5000);            producer.send(keyedMessage);         }         bf.close();         producer.close();         System.out.println("已經發送完畢");      } catch (Exception e) {         e.printStackTrace();      }   }}
相關文章
相關標籤/搜索