windows下kafka簡單使用附java代碼

1.kafka下載

下載地址:http://kafka.apache.org/downloads 選擇二進制版本apache

2.啓動zookeeper

下載後解壓到本地,不用修改配置bootstrap

在kafka目錄下打開第一個cmd命令(按住shift 在文件夾空白處右鍵 直接打開命令行)windows

(先輸入 title zookeeper 能夠把cmd窗口名稱改成zookeeper)服務器

輸入 bin\windows\zookeeper-server-start.bat config\zookeeper.propertiessession

 

zk啓動成功工具

3.啓動kafka服務

在kafka目錄下新打開第二個cmd窗口測試

輸入 bin\windows\kafka-server-start.bat config\server.propertiesui

 

kafka 服務器啓動成功spa

4.建立topic

打開第三個cmd窗口(執行完畢能夠關閉)命令行

輸入  bin\windows\kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

在本地 zk 下新建一個名爲 test 的 topic

(cmd下輸入 bin\windows\kafka-topics.bat -list -zookeeper 127.0.0.1:2181  可列出topic)

5.啓動生產者

打開第四個cmd窗口

輸入 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

生產者啓動成功

6.啓動消費者

打開第五個cmd窗口

輸入  bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

消費者啓動成功

 7.測試

在生產者 窗口中輸入任意字符串,能夠在消費者窗口中看到打印的信息

 8.JAVA 代碼建立topic

依賴的JAR包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

代碼

private static String TOPIC = "demo";

    @Test
    public void createTopic() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        AdminClient client = AdminClient.create(properties);
        List<NewTopic> topics = new ArrayList<NewTopic>();
        NewTopic newTopic = new NewTopic(TOPIC, 1, (short) 1);
        topics.add(newTopic);
        CreateTopicsResult result = client.createTopics(topics);
        try {
            KafkaFuture<Void> all = result.all();
            all.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

運行結果

9.建立消費者

    @Test
    public void consumerMsg() throws InterruptedException {
        Properties props = new Properties();
        //定義kakfa 服務的地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        //制定consumer group
        props.put("group.id", "test");
        //是否自動確認offset
        props.put("enable.auto.commit", "true");
        //自動確認offset的時間間隔
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //key的序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value的序列化類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定義consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //消費者訂閱的topic, 可同時訂閱多個
        consumer.subscribe(Arrays.asList(TOPIC));
//        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
//            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//            }
//
//            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//                //將偏移設置到最開始
//                consumer.seekToBeginning(collection);
//            }
//        });
//        final int max = 100;
//        List<ConsumerRecord<String, String>> list = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                Thread.sleep(100);
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//                list.add(record);
            }
//            if (list.size() >= max) {
//                consumer.commitSync();
//                list.clear();
//            }
        }
    }
}

10.建立生產者

  @Test
    public void sendMsg() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i));
            producer.send(record);
        }
        producer.close();
    }

11.運行

先運行 consumerMsg 方法,再運行 sendMsg 方法

能夠看到 consumerMsg  的console 打印結果

 

排坑:

開始用了第二臺機器 在二號機運行kafka,用一號機執行代碼一直不能成功

kafka服務會崩(像是死循環而後內存溢出),再啓動kafka 也不能啓動

以後用了一個工具 ZooInspector,解壓後運行build 中的 jar (推薦這個工具)

鏈接本地zk,把數據都刪掉,再啓動kafka 就能夠正常啓動

最後發現一號機不成功緣由是端口不通但在一號機啓動kafka 代碼是成功的

相關文章
相關標籤/搜索