The Road to IoT Architecture (28) - Docker Practice: MQ Middleware (Kafka)

0. Foreword

  A message queue (MQ) is a middleware component found in most systems. I chose Kafka as the one to practice with. Kafka depends on Zookeeper, whose installation was covered in the previous post.

1. Building the Kafka Image

  Dockerfile

FROM openjdk:8-jdk-alpine

# Kafka's shell scripts need bash, which alpine does not ship by default
RUN apk add --no-cache bash

# Download Kafka from the Aliyun mirror, unpack it, and install it to /var/app
RUN wget http://mirrors.aliyun.com/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz && \
    tar -zxvf kafka_2.12-2.0.1.tgz && \
    rm -rf kafka_2.12-2.0.1.tgz && \
    mv kafka_2.12-2.0.1 /var/app

ENV PATH /var/app/bin:$PATH

# Entry script that rewrites server.properties from environment variables
COPY env.sh /env.sh
RUN chmod +x /env.sh

EXPOSE 9092

ENTRYPOINT ["/env.sh"]

  env.sh

#!/bin/sh

# Read broker settings from the environment, with defaults for standalone use
BROKER_ID=${BROKER_ID:-"0"}
LISTENERS=${LISTENERS:-"PLAINTEXT://:9092"}
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:-"localhost:2181"}

# Patch server.properties in place before starting the broker
sed -i "s/^broker.id=.*$/broker.id=$BROKER_ID/" /var/app/config/server.properties
sed -i "s;^#listeners=.*$;listeners=$LISTENERS;g" /var/app/config/server.properties
sed -i "s/^zookeeper.connect=.*$/zookeeper.connect=$ZOOKEEPER_CONNECT/" /var/app/config/server.properties

# Keep log segments under /data so they can live on a mounted volume
sed -i "s#/tmp/kafka-logs#/data#g" /var/app/config/server.properties

/var/app/bin/kafka-server-start.sh /var/app/config/server.properties
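
  With the Dockerfile and env.sh in the same directory, the image can be built and pushed roughly as follows (a sketch; the tag matches the registry path used in the compose file below and assumes you are already logged in to that registry):

docker build -t registry.cn-shenzhen.aliyuncs.com/wunaozai/kafka .
docker push registry.cn-shenzhen.aliyuncs.com/wunaozai/kafka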

 

2. Configuring Zookeeper, zkui, and Kafka with docker-compose

  docker-compose.yml

version: '3'
services:
    zookeeper:
        image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zookeeper
        ports:
            - 2181:2181
    zkui:
        image: registry.cn-shenzhen.aliyuncs.com/wunaozai/zkui
        ports:
            - 9090:9090
        environment:
            - ZK_SERVER=zookeeper:2181
    kafka:
        image: registry.cn-shenzhen.aliyuncs.com/wunaozai/kafka
        ports:
            - 9092:9092
        environment:
            - BROKER_ID=1
            - LISTENERS=PLAINTEXT://kafka:9092
            - ZOOKEEPER_CONNECT=zookeeper:2181
        volumes:
            - /root/workspace/docker/kafka/kafka/data:/data

  Start the stack with docker-compose and check the result through zkui. Note that because Kafka's listener is advertised by hostname, you need to add "127.0.0.1 kafka" to /etc/hosts on the Docker host; for client development on Windows, write "172.16.23.203 kafka" into C:\Windows\System32\drivers\etc\hosts. A startup sketch follows.
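
  A minimal startup sequence might look like this (a sketch, assuming docker-compose.yml sits in the current directory and 172.16.23.203 is the Docker host used throughout this post):

# make the hostname "kafka" resolve locally, then bring the stack up
echo "127.0.0.1 kafka" >> /etc/hosts
docker-compose up -d
# follow the broker's startup output
docker-compose logs -f kafka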

 

3. Kafka Reads and Writes

  Create a topic

./bin/kafka-topics.sh --create --zookeeper 172.16.23.203:2181 --replication-factor 1 --partitions 1 --topic wunaozai

  List topics

./bin/kafka-topics.sh --list --zookeeper 172.16.23.203:2181
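
  To inspect partition and replica placement for a topic, the same script supports --describe:

./bin/kafka-topics.sh --describe --zookeeper 172.16.23.203:2181 --topic wunaozai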

  Send messages (producer)

./bin/kafka-console-producer.sh --broker-list 172.16.23.203:9092 --topic wunaozai

  Consume messages (consumer)

./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic wunaozai
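
  By default the console consumer only shows messages produced after it starts; add --from-beginning to replay everything already stored in the topic:

./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic wunaozai --from-beginning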

 

4. Kafka Java Client Test

  POM dependency

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>

  Producer: ProducerSample.java

package demo.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // The producer talks to the brokers directly; no Zookeeper address is needed
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "wunaozai";
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>(topic, "key1", "value111"));
        producer.send(new ProducerRecord<String, String>(topic, "key2", "value222"));
        producer.send(new ProducerRecord<String, String>(topic, "key3", "value333"));
        // close() flushes any buffered records before the JVM exits
        producer.close();
    }
}
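
  send() is asynchronous: it only enqueues the record, and close() flushes the buffer on exit. To confirm each write, or to surface errors immediately, you can block on the Future that send() returns. A minimal sketch (SyncProducerSample is an illustrative class of mine, not part of the original code):

package demo.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProducerSample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges the write, so failures surface here
            RecordMetadata meta = producer
                    .send(new ProducerRecord<>("wunaozai", "key1", "value111"))
                    .get();
            System.out.printf("written to partition %d at offset %d%n",
                    meta.partition(), meta.offset());
        }
    }
}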

  Consumer: ConsumerSample.java

package demo.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSample {

    public static void main(String[] args) {
        String topic = "wunaozai";

        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("group.id", "JLGroup"); // consumer group name
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval (offsets live in Kafka itself, not Zookeeper)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                // poll() blocks up to the given timeout waiting for new records
                Duration timeout = Duration.ofSeconds(5);
                ConsumerRecords<String, String> records = consumer.poll(timeout);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close(); // triggers a final offset commit and leaves the group
        }
    }
}
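
  With enable.auto.commit=true, offsets can be committed before a record is actually processed, so a crash mid-batch may skip records on restart. For at-least-once handling, a common variant disables auto-commit and commits only after the batch is handled. A sketch (ManualCommitSample is illustrative, not part of the original code):

package demo.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.23.203:9092");
        props.put("group.id", "JLGroup");
        props.put("enable.auto.commit", "false"); // we decide when offsets are committed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("wunaozai"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5)))
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                consumer.commitSync(); // acknowledge the batch only after it has been handled
            }
        }
    }
}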

  Produce messages with ProducerSample and consume them with ./bin/kafka-console-consumer.sh to verify one direction.

  Produce messages with ./bin/kafka-console-producer.sh and consume them with ConsumerSample to verify the other.

  docker-compose log excerpt (shown as a screenshot in the original post)

 

5. Summary

  Working through this exercise, I found Kafka's configuration fairly involved, so I'll stop here for now and dig deeper later. One step at a time.

 

References:

  https://hub.docker.com/r/wurstmeister/kafka/
  https://raw.githubusercontent.com/wurstmeister/kafka-docker/master/docker-compose.yml
  https://github.com/wurstmeister/kafka-docker
  https://github.com/apache/kafka
  http://www.javashuo.com/article/p-dhgawbvb-do.html
  http://www.javashuo.com/article/p-mlmppsqq-dc.html
