This is the second original article in the Kafka series that Guide哥 promised you. To keep the content up to date, I have also published the related articles on GitHub! Address: https://github.com/Snailclimb/springboot-kafka
Related reading: Getting Started! A Plain-Language Introduction to Kafka!
Prerequisite: Docker is installed on your computer.
The demos below use a single-node Kafka; I recommend setting up a single-node Kafka first to learn with.
The Docker-based Kafka environment below comes from the open-source project github.com/simplesteph… . Of course, you can also use the setup provided at github.com/wurstmeiste… .
Create a file named zk-single-kafka-single.yml with the following content:
version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
Run the following command to complete the environment setup (it will automatically download and run a ZooKeeper and a Kafka instance):
docker-compose -f zk-single-kafka-single.yml up
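Once the containers are up, you can sanity-check that the broker is reachable from the host. Below is a minimal sketch using Kafka's AdminClient (it relies on the same kafka-clients dependency introduced in the Java section later in this article; the class name ClusterCheck and the hard-coded address are illustrative assumptions, not part of the original project):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ClusterCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: the broker is advertised to the host as 127.0.0.1:9092 (see the compose file above)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() asks the broker for its node list; get() blocks until the reply arrives
            System.out.println("Cluster nodes: " + admin.describeCluster().nodes().get());
        }
    }
}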
If you need to stop the Kafka-related containers, just run the following command:
docker-compose -f zk-single-kafka-single.yml down
The following Docker-based Kafka cluster environment also comes from the open-source project github.com/simplesteph… .
Create a file named zk-single-kafka-multiple.yml with the following content:
version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-multiple/zoo1/data:/data
      - ./zk-single-kafka-multiple/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  kafka3:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zoo1
Run the following command to set up an environment with 1 ZooKeeper node and 3 Kafka nodes.
docker-compose -f zk-single-kafka-multiple.yml up
If you need to stop the Kafka-related containers, just run the following command:
docker-compose -f zk-single-kafka-multiple.yml down
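With three brokers running, you can create topics whose partitions are replicated across the cluster. Here is a hedged sketch using the AdminClient (the topic name multi-test, the partition/replica counts, and the bootstrap addresses are assumptions for illustration, not from the original project):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateReplicatedTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: the three brokers from the compose file, advertised to the host on 9092-9094
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: each partition has a copy on every broker
            NewTopic topic = new NewTopic("multi-test", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("Topics now on the cluster: " + admin.listTopics().names().get());
        }
    }
}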
Generally speaking, we rarely use Kafka's command-line operations, but they are worth knowing.
1. Enter the Kafka container to use the command-line tools that ship with Kafka (the container name docker_kafka1_1 depends on the Compose project/directory name; check docker ps if yours differs)
docker exec -ti docker_kafka1_1 bash
2. List all topics (--describe prints the details of every topic; use --list if you only want the names)
root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181
3. Create a topic
root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1
Created topic test.
We created a topic named test with 3 partitions and a replication factor of 1.
4. Subscribe to the topic with a consumer
root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
We have subscribed to the topic named test.
5. Send a message to the topic with a producer
root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test
>send hello from console -producer
>
We used the kafka-console-producer command to send a message to the topic named test; the message content is: "send hello from console -producer".
At this point, you'll find that the consumer successfully received the message (add --from-beginning to the consumer command if you also want to replay messages sent before it started):
root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
This is a ZooKeeper visualization plugin for IDEA, and it's very handy!
kafkalytic is a Kafka visualization and management plugin for IDEA.
How to use it:
Open the tool window: View -> Tool Windows -> kafkalytic;
Click the "+" button and enter "127.0.0.1:9092" in the popup to connect;
Next, let's operate Kafka from Java code.
Step 1: Create a new Maven project
Step 2: Add the required dependency to pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
Step 3: Initialize the consumer and producer
The KafkaConstants constants class defines some commonly used Kafka configuration values.
public class KafkaConstants {

    public static final String BROKER_LIST = "localhost:9092";

    public static final String CLIENT_ID = "client1";

    public static final String GROUP_ID_CONFIG = "consumerGroup1";

    private KafkaConstants() {
    }
}
ProducerCreator has a createProducer() method that returns a KafkaProducer object.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author shuang.kou
 */
public class ProducerCreator {

    public static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}
ConsumerCreator has a createConsumer() method that returns a KafkaConsumer object.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerCreator {

    public static Consumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(properties);
    }
}
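The consumer above relies on Kafka's defaults for everything else. In practice a few more settings are commonly tuned; here is a hedged sketch of additions to createConsumer() (the values are illustrative assumptions, not from the original project):

// illustrative additions inside ConsumerCreator.createConsumer()
// start from the earliest offset when the group has no committed position yet
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// cap how many records a single poll() may return
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// auto-commit is on by default; set this to false if you commit offsets manually
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);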
Step 4: Send and consume messages
The producer sends a message:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

private static final String TOPIC = "test-topic";

Producer<String, String> producer = ProducerCreator.createProducer();
ProducerRecord<String, String> record =
        new ProducerRecord<>(TOPIC, "hello, Kafka!");
try {
    // send the message synchronously; get() blocks until the broker acknowledges it
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Record sent to partition " + metadata.partition()
            + " with offset " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    System.out.println("Error in sending record");
    e.printStackTrace();
}
producer.close();
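The send above is synchronous: calling get() blocks until the broker acknowledges the record. The producer also supports asynchronous sends with a callback; here is a hedged sketch using the same record and producer as above:

// asynchronous variant: pass a Callback instead of blocking on get()
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.out.println("Error in sending record: " + exception.getMessage());
    } else {
        System.out.println("Record sent to partition " + metadata.partition()
                + " with offset " + metadata.offset());
    }
});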
The consumer consumes messages:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.util.Collections;

Consumer<String, String> consumer = ConsumerCreator.createConsumer();
// subscribe once, before entering the poll loop
consumer.subscribe(Collections.singletonList(TOPIC));
// consume messages in a loop
while (true) {
    ConsumerRecords<String, String> consumerRecords =
            consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        System.out.println("Consumer consume message:" + consumerRecord.value());
    }
}
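By default the consumer commits offsets automatically in the background. If you want at-least-once processing, you can disable auto-commit (set ENABLE_AUTO_COMMIT_CONFIG to false in ConsumerCreator, as in the sketch after Step 3) and commit manually once a batch has been handled; a hedged sketch:

consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Consumer consume message:" + record.value());
    }
    // commit this batch's offsets only after every record has been processed
    consumer.commitSync();
}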
Step 5: Test
Run the program and the console prints:
Record sent to partition 0 with offset 20
Consumer consume message:hello, Kafka!