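In the previous post, "Kafka Consumer多線程實例" (Kafka Consumer multithreading examples), we discussed two ways of using KafkaConsumer with multiple threads: one KafkaConsumer per thread, and a single KafkaConsumer feeding multiple worker threads. For the second approach I used automatic offset commits, which avoided the trouble of committing offsets from multiple threads. Quite a few people have since asked how to write it with manual commits. Because KafkaConsumer is not thread-safe, we cannot simply call consumer.commitSync from several threads. This post gives a working example that simulates multithreaded consumption with manual offset commits.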
The example consists of three classes:
ConsumerWorker class
    package huxi.test.consumer.multithreaded;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;
    import java.util.Map;

    public class ConsumerWorker<K, V> implements Runnable {

        private final ConsumerRecords<K, V> records;
        // Shared with ConsumerThreadHandler; every access must hold the map's monitor
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.records = records;
            this.offsets = offsets;
        }

        @Override
        public void run() {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
                for (ConsumerRecord<K, V> record : partitionRecords) {
                    // Message-processing logic goes here; this example only prints the message
                    System.out.println(String.format("topic=%s, partition=%d, offset=%d",
                            record.topic(), record.partition(), record.offset()));
                }

                // Report the offset: the position to commit is the last processed offset + 1
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                synchronized (offsets) {
                    OffsetAndMetadata curr = offsets.get(partition);
                    // Only move forward; never overwrite a larger offset reported by another worker
                    if (curr == null || curr.offset() <= lastOffset + 1) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
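The reporting step at the end of run() boils down to "keep the larger offset per partition". The sketch below isolates that rule so it can be run without a broker. It is only an illustration: OffsetReportSketch and report are hypothetical names, a String stands in for TopicPartition, and a Long stands in for OffsetAndMetadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the original example or of the Kafka client.
final class OffsetReportSketch {

    /** Records that everything up to and including lastOffset has been processed for the partition. */
    static void report(Map<String, Long> offsets, String partition, long lastOffset) {
        synchronized (offsets) {
            // Store lastOffset + 1 (the next offset to read), keeping the larger value if one exists
            offsets.merge(partition, lastOffset + 1, Long::max);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        report(offsets, "test-topic-0", 199L); // a later batch finishes first
        report(offsets, "test-topic-0", 99L);  // an earlier batch finishes afterwards
        System.out.println(offsets);           // prints {test-topic-0=200}
    }
}
```

Because the larger value always wins, a slow worker that finishes an older batch late cannot move the reported offset backwards.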
ConsumerThreadHandler class
    package huxi.test.consumer.multithreaded;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ConsumerThreadHandler<K, V> {

        private final KafkaConsumer<K, V> consumer;
        private ExecutorService executors;
        // Offsets reported by the workers; guarded by synchronized (offsets)
        private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Called on the polling thread from within poll(). Workers may be updating
                    // the map at the same time, so commit a snapshot taken under the lock.
                    commitOffsets();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    synchronized (offsets) {
                        offsets.clear();
                    }
                }
            });
        }

        /**
         * Main consumption method.
         * @param threadNumber number of threads in the thread pool
         */
        public void consume(int threadNumber) {
            executors = new ThreadPoolExecutor(
                    threadNumber,
                    threadNumber,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            try {
                while (true) {
                    // poll(long) is deprecated in newer clients in favor of poll(Duration)
                    ConsumerRecords<K, V> records = consumer.poll(1000L);
                    if (!records.isEmpty()) {
                        executors.submit(new ConsumerWorker<>(records, offsets));
                    }
                    commitOffsets();
                }
            } catch (WakeupException e) {
                // Expected when close() calls consumer.wakeup(); swallow it
            } finally {
                commitOffsets();
                consumer.close();
            }
        }

        private void commitOffsets() {
            // Hold the lock on offsets for as short a time as possible:
            // copy and clear under the lock, then commit outside it
            Map<TopicPartition, OffsetAndMetadata> unmodifiedMap;
            synchronized (offsets) {
                if (offsets.isEmpty()) {
                    return;
                }
                unmodifiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
                offsets.clear();
            }
            consumer.commitSync(unmodifiedMap);
        }

        public void close() {
            // wakeup() is the one KafkaConsumer method that is safe to call from another thread
            consumer.wakeup();
            executors.shutdown();
        }
    }
Main class
    package huxi.test.consumer.multithreaded;

    public class Main {

        public static void main(String[] args) {
            String brokerList = "localhost:9092";
            String topic = "test-topic";
            String groupID = "test-group";
            final ConsumerThreadHandler<byte[], byte[]> handler =
                    new ConsumerThreadHandler<>(brokerList, groupID, topic);
            final int cpuCount = Runtime.getRuntime().availableProcessors();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    handler.consume(cpuCount);
                }
            };
            new Thread(runnable).start();

            try {
                // Stop this test program automatically after 20 seconds
                Thread.sleep(20000L);
            } catch (InterruptedException e) {
                // swallow this exception
            }
            System.out.println("Starting to close the consumer...");
            handler.close();
        }
    }
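1. First create a test topic named test-topic with 10 partitions, and use the kafka-producer-perf-test.sh script to produce 500,000 messages.
2. Run Main, assuming group.id is set to test-group.
3. Open a new terminal and repeatedly run the following command to monitor the consumer group's progress:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group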
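When the LAG column is 0 for every partition, the consumer group's offsets are being committed correctly. Note that ConsumerThreadHandler calls commitOffsets once per iteration of the poll loop, so the timeout passed to consumer.poll bounds how often offsets are committed when no messages are arriving.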
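Thanks to a member of the QQ group for pointing out that this approach has a window in which data can be lost. Suppose thread T1 starts consuming message M1 at offset 100 of partition 0 at time t0, and thread T2 starts consuming message M2 at offset 101 of partition 0 at time t1. At t3, T2 finishes first and reports offset 101 to the handler while T1 is still processing. At t4 the handler commits offset 101. T1 then fails with an exception, so the message at offset 100 is never consumed successfully, but because the committed offset has already moved past offset 100, that message is lost.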
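One possible way to narrow this window, sketched here as an illustration and not as part of the original example, is to let workers report each completed offset and advance the committable position only across an unbroken run of completed offsets. ContiguousOffsetTracker, markDone and committableOffset are hypothetical names. The sketch tracks a single partition and assumes offsets in that partition are consecutive, which does not hold for compacted or transactional topics.

```java
import java.util.TreeSet;

// Hypothetical helper for one partition; not part of the original example or of the Kafka client.
final class ContiguousOffsetTracker {

    // Offsets that finished processing but are still ahead of a gap
    private final TreeSet<Long> completed = new TreeSet<>();
    // First offset not yet known to be processed; this is the value that would be committed
    private long nextToCommit;

    ContiguousOffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Called by a worker after it has successfully processed the record at the given offset. */
    synchronized void markDone(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the committable position
        }
        completed.add(offset);
        // Advance over the unbroken run of completed offsets, stopping at the first gap
        while (!completed.isEmpty() && completed.first() == nextToCommit) {
            completed.pollFirst();
            nextToCommit++;
        }
    }

    /** Offset that is safe to commit: every offset below it has been processed. */
    synchronized long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(100L);
        tracker.markDone(101L);                          // T2 finishes M2 first
        System.out.println(tracker.committableOffset()); // prints 100
        tracker.markDone(100L);                          // T1 finishes M1 later
        System.out.println(tracker.committableOffset()); // prints 102
    }
}
```

In the scenario above, if T1 throws instead of calling markDone(100), the committable offset stays at 100, so the message at offset 100 would be fetched again after a restart or rebalance. The trade-off is that a failed record holds back commits for everything behind it until it is retried or deliberately skipped.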