Download from http://kafka.apache.org/downloads and choose a binary release.
After downloading, extract it locally; no configuration changes are needed.
Open the first cmd window in the kafka directory (hold Shift and right-click on empty space in the folder to open a command prompt there).
(Typing "title zookeeper" first renames the cmd window to zookeeper.)
Type bin\windows\zookeeper-server-start.bat config\zookeeper.properties
ZooKeeper starts successfully.
Open a second cmd window in the kafka directory.
Type bin\windows\kafka-server-start.bat config\server.properties
The Kafka server starts successfully.
Open a third cmd window (it can be closed once the command finishes).
Type bin\windows\kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named test on the local ZooKeeper.
(Typing bin\windows\kafka-topics.bat --list --zookeeper 127.0.0.1:2181 lists the existing topics.)
Open a fourth cmd window.
Type bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
The producer starts successfully.
Open a fifth cmd window.
Type bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
(Newer Kafka versions have removed the --zookeeper option from the console consumer; use --bootstrap-server localhost:9092 there instead.)
The consumer starts successfully.
Type any string in the producer window; it is printed in the consumer window.
Maven dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
Code
private static String TOPIC = "demo";

@Test
public void createTopic() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    AdminClient client = AdminClient.create(properties);
    List<NewTopic> topics = new ArrayList<NewTopic>();
    // topic name, number of partitions, replication factor
    NewTopic newTopic = new NewTopic(TOPIC, 1, (short) 1);
    topics.add(newTopic);
    CreateTopicsResult result = client.createTopics(topics);
    try {
        // block until the topic has actually been created
        result.all().get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
Run result:
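The result can also be checked from code: with the same dependencies, AdminClient.listTopics() returns the topic names the broker knows about. A minimal sketch, assuming the broker from this tutorial is running on 127.0.0.1:9092 (the class name is illustrative):

```java
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;

public class TopicCheck {
    // Broker address assumed to match the tutorial's local setup
    static Properties adminProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        return props;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        try (AdminClient client = AdminClient.create(adminProps())) {
            // listTopics() returns the topics visible to this client
            Set<String> names = client.listTopics().names().get();
            System.out.println("topics: " + names);
        }
    }
}
```

If createTopic ran successfully, the printed set should contain "demo".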
@Test
public void consumerMsg() throws InterruptedException {
    Properties props = new Properties();
    // Kafka broker address
    props.put("bootstrap.servers", "127.0.0.1:9092");
    // consumer group id
    props.put("group.id", "test");
    // whether to commit offsets automatically
    props.put("enable.auto.commit", "true");
    // interval between automatic offset commits
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    // deserializer class for keys
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // deserializer class for values
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // create the consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    // topics to subscribe to; several can be subscribed at once
    consumer.subscribe(Arrays.asList(TOPIC));
//    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
//        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//        }
//
//        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//            // rewind the offset to the very beginning
//            consumer.seekToBeginning(collection);
//        }
//    });
//    final int max = 100;
//    List<ConsumerRecord<String, String>> list = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            Thread.sleep(100);
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
//            list.add(record);
        }
//        if (list.size() >= max) {
//            consumer.commitSync();
//            list.clear();
//        }
    }
}
@Test
public void sendMsg() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");
    // wait for the full set of in-sync replicas to acknowledge each record
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i));
        producer.send(record);
    }
    producer.close();
}
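producer.send() is asynchronous, so the fire-and-forget loop above never learns whether a record was actually written. A hedged sketch of a synchronous variant that blocks on the returned Future and inspects the RecordMetadata (class and helper names here are illustrative, not from the original):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendSketch {
    // Pure helper (illustrative): format a confirmation line from send metadata
    static String describe(String topic, int partition, long offset) {
        return String.format("sent to %s-%d@%d", topic, partition, offset);
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo", "key", "value");
            try {
                // send() returns a Future<RecordMetadata>; get() blocks until the broker acks
                RecordMetadata meta = producer.send(record).get();
                System.out.println(describe(meta.topic(), meta.partition(), meta.offset()));
            } catch (ExecutionException e) {
                // the record was not written; the cause explains why
                e.getCause().printStackTrace();
            }
        }
    }
}
```

Blocking on every send trades throughput for certainty; for high-volume producers the callback overload of send() is the usual middle ground.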
Run the consumerMsg method first, then run sendMsg.
The messages are printed in consumerMsg's console.
Troubleshooting:
At first I used a second machine: Kafka ran on machine 2 while the code ran on machine 1, and it never succeeded.
The Kafka service would crash (it looked like an infinite loop ending in an out-of-memory error), and afterwards Kafka would not start at all.
I then used a tool called ZooInspector: after unpacking it, run the jar in its build directory (this tool is recommended).
Connecting to the local ZooKeeper and deleting all the data let Kafka start normally again.
In the end, the reason machine 1 failed was that the broker port was not reachable from it; when Kafka was started on machine 1 itself, the code succeeded.
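A quick way to confirm the port diagnosis from the client machine is a plain TCP connection test, before involving Kafka at all. A minimal sketch using only java.net (the class name is illustrative):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true when a TCP connection to host:port succeeds within timeoutMs
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("9092 reachable: " + isReachable("127.0.0.1", 9092, 1000));
    }
}
```

If the port is reachable but remote clients still fail, a likely cause is Kafka's advertised.listeners setting in server.properties: the broker must advertise an address that the client machine can actually reach, not just localhost. The firewall on the broker machine must also allow port 9092.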