動手實踐往往比只看不做更重要😇
# Single-broker Kafka setup: one ZooKeeper container and one Kafka container.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    # Start ZooKeeper before the broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Host name and port the broker advertises to clients
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      # Topic spec; presumably name:partitions:replicas per the image's convention
      KAFKA_CREATE_TOPICS: "test:1:1"
注意事項:
# Open the hosts file so the advertised broker name "kafka" resolves from the client machine
sudo vim /etc/hosts
# Add the following entry to /etc/hosts
# (172.20.10.6 is presumably the author's Docker host IP; substitute your own):
172.20.10.6 kafka
# Start an interactive console producer against broker kafka:9092, publishing to topic "test"
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
# Three-broker Kafka cluster sharing one ZooKeeper container.
# Each broker has its own broker id, listener host name and published port.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka1:
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_PORT: 9092
      KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

  kafka2:
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_PORT: 9093
      KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

  kafka3:
    image: wurstmeister/kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_PORT: 9094
      KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
注意事項:
# Open the hosts file so the advertised broker names resolve from the client machine
sudo vim /etc/hosts
# Add one entry per broker to /etc/hosts
# (172.20.10.6 is presumably the author's Docker host IP; substitute your own):
172.20.10.6 kafka1
172.20.10.6 kafka2
172.20.10.6 kafka3
# Create topic "test2" with 3 partitions and replication factor 3, registered via ZooKeeper
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 3 --topic test2
# Start an interactive console producer against broker kafka3:9094.
# Publishes to "test2", the topic created by the preceding kafka-topics.sh command.
kafka-console-producer.sh --bootstrap-server kafka3:9094 --topic test2
# Standalone compose file for Kafka Manager, attached to an externally defined network.
version: "2"
services:
  kafka-manager:
    container_name: kafka-manager
    image: kafkamanager/kafka-manager
    ports:
      - "9000:9000"
    # Link to containers defined outside this compose file
    external_links:
      - kafka_kafka1_1
      - kafka_kafka2_1
      - kafka_kafka3_1
    environment:
      ZK_HOSTS: kafka_zookeeper_1:2181
networks:
  # Use the pre-existing network "kafka_default" as this project's default network
  default:
    external:
      name: kafka_default
注意事項
kafka-manager 與 kafka 集羣不在同一個 compose 中,所以這裏需要使用 networks 連接到 kafka 集羣的網絡中。
以下操作均在 docker 容器內執行:
# Change to /opt/kafka/bin (presumably where the image keeps the Kafka CLI scripts)
cd /opt/kafka/bin
建立 1 個副本、1 個分區的主題:
# Create topic "test2" with 1 partition and replication factor 1
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test2
# List all topics known to ZooKeeper
kafka-topics.sh --zookeeper zookeeper:2181 --list
# Show details for topic "test2"
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2
# Start an interactive console producer publishing to "test2"
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
# Start a console consumer on "test2", reading from the beginning of the topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning
maven
<!-- Spring Boot parent POM, pinned to 2.1.10.RELEASE -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.10.RELEASE</version>
</parent>

<!-- No versions are declared below; presumably they are managed by the parent POM -->
<dependencies>
    <!-- Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
yaml
# Spring Boot application configuration for the Kafka demo.
server:
  port: 9009
spring:
  kafka:
    # Comma-separated broker addresses (the three locally published broker ports)
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumers read records, so they are configured with deserializers;
      # the consumer section has no key-serializer / value-serializer properties.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Consumer group.
      # NOTE(review): "goup1" looks like a typo for "group1"; kept as-is because
      # the group id is a runtime value.
      group-id: goup1
生產者
// Template used to publish messages; injected via @Autowired.
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;

// Sends the string "qweqwe" to topic "test2".
// This statement is a fragment: in real code it must live inside a method.
kafkaTemplate.send("test2","qweqwe");
消費者
/**
 * Listener for topic "test2": logs the received message and prints it to stdout.
 *
 * @param msg the received message, as a String
 */
@KafkaListener(topics = "test2")
public void onMsg(String msg) {
    // NOTE(review): normal messages are logged at error level, presumably so they stand out in the demo
    log.error("kafka {}" ,msg);
    System.out.println(msg);
}