[Original] Kafka Consumer Multithreaded Example, Continued

  In the previous post, "Kafka Consumer Multithreaded Example", we discussed two ways to use KafkaConsumer with multiple threads: multiple KafkaConsumer instances, each on its own thread, and a single KafkaConsumer shared across worker threads. In the second approach I used auto-commit, which avoids the hassle of committing offsets from multiple threads. Many readers have asked how to write it with manual commits instead. Since KafkaConsumer is not thread-safe, we cannot simply call consumer.commitSync from multiple threads to commit offsets. This post gives a concrete example that simulates multithreaded consumption with manual offset commits.
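
Why is this necessary? KafkaConsumer guards all of its public methods and throws a ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access") as soon as it detects access from a second thread while another thread is inside the consumer. Below is a minimal sketch of the anti-pattern, not part of the example itself; the broker address, group id, topic, and class name are all illustrative:

package huxi.test.consumer.multithreaded;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UnsafeCommitDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));

        // ANTI-PATTERN: a worker thread commits on a consumer owned by the
        // polling thread. Once the two threads overlap inside the consumer,
        // KafkaConsumer throws ConcurrentModificationException rather than
        // corrupting its internal state.
        new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.commitSync();
            }
        }).start();

        while (true) {
            consumer.poll(1000L);  // poll loop on the owning thread
        }
    }
}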

  This example consists of three classes:

  • ConsumerThreadHandler: manages the consumer threads; it creates the thread pool and assigns tasks to each thread. Consumer offsets are also committed in this class
  • ConsumerWorker: essentially a Runnable that performs the actual consumption logic and reports offset information back to ConsumerThreadHandler
  • Main: the test driver class

Test Code

ConsumerWorker class

package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;

public class ConsumerWorker<K, V> implements Runnable {

    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = records;
        this.offsets = offsets;
    }

    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // Message-processing logic would go here; this example simply prints each record
                System.out.println(String.format("topic=%s, partition=%d, offset=%d",
                        record.topic(), record.partition(), record.offset()));
            }

            // Report offset information: Kafka expects the committed offset to be
            // that of the next message to fetch, hence lastOffset + 1
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            synchronized (offsets) {
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}

ConsumerThreadHandler class

package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerThreadHandler<K, V> {

    private final KafkaConsumer<K, V> consumer;
    private ExecutorService executors;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit whatever the workers have reported before losing partition ownership
                consumer.commitSync(offsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Start from a clean slate after a rebalance
                offsets.clear();
            }
        });
    }

    /**
     * Main consumption loop.
     * @param threadNumber  number of threads in the pool
     */
    public void consume(int threadNumber) {
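        // CallerRunsPolicy: when the bounded queue is full, the polling thread
        // runs the task itself, which naturally slows down the poll loop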
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                if (!records.isEmpty()) {
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();
            consumer.close();
        }
    }

    private void commitOffsets() {
        // Keep the synchronized block's hold on the offsets lock as short as possible
        Map<TopicPartition, OffsetAndMetadata> unmodifiedMap;
        synchronized (offsets) {
            if (offsets.isEmpty()) {
                return;
            }
            unmodifiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            offsets.clear();
        }
        consumer.commitSync(unmodifiedMap);
    }

    public void close() {
        consumer.wakeup();   // makes a blocked poll() throw WakeupException
        executors.shutdown();
    }
}

Main class

package huxi.test.consumer.multithreaded;

public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String topic = "test-topic";
        String groupID = "test-group";
        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                handler.consume(cpuCount);
            }
        };
        new Thread(runnable).start();

        try {
            // Automatically stop this test program after 20 seconds
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // swallow this exception
        }
        System.out.println("Starting to close the consumer...");
        handler.close();
    }
}  

Test Steps

1. First, create a test topic named test-topic with 10 partitions, and use the kafka-producer-perf-test.sh script to produce 500,000 messages into it (sample commands are sketched after this list).

2. Run Main, assuming group.id is set to test-group.

3. Open a new terminal and repeatedly run the following script to monitor the consumer group's consumption progress:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
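
For step 1, the commands look roughly like the following. This is a sketch: the exact flags depend on your Kafka version (older releases create topics via --zookeeper, newer ones via --bootstrap-server), and the record size and throughput settings are illustrative.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-topic --partitions 10 --replication-factor 1
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 500000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092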

Test Results

When the LAG column shows all zeros, the consumer group's offset commits are working correctly. It is worth noting that you can control how frequently ConsumerThreadHandler commits offsets by tuning the consumer.poll timeout: commitOffsets() runs once per iteration of the poll loop, so a longer timeout means fewer commit attempts per unit of time.

Thanks to a QQ group member for pointing out that this approach has a window in which data can be lost. Suppose thread T1 consumes message M1 at offset 100 of partition 0 at time t0, while thread T2 consumes message M2 at offset 101 of the same partition at time t1. Now suppose T2 finishes first at t3 and reports offset 101 to the handler while T1 is still processing. At t4 the handler commits offset 101; if T1 then hits an error and throws an exception, consumption of the message at offset 100 fails, but since the committed offset has already advanced to 101, that message is lost.
