一、單獨KafkaConsumer實例and多worker線程。
將獲取的消息和消息的處理解耦,將消息的處理放入單獨的工做者線程中,即工做線程中,同時維護一個或者若各幹consumer實例執行消息獲取任務。
本例使用全局的KafkaConsumer實例執行消息獲取,而後把獲取到的消息集合交給線程池中的worker線程執行工做,以後worker線程完成處理後上報位移狀態,由全局consumer提交位移。java
1 package com.bie.kafka.kafkaWorker; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Collection; 6 import java.util.Collections; 7 import java.util.HashMap; 8 import java.util.Map; 9 import java.util.Properties; 10 import java.util.concurrent.ArrayBlockingQueue; 11 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.ThreadPoolExecutor; 13 import java.util.concurrent.TimeUnit; 14 15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 16 import org.apache.kafka.clients.consumer.ConsumerRecords; 17 import org.apache.kafka.clients.consumer.KafkaConsumer; 18 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 19 import org.apache.kafka.common.TopicPartition; 20 import org.apache.kafka.common.errors.WakeupException; 21 22 /** 23 * 24 * @Description TODO 25 * @author biehl 26 * @Date 2019年6月1日 下午3:28:53 27 * 28 * @param <K> 29 * @param <V> 30 * 31 * 一、consumer多線程管理類,用於建立線程池以及爲每一個線程分配消息集合。 另外consumer位移提交也在該類中完成。 32 * 33 */ 34 public class ConsumerThreadHandler<K, V> { 35 36 // KafkaConsumer實例 37 private final KafkaConsumer<K, V> consumer; 38 // ExecutorService實例 39 private ExecutorService executors; 40 // 位移信息offsets 41 private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); 42 43 /** 44 * 45 * @param brokerList 46 * kafka列表 47 * @param groupId 48 * 消費組groupId 49 * @param topic 50 * 主題topic 51 */ 52 public ConsumerThreadHandler(String brokerList, String groupId, String topic) { 53 Properties props = new Properties(); 54 // broker列表 55 props.put("bootstrap.servers", brokerList); 56 // 消費者組編號Id 57 props.put("group.id", groupId); 58 // 非自動提交位移信息 59 props.put("enable.auto.commit", "false"); 60 // 從最先的位移處開始消費消息 61 props.put("auto.offset.reset", "earliest"); 62 // key反序列化 63 props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 64 // value反序列化 65 props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 66 // 將配置信息裝配到消費者實例裏面 67 consumer = new KafkaConsumer<>(props); 68 // 消費者訂閱消息,並實現重平衡rebalance 69 // rebalance監聽器,建立一個匿名內部類。使用rebalance監聽器前提是使用消費者組(consumer group)。 70 // 監聽器最多見用法就是手動提交位移到第三方存儲以及在rebalance先後執行一些必要的審計操做。 71 consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { 72 73 /** 74 * 在coordinator開啓新一輪rebalance前onPartitionsRevoked方法會被調用。 75 */ 76 @Override 77 public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 78 // 提交位移 79 consumer.commitSync(offsets); 80 } 81 82 /** 83 * rebalance完成後會調用onPartitionsAssigned方法。 84 */ 85 @Override 86 public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 87 // 清除位移信息 88 offsets.clear(); 89 } 90 }); 91 } 92 93 /** 94 * 消費主方法 95 * 96 * @param threadNumber 97 * 線程池中的線程數 98 */ 99 public void consume(int threadNumber) { 100 executors = new ThreadPoolExecutor( 101 threadNumber, 102 threadNumber, 103 0L, 104 TimeUnit.MILLISECONDS, 105 new ArrayBlockingQueue<Runnable>(1000), 106 new ThreadPoolExecutor.CallerRunsPolicy()); 107 try { 108 // 消費者一直處於等待狀態,等待消息消費 109 while (true) { 110 // 從主題中獲取消息 111 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L)); 112 // 若是獲取到的消息不爲空 113 if (!records.isEmpty()) { 114 // 將消息信息、位移信息封裝到ConsumerWorker中進行提交 115 executors.submit(new ConsumerWorker<>(records, offsets)); 116 } 117 // 調用提交位移信息、儘可能下降synchronized塊對offsets鎖定的時間 118 this.commitOffsets(); 119 } 120 } catch (WakeupException e) { 121 // 此處忽略此異常的處理.WakeupException異常是從poll方法中拋出來的異常 122 //若是不忽略異常信息,此處會打印錯誤哦,親 123 //e.printStackTrace(); 124 } finally { 125 // 調用提交位移信息、儘可能下降synchronized塊對offsets鎖定的時間 126 this.commitOffsets(); 127 // 關閉consumer 128 consumer.close(); 129 } 130 } 131 132 /** 133 * 儘可能下降synchronized塊對offsets鎖定的時間 134 */ 135 private void commitOffsets() { 136 // 儘可能下降synchronized塊對offsets鎖定的時間 137 Map<TopicPartition, OffsetAndMetadata> unmodfiedMap; 138 // 保證線程安全、同步鎖,鎖住offsets 139 synchronized (offsets) { 140 // 判斷若是offsets位移信息爲空,直接返回,節省同步鎖對offsets的鎖定的時間 141 if (offsets.isEmpty()) { 142 return; 143 } 144 // 若是offsets位移信息不爲空,將位移信息offsets放到集合中,方便同步 145 unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets)); 146 // 清除位移信息offsets 147 offsets.clear(); 148 } 149 // 將封裝好的位移信息unmodfiedMap集合進行同步提交 150 // 手動提交位移信息 151 consumer.commitSync(unmodfiedMap); 152 } 153 154 /** 155 * 關閉消費者 156 */ 157 public void close() { 158 // 在另外一個線程中調用consumer.wakeup();方法來觸發consume的關閉。 159 // KafkaConsumer不是線程安全的,可是另一個例外,用戶能夠安全的在另外一個線程中調用consume.wakeup()。 160 // wakeup()方法是特例,其餘KafkaConsumer方法都不能同時在多線程中使用 161 consumer.wakeup(); 162 // 關閉ExecutorService實例 163 executors.shutdown(); 164 } 165 166 }
1 package com.bie.kafka.kafkaWorker; 2 3 import java.util.List; 4 import java.util.Map; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 9 import org.apache.kafka.common.TopicPartition; 10 11 /** 12 * 13 * @Description TODO 14 * @author biehl 15 * @Date 2019年6月1日 下午3:45:38 16 * 17 * @param <K> 18 * @param <V> 19 * 20 * 一、本質上是一個Runnable,執行真正的消費邏輯而且上報位移信息給ConsumerThreadHandler。 21 * 22 */ 23 public class ConsumerWorker<K, V> implements Runnable { 24 25 // 獲取到的消息 26 private final ConsumerRecords<K, V> records; 27 // 位移信息 28 private final Map<TopicPartition, OffsetAndMetadata> offsets; 29 30 /** 31 * ConsumerWorker有參構造方法 32 * 33 * @param records 34 * 獲取到的消息 35 * @param offsets 36 * 位移信息 37 */ 38 public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) { 39 this.records = records; 40 this.offsets = offsets; 41 } 42 43 /** 44 * 45 */ 46 @Override 47 public void run() { 48 // 獲取到分區的信息 49 for (TopicPartition partition : records.partitions()) { 50 // 獲取到分區的消息記錄 51 List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition); 52 // 遍歷獲取到的消息記錄 53 for (ConsumerRecord<K, V> record : partConsumerRecords) { 54 // 打印消息 55 System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: " 56 + record.offset() 57 + ",消息記錄: " + record.value()); 58 } 59 // 上報位移信息。獲取到最後的位移消息,因爲位移消息從0開始,因此最後位移減一獲取到位移位置 60 long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset(); 61 // 同步鎖,鎖住offsets位移 62 synchronized (offsets) { 63 // 若是offsets位移不包含partition這個key信息 64 if (!offsets.containsKey(partition)) { 65 // 就將位移信息設置到map集合裏面 66 offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); 67 } else { 68 // 不然,offsets位移包含partition這個key信息 69 // 獲取到offsets的位置信息 70 long curr = offsets.get(partition).offset(); 71 // 若是獲取到的位置信息小於等於上一次位移信息大小 72 if (curr <= lastOffset + 1) { 73 // 將這個partition的位置信息設置到map集合中。並保存到broker中。 74 offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); 75 } 76 } 77 } 78 } 79 } 80 81 }
1 package com.bie.kafka.kafkaWorker; 2 3 /** 4 * 5 * @Description TODO 6 * @author biehl 7 * @Date 2019年6月1日 下午4:13:25 8 * 9 * 一、單獨KafkaConsumer實例和多worker線程。 10 * 二、將獲取的消息和消息的處理解耦,將消息的處理放入單獨的工做者線程中,即工做線程中, 11 * 同時維護一個或者若各幹consumer實例執行消息獲取任務。 12 * 三、本例使用全局的KafkaConsumer實例執行消息獲取,而後把獲取到的消息集合交給線程池中的worker線程執行工做, 13 * 以後worker線程完成處理後上報位移狀態,由全局consumer提交位移。 14 * 15 * 16 */ 17 18 public class ConsumerMain { 19 20 public static void main(String[] args) { 21 // broker列表 22 String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092"; 23 // 主題信息topic 24 String topic = "topic1"; 25 // 消費者組信息group 26 String groupId = "group2"; 27 // 根據ConsumerThreadHandler構造方法構造出消費者 28 final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic); 29 final int cpuCount = Runtime.getRuntime().availableProcessors(); 30 System.out.println("cpuCount : " + cpuCount); 31 // 建立線程的匿名內部類 32 Runnable runnable = new Runnable() { 33 34 @Override 35 public void run() { 36 // 執行consume,在此線程中執行消費者消費消息。 37 handler.consume(cpuCount); 38 } 39 }; 40 // 直接調用runnable此線程,並運行 41 new Thread(runnable).start(); 42 43 try { 44 // 此線程休眠20000 45 Thread.sleep(20000L); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 System.out.println("Starting to close the consumer..."); 50 // 關閉消費者 51 handler.close(); 52 } 53 54 }
待續......apache