kafka 系列 -- 二、搭建與實踐

前言

動手實踐每每比看看更重要😇java

單機版 Docker 搭建

version: '2' 
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

注意事項:web

  • 若是想要 java 客戶端可以正常鏈接上 kafka, 須要配置宿主機的 host
sudo vim /etc/hosts

172.20.10.6 kafka
  • 如何使用 kafka 自帶的 kafka-console-producer 測試發送消息?
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test

集羣版 + kafka manager

kafka 集羣 docker-compose

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka1:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_PORT: 9092
  kafka2:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_LISTENERS: "PLAINTEXT://kafka2:9093"
      KAFKA_PORT: 9093
  kafka3:
    restart: always
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_LISTENERS: "PLAINTEXT://kafka3:9094"
      KAFKA_PORT: 9094

注意事項:spring

  • 若是想要 java 客戶端可以正常鏈接上 kafka, 須要配置宿主機的 host
sudo vim /etc/hosts

172.20.10.6 kafka1
172.20.10.6 kafka2
172.20.10.6 kafka3
  • 如何使用 kafka 自帶的 kafka-console-producer 測試發送消息?這裏假設是進入到 kafka3 容器中
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 3 --topic test2
kafka-console-producer.sh --bootstrap-server kafka3:9094 --topic test

kafka-manager docker-compose

version: "2"
services:
  kafka-manager:
    image: kafkamanager/kafka-manager
    container_name: kafka-manager
    ports:
      - "9000:9000"
    external_links:  # 鏈接本compose文件之外的container
      - kafka_kafka1_1
      - kafka_kafka2_1
      - kafka_kafka3_1
    environment:
      ZK_HOSTS: kafka_zookeeper_1:2181
networks:
  default:
    external:
      name: kafka_default

注意事項
kafka-manager、與 kafka 集羣不在同一個 compose 中。所以這裏須要使用 networks 鏈接到 kafka 集羣的網絡中docker

基本操做

如下均在 docker 內操做shell

cd /opt/kafka/bin
  1. 建立主題

建立了 1 個副本 1 個分區的主題apache

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test2
  1. 查看主題
kafka-topics.sh  --zookeeper zookeeper:2181 --list
  1. 查看主題詳情
kafka-topics.sh  --zookeeper zookeeper:2181  --describe  --topic test2
  1. 發送消息
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2
  1. 消費消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning

springboot 鏈接 kafka

mavenbootstrap

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.10.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

</dependencies>

yamlvim

server:
  port: 9009
spring:

  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      group-id: goup1 # 消費組

生產者springboot

@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
kafkaTemplate.send("test2","qweqwe");

消費者網絡

@KafkaListener(topics = "test2")
public void onMsg(String msg) {
    log.error("kafka {}" ,msg);
    System.out.println(msg);
}

相關文檔

相關文章
相關標籤/搜索