package com.qf.utils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import java.io.*;
import java.util.Properties;

/**
 * Command-line utility that reads a local file line by line and sends every
 * line as one message to a Kafka topic, pausing for a fixed interval before
 * each send.
 */
public class CollectLog {

    /** Kafka topic that every line is sent to. */
    private static final String TOPIC = "JsonData3";

    /** Local file whose lines are sent. */
    private static final String INPUT_FILE = "D:\\qf大數據\\spark\\day13_項目\\考試i\\JsonTest.json";

    /** Pause, in milliseconds, applied before each message is sent. */
    private static final long SEND_INTERVAL_MILLIS = 5000L;

    /**
     * Configures a producer, sends every line of {@link #INPUT_FILE} to
     * {@link #TOPIC}, and prints a completion message once the reader and the
     * producer have both been closed.
     *
     * <p>Failures are reported on standard error via {@code printStackTrace()};
     * the completion message is printed only when everything succeeded.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafka brokers the producer contacts.
        properties.setProperty("metadata.broker.list", "mini4:9092,mini5:9092,mini6:9092");
        // Serializer applied to messages on their way to the broker.
        properties.setProperty("serializer.class", StringEncoder.class.getName());
        // ZooKeeper address.
        properties.setProperty("zookeeper.connect", "mini4:2181,mini5:2181,mini6:2181");
        // Whether an acknowledgement is requested: 0 = none, 1 = acknowledged.
        properties.setProperty("request.required.acks", "1");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        try {
            try {
                sendLines(producer);
            } finally {
                // Always release the producer, including when reading or sending failed.
                producer.close();
            }
            System.out.println("已經發送完畢");
        } catch (InterruptedException e) {
            // Restore the interrupt status so that it is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends each line of {@link #INPUT_FILE} to {@link #TOPIC}, sleeping
     * {@link #SEND_INTERVAL_MILLIS} before every send. The reader is closed
     * whether or not the loop completes.
     *
     * @param producer producer used to send the messages; not closed here
     * @throws IOException if the file cannot be opened, read or closed
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void sendLines(Producer<String, String> producer)
            throws IOException, InterruptedException {
        // NOTE(review): FileReader decodes with the platform default charset —
        // confirm that this matches the encoding of the input file.
        BufferedReader reader = new BufferedReader(new FileReader(new File(INPUT_FILE)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>(TOPIC, line);
                Thread.sleep(SEND_INTERVAL_MILLIS);
                producer.send(message);
            }
        } finally {
            reader.close();
        }
    }
}