整活了!搭建Kafka和ZK集羣並結合API操做Kafka!

Kafka集羣搭建

本文經過實操Kafka的API來理解topic、partition等相關概念,我將經過搭建一個Kafka集羣來實現它。html

Kafka集羣依賴於ZooKeeper對齊Broker進行協調管理,因此咱們也須要考慮搭建一個ZooKeeper集羣。java

主機規劃node

主機名稱 角色 IP 地址 基礎軟件
node01 kafka 集羣節點 192.168.242.118 JDK1.8
node02 kafka 集羣節點、ZooKeeper 集羣節點 192.168.242.117 JDK1.8
node03 kafka 集羣節點、ZooKeeper 集羣節點 192.168.242.116 JDK1.8
node04 ZooKeeper 集羣節點 192.168.242.115 JDK1.8

其中:git

  • IP 地址與主機名之間的映射關係配置好。編輯/etc/hosts,添加以下內容:
192.168.242.118 node01
192.168.242.117 node02
192.168.242.116 node03
192.168.242.115 node04
複製代碼
  • node0一、node0二、node03 爲 Kafka 集羣節點,node0二、node0三、node04 爲 ZooKeeper 集羣節點。
  • ZooKeeper 集羣已搭建完畢。

附 ZooKeeper 集羣搭建過程:apache

在node02:bootstrap

  1. tar -zxf apache-zookeeper-3.5.8-bin.tar.gzapi

  2. mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper服務器

  3. cd /usr/local/zookeeper/confmarkdown

  4. cp zoo_example.cfg zoo.cfg架構

  5. vi zoo.cfg

設置 dataDir=/var/zookeeper 末尾添加: server.1=node02:2888:3888 server.2=node03:2888:3888 server.3=node04:2888:3888

  1. mkdir -p /var/zookeeper

  2. echo 1 > /var/zookeeper/myid

  3. vi /etc/profile

export JAVA_HOME=/usr/local/java export ZK_HOME=/usr/local/zookeeper export PATH= P A T H : PATH: JAVA_HOME/bin:$ZK_HOME/bin

  1. source /etc/profile

  2. scp分發zk相關配置到node0三、node04

scp -r /usr/local/zookeeper/ root@node03:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node03:/usr/local/zookeeper/conf/ scp /etc/profile root@node03:/etc

scp -r /usr/local/zookeeper/ root@node04:/usr/local/ scp /usr/local/zookeeper/conf/zoo.cfg root@node04:/usr/local/zookeeper/conf/ scp /etc/profile root@node04:/etc

  1. 在node03

mkdir -p /var/zookeeper echo 2 > /var/zookeeper/myid source /etc/profile

  1. 在node04

mkdir -p /var/zookeeper echo 3 > /var/zookeeper/myid source /etc/profile

  1. 啓動zk集羣,在node02/node03/node04三個節點均執行

zkServer.sh start

ZK集羣搭建完成後,可用zkServer.sh status查看ZK集羣狀態:

node02,follower:

node03,leader:

node04,leader:

Kafka 集羣

解壓完Kafka安裝文件後,修改配置文件config/server.properties

broker.id=0

listeners=PLAINTEXT://node01:9092

log.dirs=/var/kafka-logs

zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka
複製代碼

與ZK集羣搭建同樣,使用SCP分發,注意修改 broker.idlisteners

這裏值得注意的是ZK鏈接配置項要帶上/kafka。

凡是使用 ZooKeeper 的技術,通常按照項目部門之類的加一個節點路徑,不要在 ZK 根節點建立本身的東西,防止難以維護。

配置Kafka環境變量,方便使用Kafka命令,編輯文件/etc/profile

export JAVA_HOME=/usr/local/java
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
複製代碼

啓動 Kafka 集羣

每臺Kafka集羣節點執行命令kafka-server-start.sh

# 前臺啓動
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 後臺啓動
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# 查看topic
kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
複製代碼

啓動Kafka以後,再來看一下 ZK 節點:

[zk: localhost:2181(CONNECTED) 7] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
複製代碼

多了個 kafka 節點,這是能夠想到爲何以前的配置文件zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka這裏最後要加個/kafka了,就是 kafka 啓動以後生成了不少內容,若是都放到 zk 根節點將很難維護。

Kafka 生成的一些內容:

[zk: localhost:2181(CONNECTED) 8] ls /kafka/cluster
[id]
[zk: localhost:2181(CONNECTED) 9] get /kafka/cluster/id
{"version":"1","id":"7V2aCgVnQhuPdkdryBXt4w"}
[zk: localhost:2181(CONNECTED) 10] ls /kafka/con
config             consumers          controller         controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
controller         controller_epoch
[zk: localhost:2181(CONNECTED) 10] ls /kafka/controller
[]
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
controller         controller_epoch
[zk: localhost:2181(CONNECTED) 11] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1608979966643"}
複製代碼

API

本文代碼倉庫:gitee.com/xblzer/kafk…

Topic的管理相關和Producer生產消息的API很是簡單,這裏不作特別說明了,代碼中有註釋,下面從Consumer相關的API開始展開說明。

Consumer

sub 訂閱模式

訂閱模式,必須設置消費者組,去掉消費者組

註釋掉
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
複製代碼

執行報錯

org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
複製代碼

也就是說,訂閱模式能夠用到消費者組的管理機制,在配置消費者的時候必須提供有效的group.id

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消費者組,若是不設置消費者組會報錯
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱topic名稱是「topic」開頭的topic
consumer.subscribe(Pattern.compile("^topic.*"));
複製代碼

如今咱們只開啓一個Consumer客戶端,能夠看到該消費者對產生的消息所有消費了:

單消費線程

再使用線程池,構造三個消費者線程,模擬不一樣的消費者客戶端(屬於同一消費組)。

也能夠在kafka服務器開幾個命令終端,命令以下

kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic topic01 --group group01 --property print.key=true --property print.value=true --property key.separator=,

kafka-console-consumer命令 這個命令的參數能夠用 kafka-console-consumer.sh --help查看。

線程池模擬多個消費者客戶端:

/** * 多個線程,不一樣的消費者(屬於同一消費組) */
@Test
@SneakyThrows
public void testKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //消費者組,若是不設置消費者組會報錯
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

    //開啓三個線程,跑三個consumer客戶端,他們屬於同一消費組「group01」
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            3,
            16,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("[消費者‐%d]").build(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 0; i < 3; i++) {
        threadPoolExecutor.execute(() -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            //訂閱topic名稱是「topic」開頭的topic
            consumer.subscribe(Pattern.compile("^topic.*"));
            //訂閱topic01
// consumer.subscribe(Arrays.asList("topic01"));

            
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                if (!consumerRecords.isEmpty()) {
                    Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                    while (consumerRecordIterator.hasNext()) {
                        ConsumerRecord<String, String> consumerRecord = consumerRecordIterator.next();
                        String key = consumerRecord.key();
                        String value = consumerRecord.value();
                        //消息所在分區
                        int partition = consumerRecord.partition();
                        //消息在所在分區的偏移量
                        long offset = consumerRecord.offset();
                        System.out.println("線程" + Thread.currentThread().getName() + "key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
                    }
                }
            }
        });
    }
    threadPoolExecutor.shutdown();
    threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
}
複製代碼

運行測試類,

Kafka Consumer協調器分配消費分區

能夠看到Consumer協調器(ConsumerCoordinator)分配消費分區狀況:

線程名稱 消費者 分區
消費者-0 consumer-1 topic01的分區0,topic02的分區0
消費者-1 consumer-2 topic01的分區1,topic02的分區1
消費者-2 consumer-3 topic01的分區2

從新生產消息,查看消息消費狀況:

10 條 record 被同一個消費組的三個消費者消費,這個是消費者組的特性之一,組內平分消費分區的消費進行消費,有一個負載均衡的理念在裏面。

當消息中不帶key(key=null)時,將按照輪詢的方式對partition中的消息進行消費:

客戶端宕機

再啓動一個消費者客戶端測試,

從新分配消費分區

從新分配消費分區

控制檯有新的日誌輸出,能夠看到ConsumerCoordinator從新分配了消費分區:

線程名稱 消費者 分區
消費者-0 consumer-1 topic01的分區1,topic02的分區1
消費者-1 consumer-2 topic01的分區2
消費者-2 consumer-3 沒有分配消費分區
新開的線程 新開的消費者 topic01的分區0,topic02的分區0

執行一下生產者,看下消費狀況:

消費者消費分配給本身的分區內的消息!

這個時候把新開的那個Consumer斷開,模擬消費者宕機,看Kafka的從新分配:

rebalancing:有一個Consumer宕機從新分配

Kafka消費者組內分區消費負載均衡。

消費者 assign 手動指定分區模式

上面演示的是consumer主動訂閱,主動訂閱的狀況下,消費者協調器會協調消費者進行分區消費,有一個負載均衡的理念在裏面。

手動指定分區進行消費的話,就會失去組的特性,assign 方法:

//從開始位置消費
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic01", 0));
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
複製代碼

Kafka的分區

探究Kafka高性能之道一文中,我已提到了Kafka是如何決定發送消息到topic的哪一個分區的:

kafka架構

Kafka默認的分區策略在DefaultPartitioner中也有定義:

/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose a partition in a round-robin fashion */
public class DefaultPartitioner implements Partitioner {
    //...
    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
    //...
}
複製代碼

這裏說明了:

  • 若是發送消息指定了分區,那麼消息全發送到指定的分區中
  • 若是消息沒有指定分區可是設置了key,那麼按照消息的key進行hash而後和分區數進行取模,獲得一個值x,Kafka就往分區x中發送消息
  • 若是分區和key都沒有指定,則默認採用輪詢的方式。

上面已經使用API獲得了驗證。

通常狀況下,這種默認的分區策略就知足生產需求了,可是若是有特殊的業務需求,還能夠自定義分區策略,

public void testProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    //定義Partitioner
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
    
    //TODO
}
複製代碼

先看一下,ProducerConfig源碼中關於分區配置的說明:

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>org.apache.kafka.clients.producer.Partitioner</code> interface.";
複製代碼

自定義的Partitioner必須實現org.apache.kafka.clients.producer.Partitioner接口,這裏自定義一個Partitioner,分區策略也按照DefaultPartitioner的策略來,只是其實現略有不一樣:

public class MyPartitioner implements Partitioner {

    private AtomicInteger counter = new AtomicInteger(0);

    /** * 返回分區號 * @param topic topic * @param key key * @param keyBytes key的字節數 * @param value value * @param valueBytes value的字節數 * @param cluster 集羣信息 * @return 分區號 */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //先獲取集羣的分區數
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            int increment = counter.getAndIncrement();
            //拿這個值模上分區數
            // increment & Integer.MAX_VALUE 保證是個正數
            return (increment & Integer.MAX_VALUE) % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("MyPartitioner#close.");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("MyPartitioner#configure.");
    }
}
複製代碼

Producer這裏:

//定義Partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

//建立生產者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
複製代碼

運行生產這實例,這裏生產topic02的消息:

能夠看到分區策略走的是咱們自定義的分區策略,消費者:

前面API建立topic02的時候只設置了兩個分區,因此這裏是兩個分區的輪詢。同理能夠驗證消息帶key的分區消費策略。

序列化

前面API演示的時候,生產者和消費者有兩個重要的配置,

ProducerConfig

  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

ConsumerConfig

  • KEY_DESERIALIZER_CLASS_CONFIG
  • VALUE_DESERIALIZER_CLASS_CONFIG

這個是生產者生產消息是須要對key和value進行序列化,消費者消費消息須要對其進行反序列化,前面序列化和反序列化類是StringSerializerStringDeserializer,跟一下源碼,能夠看到他們都實現了規定好的接口(Serializer<String>Deserializer<String>):

StringSerializer

StringDeserializer

生產環境中,咱們發送的消息有時是對象,此時咱們能夠自定義對象序列化類,這樣能夠完成對象消息的傳輸,自定義序列化實現SerializerDeserializer接口便可。

這裏藉助於commons-lang3包下的SerializationUtils來進行序列化和反序列化:

//序列化
@Override
public byte[] serialize(String topic, Object data) {
// return new byte[0];
    return SerializationUtils.serialize((Serializable) data);
}
複製代碼
//反序列化
@Override
public Object deserialize(String topic, byte[] data) {
    System.out.println("自定義反序列化 topic:" + topic);
    return SerializationUtils.deserialize(data);
}
複製代碼

生產消息,key是String類型,value是Order對象:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

//建立生產者
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
複製代碼

消費消息:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
複製代碼

啓動生產者:

自定義的序列化生效,再啓動消費者,控制檯打印:

成功將Order信息打印出來,自定義反序列化也生效了。

攔截器

發送數據的時候,能夠經過攔截器拿到數據的一些消息,而後能夠任意擺佈這些數據了(對數據作一些裝飾),好比發送失敗了,咱們能夠經過攔截器把錯誤信息拿到進行分析。

只要在ProducerConfig中配置INTERCEPTOR_CLASSES_CONFIG這個配置項就能夠設置攔截起了,和前面的PartitionerSerializer同理,看一下這個配置項的源碼描述:

/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
                                                    + "Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
                                                    + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
複製代碼

這裏說明了默認是沒有攔截器的,自定義攔截器須要實現ProducerInterceptor接口。

public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(), record.key(), record.value() + " --- 攔截了。");
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("metadata:" + metadata + ",exception:" + exception);
    }

    @Override
    public void close() {
        System.out.println("MyProducerInterceptor#close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("MyProducerInterceptor#configure");
    }
}
複製代碼

運行生產着消費者便可觀察到消息成功攔截。

小結

  1. Kafka集羣須要ZooKeeper對其Broker進行協調管理,搭建Kafka集羣前須要搭建ZK集羣,搭建ZK集羣須要注意配置每臺節點的myid
  2. Kafka集羣的每一個節點的配置文件中,須要注意的配置項(KAFKA_HOME/config/server.properties文件broker.idlistenerslog.dirszookeeper.connect
  3. Kafka基礎API對topic進行管理,實現Producer生產消息,Consumer消費消息,並經過運行狀況理解topic的分區,以及消費者組內消費消息的負載均衡。
  4. 利用Kafka相關API實現自定義的分區策略、自定義序列化、以及自定義Producer攔截器。
相關文章
相關標籤/搜索