Prerequisites
docker-compose is not strictly required; plain docker works on its own. This article covers both approaches: docker and docker-compose.
Deploying with docker
Deploying Kafka with docker is very simple; two commands are enough to bring up a Kafka server.
```bash
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

# replace 192.168.1.60 with your machine's IP
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  --link zookeeper \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.60:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -t wurstmeister/kafka
```
Kafka needs ZooKeeper to work, so a ZooKeeper instance has to be deployed as well, but with docker this is painless.
You can check the status of both containers with `docker ps`; the output is not shown here.
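If you only want to confirm that the two containers are running and that the broker finished starting, a minimal check might look like the following (a sketch; the exact wording of the startup log line depends on the Kafka version shipped in the image):

```bash
# list only the two containers started above
docker ps --filter name=zookeeper --filter name=kafka

# a broker that started successfully normally logs a "started" line
docker logs kafka 2>&1 | grep -i "started"
```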
Next, let's try out a producer and a consumer.
Testing with Kafka's built-in console producer and consumer
- First, enter the kafka docker container:
docker exec -it kafka sh
- Run the consumer to start listening for messages:
kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9092 --topic kafeidou --from-beginning
- Open a new ssh session, enter the kafka container in the same way, and run the following command to produce messages (replace 192.168.1.60 with your machine's IP):
kafka-console-producer.sh --broker-list 192.168.1.60:9092 --topic kafeidou
After running this command you are dropped into a prompt where you can type any message you want to send; here we send a hello:
```
>hello
>
```
- As you can see, as soon as the message is typed into the producer console, it shows up in the consumer console. If you also want to inspect the topic itself, see the sketch below.
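To double-check the topic used by the console test, you can describe it from inside the kafka container (a hedged sketch; older versions of kafka-topics.sh bundled in the wurstmeister/kafka image take `--zookeeper zookeeper:2181` instead of `--bootstrap-server`):

```bash
# inside the kafka container (docker exec -it kafka sh)
kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --list
kafka-topics.sh --bootstrap-server 192.168.1.60:9092 --describe --topic kafeidou
```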
At this point a complete Kafka hello world is done: deploying Kafka plus a producer/consumer test.
Testing with Java code
- Create a new Maven project and add the following dependencies:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.2</version>
</dependency>
```
- Producer code
producer.java
```java
import org.apache.kafka.clients.producer.*;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class HelloWorldProducer {
    public static void main(String[] args) {
        long events = 30;
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.60:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // not a Java-client setting; the client logs it as an unknown config and ignores it
        props.put("message.timeout.ms", "3000");

        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "kafeidou";

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            System.out.println(msg);
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, ip, msg);
            // send asynchronously; the callback reports either the offset or the error
            producer.send(data, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
        }
        System.out.println("send message done");
        producer.close();
        System.exit(-1);
    }
}
```
- Consumer code
consumer.java
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloWorldConsumer2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.60:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafeidou_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // start from the earliest offset when the group has no committed offset yet
        props.put("auto.offset.reset", "earliest");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafeidou"));

        while (true) {
            // poll blocks for up to 1000 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```
- Run the producer and the consumer; a sketch of running them from the command line follows below.
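One way to start the two classes straight from the Maven project (a sketch; it assumes the classes sit in the default package and that the exec-maven-plugin can be resolved, since it is not declared in the POM above):

```bash
# terminal 1: start the consumer first so it is already listening
mvn compile exec:java -Dexec.mainClass=HelloWorldConsumer2

# terminal 2: run the producer to send the 30 test messages
mvn compile exec:java -Dexec.mainClass=HelloWorldProducer
```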
Producer output:
```
1581651496176,www.example.com,192.168.2.219
1581651497299,www.example.com,192.168.2.112
1581651497299,www.example.com,192.168.2.20
```
Consumer output:
```
offset = 0, key = 192.168.2.202, value = 1581645295298,www.example.com,192.168.2.202
offset = 1, key = 192.168.2.102, value = 1581645295848,www.example.com,192.168.2.102
offset = 2, key = 192.168.2.63, value = 1581645295848,www.example.com,192.168.2.63
```
Source code: FISHStack/kafka-demo
Deploying Kafka with docker-compose
First, create a docker-compose.yml file:
```yaml
version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181
  kafka9094:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.60:9092
      # after startup, create a topic named kafeidou with 2 partitions and 0 replicas
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper
```
Deployment is simple: in the directory containing the docker-compose.yml file, run `docker-compose up -d` and you are done. Testing works the same way as above.
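To verify that the stack came up, the standard compose commands are enough (a minimal check):

```bash
# run in the directory that holds docker-compose.yml
docker-compose ps                          # both services should show State "Up"
docker-compose logs kafka9094 | tail -n 20 # peek at the broker's startup log
```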
This docker-compose setup does a bit more than the plain docker deployment above:
- Data persistence: two directories under the current directory are mounted to store ZooKeeper's and Kafka's data. The same effect can of course be achieved with plain docker by adding `-v` options to the `docker run` command.
- After startup, Kafka initializes a topic with partitions. Again, plain docker can do the same by adding `-e KAFKA_CREATE_TOPICS=kafeidou:2:0` to `docker run`. A combined sketch follows below.
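Put together, a plain `docker run` that mimics what the compose file does might look like this (a sketch; the host paths under the current directory are examples, and 192.168.1.60 stands for your machine's IP):

```bash
docker run -d --name kafka -p 9092:9092 \
  --link zookeeper \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.60:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_CREATE_TOPICS=kafeidou:2:0 \
  -v "$(pwd)/kafka-logs:/kafka" \
  wurstmeister/kafka
```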
Summary: docker-compose is the recommended way to deploy
Why?
With a plain docker deployment, any change (for example, changing the exposed port) means stopping the container with `docker stop <container ID/name>`, removing it with `docker rm <container ID/name>`, and then starting a new container with the updated `docker run ...` command.
With docker-compose, you only edit the corresponding part of docker-compose.yml (for example, change 2181:2181 to 2182:2182) and then run `docker-compose up -d` again in the directory containing the file to apply the change. Both workflows are sketched below.
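The day-to-day difference, sketched as commands (the changed port is purely illustrative):

```bash
# plain docker: stop, remove, then re-run with the new settings
docker stop kafka
docker rm kafka
docker run -d --name kafka -p 9093:9092 ... wurstmeister/kafka

# docker-compose: edit docker-compose.yml, then re-apply
docker-compose up -d
```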
Originally published by 四顆咖啡豆. Follow the WeChat official account [四顆咖啡豆] for the latest content.