5分鐘帶你體驗一把 Kafka

Guide哥答應你們的 Kafka系列的第2篇原創文章。爲了保證內容實時更新,我將相關文章也發送到了Gihub上!地址:https://github.com/Snailclimb/springboot-kafkajava

相關閱讀:入門篇!大白話帶你認識 Kafka!git

前置條件:你的電腦已經安裝 Docker程序員

主要內容:github

  1. 使用 Docker 安裝
  2. 使用命令行測試消息隊列的功能
  3. zookeeper和kafka可視化管理工具
  4. Java 程序中簡單使用Kafka

使用 Docker 安裝搭建Kafka環境

單機版

下面使用的單機版的Kafka 來做爲演示,推薦先搭建單機版的Kafka來學習。面試

如下使用 Docker 搭建Kafka基本環境來自開源項目:github.com/simplesteph… 。固然,你也能夠按照官方提供的來:github.com/wurstmeiste…正則表達式

新建一個名爲 zk-single-kafka-single.yml 的文件,文件內容以下:spring

version: '2.1'

services:
 zoo1:
 image: zookeeper:3.4.9
 hostname: zoo1
 ports:
 - "2181:2181"
 environment:
 ZOO_MY_ID: 1
 ZOO_PORT: 2181
 ZOO_SERVERS: server.1=zoo1:2888:3888
 volumes:
 - ./zk-single-kafka-single/zoo1/data:/data
 - ./zk-single-kafka-single/zoo1/datalog:/datalog

 kafka1:
 image: confluentinc/cp-kafka:5.3.1
 hostname: kafka1
 ports:
 - "9092:9092"
 environment:
 KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
 KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
 KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
 KAFKA_BROKER_ID: 1
 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

 volumes:
 - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
 depends_on:
 - zoo1

複製代碼

運行如下命令便可完成環境搭建(會自動下載並運行一個 zookeeper 和 kafka )docker

docker-compose -f zk-single-kafka-single.yml up
複製代碼

若是須要中止Kafka相關容器的話,運行如下命令便可:shell

docker-compose -f zk-single-kafka-single.yml down
複製代碼

集羣版

如下使用 Docker 搭建Kafka基本環境來自開源項目:github.com/simplesteph…apache

新建一個名爲 zk-single-kafka-multiple.yml 的文件,文件內容以下:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-multiple/zoo1/data:/data
      - ./zk-single-kafka-multiple/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zoo1


  kafka3:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zoo1

複製代碼

運行如下命令便可完成 1個節點 Zookeeper+3個節點的 Kafka 的環境搭建。

docker-compose -f zk-single-kafka-multiple.yml up
複製代碼

若是須要中止Kafka相關容器的話,運行如下命令便可:

docker-compose -f zk-single-kafka-multiple.yml down
複製代碼

使用命令行測試消息的生產和消費

通常狀況下咱們不多會用到 Kafka 的命令行操做。

1.進入 Kafka container 內部執行 Kafka 官方自帶了一些命令

docker exec -ti docker_kafka1_1 bash
複製代碼

2.列出全部 Topic

root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181
複製代碼

3.建立一個 Topic

root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1
Created topic test.
複製代碼

咱們建立了一個名爲 test 的 Topic, partition 數爲 3, replica 數爲 1。

4.消費者訂閱主題

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
複製代碼

咱們訂閱了 名爲 test 的 Topic。

5.生產者向 Topic 發送消息

root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test
>send hello from console -producer
>
複製代碼

咱們使用 kafka-console-producer 命令向名爲 test 的 Topic 發送了一條消息,消息內容爲:「send hello from console -producer」

這個時候,你會發現消費者成功接收到了消息:

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
複製代碼

IDEA相關插件推薦

Zoolytic-Zookeeper tool

這是一款 IDEA 提供的 Zookeeper 可視化工具插件,很是好用! 咱們能夠經過它:

  1. 可視化ZkNodes節點信息
  2. ZkNodes節點管理-添加/刪除
  3. 編輯zkNodes數據
  4. ......

實際使用效果以下:

使用方法:

  1. 打開工具:View->Tool windows->Zoolytic;
  2. 點擊 「+」 號後在彈出框數據:「127.0.0.1:2181」 鏈接 zookeeper;
  3. 鏈接以後點擊新建立的鏈接而後點擊「+」號旁邊的刷新按鈕便可!

Kafkalytic

IDEA 提供的 Kafka 可視化管理插件。這個插件爲咱們提供了下面這寫功能:

  1. 多個集羣支持
  2. 主題管理:建立/刪除/更改分區
  3. 使用正則表達式搜索主題
  4. 發佈字符串/字節序列化的消息
  5. 使用不一樣的策略消費消息

實際使用效果以下:

使用方法:

  1. 打開工具:View->Tool windows->kafkalytic;

  2. 點擊 「+」 號後在彈出框數據:「127.0.0.1:9092」 鏈接;

Java 程序中簡單使用Kafka

代碼地址:github.com/Snailclimb/…

Step 1:新建一個Maven項目

Step2: pom.xml 中添加相關依賴

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
複製代碼

Step 3:初始化消費者和生產者

KafkaConstants常量類中定義了Kafka一些經常使用配置常量。

public class KafkaConstants {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String CLIENT_ID = "client1";
    public static String GROUP_ID_CONFIG="consumerGroup1";
    private KafkaConstants() {

    }
}

複製代碼

ProducerCreator 中有一個 createProducer() 方法方法用於返回一個 KafkaProducer對象

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/** * @author shuang.kou */
public class ProducerCreator {


    public static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}

複製代碼

ConsumerCreator 中有一個createConsumer() 方法方法用於返回一個 KafkaConsumer 對象

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerCreator {

    public static Consumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(properties);
    }
}

複製代碼

Step 4:發送和消費消息

生產者發送消息:

private static final String TOPIC = "test-topic";
Producer<String, String> producer = ProducerCreator.createProducer();
ProducerRecord<String, String> record =
 new ProducerRecord<>(TOPIC, "hello, Kafka!");
try {
 //send message
 RecordMetadata metadata = producer.send(record).get();
 System.out.println("Record sent to partition " + metadata.partition()
                    + " with offset " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
 System.out.println("Error in sending record");
 e.printStackTrace();
}
producer.close();
複製代碼

消費者消費消息:

Consumer<String, String> consumer = ConsumerCreator.createConsumer();
// 循環消費消息
while (true) {
  //subscribe topic and consume message
  consumer.subscribe(Collections.singletonList(TOPIC));

  ConsumerRecords<String, String> consumerRecords =
    consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println("Consumer consume message:" + consumerRecord.value());
  }
}
複製代碼

Step 5:測試

運行程序控制臺打印出:

Record sent to partition 0 with offset 20
Consumer consume message:hello, Kafka!
複製代碼

開源項目推薦

做者的其餘開源項目推薦:

  1. JavaGuide:【Java學習+面試指南】 一份涵蓋大部分Java程序員所須要掌握的核心知識。
  2. springboot-guide : 適合新手入門以及有經驗的開發人員查閱的 Spring Boot 教程(業餘時間維護中,歡迎一塊兒維護)。
  3. programmer-advancement : 我以爲技術人員應該有的一些好習慣!
  4. spring-security-jwt-guide :從零入門 !Spring Security With JWT(含權限驗證)後端部分代碼。

公衆號

相關文章
相關標籤/搜索