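0. Preface
A message queue (MQ) is a piece of middleware that most systems end up using. I chose Kafka as the middleware to practice with. Kafka depends on Zookeeper; installing Zookeeper was covered in the previous post.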
1. Building the Kafka image
Dockerfile
FROM openjdk:8-jdk-alpine

# Kafka's scripts need bash, which the Alpine base image does not ship
RUN apk add --no-cache bash

# Download Kafka 2.0.1 (Scala 2.12 build) and unpack it to /var/app
RUN wget http://mirrors.aliyun.com/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz && \
    tar -zxvf kafka_2.12-2.0.1.tgz && \
    rm -rf kafka_2.12-2.0.1.tgz && \
    mv kafka_2.12-2.0.1 /var/app/

ENV PATH /var/app/bin:$PATH

# Startup script that patches server.properties from environment variables
COPY env.sh /env.sh
RUN chmod +x /env.sh

EXPOSE 9092

ENTRYPOINT ["/env.sh"]
env.sh
#!/bin/sh

# Settings come from the environment, with fallbacks for a standalone run
BROKER_ID=${BROKER_ID:-"0"}
LISTENERS=${LISTENERS:-"PLAINTEXT://:9092"}
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:-"localhost:2181"}

# Patch the stock server.properties in place
sed -i "s/^broker.id=.*$/broker.id=$BROKER_ID/" /var/app/config/server.properties
sed -i "s;^#listeners=.*$;listeners=$LISTENERS;g" /var/app/config/server.properties
sed -i "s/^zookeeper.connect=.*$/zookeeper.connect=$ZOOKEEPER_CONNECT/" /var/app/config/server.properties

# Keep Kafka's log segments under /data so they can be mounted as a volume
sed -i "s#/tmp/kafka-logs#/data#g" /var/app/config/server.properties

# exec lets the broker replace the shell and receive container stop signals
exec /var/app/bin/kafka-server-start.sh /var/app/config/server.properties
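To make the substitutions in env.sh easier to follow, here is a minimal Java sketch of the same logic: read three settings with defaults, then rewrite the matching lines of server.properties. The class and method names (ServerPropertiesRenderer, envOrDefault, render) are made up for illustration and are not part of Kafka or of the image; the sample lines are assumed to match the stock server.properties shipped with Kafka 2.0.1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in for env.sh: same defaults, same line rewrites. */
class ServerPropertiesRenderer {

    /** Returns the value for name, or fallback when unset or empty (like ${VAR:-default}). */
    static String envOrDefault(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /** Applies the substitutions that env.sh performs with sed. */
    static List<String> render(List<String> lines, Map<String, String> env) {
        String brokerId = envOrDefault(env, "BROKER_ID", "0");
        String listeners = envOrDefault(env, "LISTENERS", "PLAINTEXT://:9092");
        String zookeeper = envOrDefault(env, "ZOOKEEPER_CONNECT", "localhost:2181");

        List<String> out = new ArrayList<>();
        for (String line : lines) {
            if (line.startsWith("broker.id=")) {
                line = "broker.id=" + brokerId;
            } else if (line.startsWith("#listeners=")) {
                // Rewriting the commented-out line also uncomments it
                line = "listeners=" + listeners;
            } else if (line.startsWith("zookeeper.connect=")) {
                line = "zookeeper.connect=" + zookeeper;
            }
            // Move the log directory onto the mounted volume
            out.add(line.replace("/tmp/kafka-logs", "/data"));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> template = Arrays.asList(
                "broker.id=0",
                "#listeners=PLAINTEXT://:9092",
                "log.dirs=/tmp/kafka-logs",
                "zookeeper.connect=localhost:2181");
        Map<String, String> env = new HashMap<>();
        env.put("BROKER_ID", "1");
        env.put("LISTENERS", "PLAINTEXT://kafka:9092");
        env.put("ZOOKEEPER_CONNECT", "zookeeper:2181");
        render(template, env).forEach(System.out::println);
    }
}
```

Passing an empty map to render leaves the defaults in place, which mirrors running the container without any environment variables.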
2. Configuring Zookeeper, zkui, and Kafka with docker-compose
docker-compose.yml
version: '3'
services:
  zookeeper:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zookeeper
    ports:
      - 2181:2181
  zkui:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zkui
    ports:
      - 9090:9090
    environment:
      - ZK_SERVER=zookeeper:2181
  kafka:
    image: registry.cn-shenzhen.aliyuncs.com/wunaozai/kafka
    ports:
      - 9092:9092
    environment:
      - BROKER_ID=1
      - LISTENERS=PLAINTEXT://kafka:9092
      - ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - /root/workspace/docker/kafka/kafka/data:/data
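Start docker-compose and inspect the result in zkui. Note that the Kafka listener is configured with a hostname (kafka) rather than an IP address, so every client must be able to resolve that name. On the Docker host, add 127.0.0.1 kafka to /etc/hosts. If you develop the client on Windows, add 172.16.23.203 kafka to C:\Windows\System32\drivers\etc\hosts.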
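The hosts-file requirement can be verified before starting a client. The following is a minimal sketch, assuming the listener string has the form used in docker-compose.yml; the class ListenerHostCheck and its methods are hypothetical helpers written for this post, and only java.net.InetAddress from the standard library is used.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Illustrative check that the host named in a Kafka listener resolves on this machine. */
class ListenerHostCheck {

    /** Extracts the host from a listener such as "PLAINTEXT://kafka:9092"; empty if none is given. */
    static String hostOf(String listener) {
        int schemeEnd = listener.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("not a listener: " + listener);
        }
        String hostPort = listener.substring(schemeEnd + 3);
        int colon = hostPort.lastIndexOf(':');
        return colon < 0 ? hostPort : hostPort.substring(0, colon);
    }

    /** Returns true when the local resolver (hosts file or DNS) knows the name. */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://kafka:9092";
        String host = hostOf(listener);
        System.out.println(host + (resolves(host)
                ? " resolves"
                : " does NOT resolve; add it to the hosts file"));
    }
}
```

If the check reports that kafka does not resolve, a client may reach the bootstrap address by IP and still fail afterwards, because the broker then hands back the hostname from its listener setting.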
3. Reading from and writing to Kafka
Create a topic
./bin/kafka-topics.sh --create --zookeeper 172.16.23.203:2181 --replication-factor 1 --partitions 1 --topic wunaozai
List topics
./bin/kafka-topics.sh --list --zookeeper 172.16.23.203:2181
Send messages (producer)
./bin/kafka-console-producer.sh --broker-list 172.16.23.203:9092 --topic wunaozai
Consume messages (consumer)
./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic wunaozai
4. Testing the Kafka Java client
POM dependency
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>
Producer: ProducerSample.java
package demo.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // The producer talks to the brokers only; it needs neither a
        // Zookeeper address nor deserializers
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "wunaozai";
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>(topic, "key1", "value111"));
        producer.send(new ProducerRecord<>(topic, "key2", "value222"));
        producer.send(new ProducerRecord<>(topic, "key3", "value333"));
        // close() waits for the buffered records to be sent before returning
        producer.close();
    }
}
Consumer: ConsumerSample.java
package demo.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSample {

    public static void main(String[] args) {
        String topic = "wunaozai";

        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("group.id", "JLGroup"); // consumer group name
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        // Auto-commit interval in ms; this client stores offsets in Kafka, not Zookeeper
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                // Wait up to 5 seconds for new records
                Duration timeout = Duration.ofSeconds(5);
                ConsumerRecords<String, String> records = consumer.poll(timeout);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close the consumer even when poll() throws
            consumer.close();
        }
    }
}
ProducerSample produces messages, and ./bin/kafka-console-consumer.sh consumes them
./bin/kafka-console-producer.sh produces messages, and ConsumerSample consumes them
Partial docker-compose logs
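5. Summary
After working through these exercises, I found that configuring Kafka is still somewhat involved. I will stop here for now and dig deeper later, one step at a time.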