mac 下 kafka 1.1.0 安裝與測試,利用java實現生產與消費

機器:localhost ,對應topic爲:courier-gps

一、zookeeper相關

建立以下兩個目錄:

  • 數據存放目錄(dataDir): /Users/jiangcaijun/technicalSoftware/zookeeper-3.4.12/zookeeperData
  • 日誌存放目錄(dataLogDir): /Users/jiangcaijun/technicalSoftware/zookeeper-3.4.12/zookeeperLog

複製zoo_sample.cfg 文件並從新命名爲 zoo.cfg ,命令爲 cp zoo_sample.cfg zoo.cfg

編輯zoo.cfg,修改 dataDir 與 dataLogDir 爲上述路徑

啓動命令: 進入解壓目錄下: bin/zkServer.sh start &

啓動後,客戶端鏈接zk:

bin/zkCli.sh -server localhost:2181

可利用ls / 查看節點

二、kafka安裝路徑:

cd /Users/jiangcaijun/technicalSoftware/kafka_2.11-1.0.0

修改 config/server.properties

zookeeper.connect=localhost:2181/kafka110

kafka啓動:

bin/kafka-server-start.sh config/server.properties &

將會自動在zookeeper下建立kafka110節點;可利用zk客戶端,查看到該節點已建立

三、查看topic有哪些,並建立

bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka110

建立 courier-gps

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka110 --replication-factor 1 --partitions 1 --topic courier-gps

四、建立kafka生產者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic courier-gps

五、建立kafka消費者

  • 一、經過kafka(測試未經過,找不到信息)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic courier-gps  --from-beginning
  • 二、經過zk路由
bin/kafka-console-consumer.sh    --zookeeper 127.0.0.1:2181/kafka110 --topic courier-gps --from-beginning
  • 注意:因爲設置zk根結點kafka110,方法1 失敗,方法 2 成功

六、java連接kafka

  • 一、引入依賴

    <!--使用kafka start-->
    <!--Kafka 消息依賴-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!--使用kafka end-->
  • 二、生產者:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Sends a small batch of courier GPS records, encoded as one JSON array
     * string, to the Kafka topic "courier-gps".
     *
     * @author jiangcaijun
     * @since 2018/6/26
     */
    public class MessageProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker address; adjust to the target Kafka cluster.
            props.put("bootstrap.servers", "172.23.1.130:9092");
            // Require acknowledgement from all in-sync replicas.
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Custom partitioner (disabled)
    //        props.put("partitioner.class", "com.mr.partitioner.MyPartitioner");

            String str = "[{\"order_number\":\"1101\",\"location_action\":\"1\",\"loginname\":\"x\",\"user_id\":\"897\",\"c_time\":\"2018-06-15 18:29:26.0\",\"posistion_data\":\"2018-06-15 18:54:07.0\",\"lng\":\"116.446672\",\"lat\":\"39.895109\",\"dt_ymd\":\"20180615\"},\n" +
                    "{\"order_number\":\"1102\",\"location_action\":\"1\",\"loginname\":\"y\",\"user_id\":\"897\",\"c_time\":\"2018-06-15 18:29:26.0\",\"posistion_data\":\"2018-06-15 18:43:49.0\",\"lng\":\"116.446245\",\"lat\":\"39.895983\",\"dt_ymd\":\"20180615\"}\n" +
                    "]";

            // try-with-resources guarantees the producer is flushed and closed
            // even if send(...) throws; the original leaked it on failure.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Key is null, so records are distributed by the default partitioner.
                producer.send(new ProducerRecord<String, String>("courier-gps", null, str));
            }
        }
    }
  • 三、消費者

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    /**
     * Consumes messages from the "courier-gps" topic through Kafka's old
     * ZooKeeper-based high-level consumer API and prints each message to
     * stdout. {@link #consume()} blocks forever.
     */
    public class MessageConsumer {

        /* Kafka related (broker version 1.1.0) */

        private final ConsumerConnector consumer;

        // Class-wide constant, so declared static final (was a mutable instance field).
        private static final String TOPIC = "courier-gps";

        private MessageConsumer() {
            Properties props = new Properties();

            // ZooKeeper connection string, including the /kafka110 chroot
            // configured in the broker's zookeeper.connect.
            props.put("zookeeper.connect", "172.23.0.13:2181/kafka110");

            // Consumer group this instance joins.
            props.put("group.id", "group-courier-gps");

            // ZooKeeper session/sync timeouts and offset auto-commit interval.
            // NOTE(review): 1000 ms is aggressive for a ZK session timeout —
            // confirm it is intentional before production use.
            props.put("zookeeper.session.timeout.ms", "1000");
            props.put("zookeeper.sync.time.ms", "1000");
            props.put("auto.commit.interval.ms", "1000");

            // Removed "serializer.class": it is a producer-side setting and is
            // never read by the consumer; decoding is done via StringDecoder below.

            ConsumerConfig config = new ConsumerConfig(props);

            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        }

        /** Opens one stream for TOPIC and prints every message; never returns. */
        void consume() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            // One stream (one consumer thread) for this topic.
            topicCountMap.put(TOPIC, Integer.valueOf(1));

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
            ConsumerIterator<String, String> it = stream.iterator();

            System.out.println("接收到消息以下:");
            // hasNext() blocks until the next message arrives; loop never exits.
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
        }

        public static void main(String[] args) {
            new MessageConsumer().consume();
        }
    }

其餘:

  • 一、Linux中記錄終端輸出到文本文件

    ls > ls.txt    #或者 ls-->ls.txt    #把ls命令的運行結果保存到文件ls.txt中
  • 二、若其餘機器沒法訪問該kafka,可修改 kafka/conf/server.properties 中 所有 localhost 改成本機實際ip

相關文章
相關標籤/搜索