單獨KafkaConsumer實例and多worker線程。

一、單獨KafkaConsumer實例and多worker線程。
將獲取的消息和消息的處理解耦,將消息的處理放入單獨的工做者線程中,即工做線程中,同時維護一個或者若各幹consumer實例執行消息獲取任務。
本例使用全局的KafkaConsumer實例執行消息獲取,而後把獲取到的消息集合交給線程池中的worker線程執行工做,以後worker線程完成處理後上報位移狀態,由全局consumer提交位移。
java

 

  1 package com.bie.kafka.kafkaWorker;
  2 
  3 import java.time.Duration;
  4 import java.util.Arrays;
  5 import java.util.Collection;
  6 import java.util.Collections;
  7 import java.util.HashMap;
  8 import java.util.Map;
  9 import java.util.Properties;
 10 import java.util.concurrent.ArrayBlockingQueue;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.ThreadPoolExecutor;
 13 import java.util.concurrent.TimeUnit;
 14 
 15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 16 import org.apache.kafka.clients.consumer.ConsumerRecords;
 17 import org.apache.kafka.clients.consumer.KafkaConsumer;
 18 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 19 import org.apache.kafka.common.TopicPartition;
 20 import org.apache.kafka.common.errors.WakeupException;
 21 
 22 /**
 23  * 
 24  * @Description TODO
 25  * @author biehl
 26  * @Date 2019年6月1日 下午3:28:53
 27  * 
 28  * @param <K>
 29  * @param <V>
 30  * 
 31  *            一、consumer多線程管理類,用於建立線程池以及爲每一個線程分配消息集合。 另外consumer位移提交也在該類中完成。
 32  * 
 33  */
 34 public class ConsumerThreadHandler<K, V> {
 35 
 36     // KafkaConsumer實例
 37     private final KafkaConsumer<K, V> consumer;
 38     // ExecutorService實例
 39     private ExecutorService executors;
 40     // 位移信息offsets
 41     private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 42 
 43     /**
 44      * 
 45      * @param brokerList
 46      *            kafka列表
 47      * @param groupId
 48      *            消費組groupId
 49      * @param topic
 50      *            主題topic
 51      */
 52     public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
 53         Properties props = new Properties();
 54         // broker列表
 55         props.put("bootstrap.servers", brokerList);
 56         // 消費者組編號Id
 57         props.put("group.id", groupId);
 58         // 非自動提交位移信息
 59         props.put("enable.auto.commit", "false");
 60         // 從最先的位移處開始消費消息
 61         props.put("auto.offset.reset", "earliest");
 62         // key反序列化
 63         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 64         // value反序列化
 65         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 66         // 將配置信息裝配到消費者實例裏面
 67         consumer = new KafkaConsumer<>(props);
 68         // 消費者訂閱消息,並實現重平衡rebalance
 69         // rebalance監聽器,建立一個匿名內部類。使用rebalance監聽器前提是使用消費者組(consumer group)。
 70         // 監聽器最多見用法就是手動提交位移到第三方存儲以及在rebalance先後執行一些必要的審計操做。
 71         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
 72 
 73             /**
 74              * 在coordinator開啓新一輪rebalance前onPartitionsRevoked方法會被調用。
 75              */
 76             @Override
 77             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 78                 // 提交位移
 79                 consumer.commitSync(offsets);
 80             }
 81 
 82             /**
 83              * rebalance完成後會調用onPartitionsAssigned方法。
 84              */
 85             @Override
 86             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 87                 // 清除位移信息
 88                 offsets.clear();
 89             }
 90         });
 91     }
 92 
 93     /**
 94      * 消費主方法
 95      * 
 96      * @param threadNumber
 97      *            線程池中的線程數
 98      */
 99     public void consume(int threadNumber) {
100         executors = new ThreadPoolExecutor(
101                 threadNumber, 
102                 threadNumber, 
103                 0L, 
104                 TimeUnit.MILLISECONDS,
105                 new ArrayBlockingQueue<Runnable>(1000), 
106                 new ThreadPoolExecutor.CallerRunsPolicy());
107         try {
108             // 消費者一直處於等待狀態,等待消息消費
109             while (true) {
110                 // 從主題中獲取消息
111                 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
112                 // 若是獲取到的消息不爲空
113                 if (!records.isEmpty()) {
114                     // 將消息信息、位移信息封裝到ConsumerWorker中進行提交
115                     executors.submit(new ConsumerWorker<>(records, offsets));
116                 }
117                 // 調用提交位移信息、儘可能下降synchronized塊對offsets鎖定的時間
118                 this.commitOffsets();
119             }
120         } catch (WakeupException e) {
121             // 此處忽略此異常的處理.WakeupException異常是從poll方法中拋出來的異常
122             //若是不忽略異常信息,此處會打印錯誤哦,親
123             //e.printStackTrace();
124         } finally {
125             // 調用提交位移信息、儘可能下降synchronized塊對offsets鎖定的時間
126             this.commitOffsets();
127             // 關閉consumer
128             consumer.close();
129         }
130     }
131 
132     /**
133      * 儘可能下降synchronized塊對offsets鎖定的時間
134      */
135     private void commitOffsets() {
136         // 儘可能下降synchronized塊對offsets鎖定的時間
137         Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
138         // 保證線程安全、同步鎖,鎖住offsets
139         synchronized (offsets) {
140             // 判斷若是offsets位移信息爲空,直接返回,節省同步鎖對offsets的鎖定的時間
141             if (offsets.isEmpty()) {
142                 return;
143             }
144             // 若是offsets位移信息不爲空,將位移信息offsets放到集合中,方便同步
145             unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
146             // 清除位移信息offsets
147             offsets.clear();
148         }
149         // 將封裝好的位移信息unmodfiedMap集合進行同步提交
150         // 手動提交位移信息
151         consumer.commitSync(unmodfiedMap);
152     }
153 
154     /**
155      * 關閉消費者
156      */
157     public void close() {
158         // 在另外一個線程中調用consumer.wakeup();方法來觸發consume的關閉。
159         // KafkaConsumer不是線程安全的,可是另一個例外,用戶能夠安全的在另外一個線程中調用consume.wakeup()。
160         // wakeup()方法是特例,其餘KafkaConsumer方法都不能同時在多線程中使用
161         consumer.wakeup();
162         // 關閉ExecutorService實例
163         executors.shutdown();
164     }
165 
166 }

 

 1 package com.bie.kafka.kafkaWorker;
 2 
 3 import java.util.List;
 4 import java.util.Map;
 5 
 6 import org.apache.kafka.clients.consumer.ConsumerRecord;
 7 import org.apache.kafka.clients.consumer.ConsumerRecords;
 8 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 9 import org.apache.kafka.common.TopicPartition;
10 
11 /**
12  * 
13  * @Description TODO
14  * @author biehl
15  * @Date 2019年6月1日 下午3:45:38
16  * 
17  * @param <K>
18  * @param <V>
19  * 
20  *            一、本質上是一個Runnable,執行真正的消費邏輯而且上報位移信息給ConsumerThreadHandler。
21  * 
22  */
23 public class ConsumerWorker<K, V> implements Runnable {
24 
25     // 獲取到的消息
26     private final ConsumerRecords<K, V> records;
27     // 位移信息
28     private final Map<TopicPartition, OffsetAndMetadata> offsets;
29 
30     /**
31      * ConsumerWorker有參構造方法
32      * 
33      * @param records
34      *            獲取到的消息
35      * @param offsets
36      *            位移信息
37      */
38     public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
39         this.records = records;
40         this.offsets = offsets;
41     }
42 
43     /**
44      * 
45      */
46     @Override
47     public void run() {
48         // 獲取到分區的信息
49         for (TopicPartition partition : records.partitions()) {
50             // 獲取到分區的消息記錄
51             List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
52             // 遍歷獲取到的消息記錄
53             for (ConsumerRecord<K, V> record : partConsumerRecords) {
54                 // 打印消息
55                 System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: "
56                         + record.offset() 
57                         + ",消息記錄: " + record.value());
58             }
59             // 上報位移信息。獲取到最後的位移消息,因爲位移消息從0開始,因此最後位移減一獲取到位移位置
60             long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset();
61             // 同步鎖,鎖住offsets位移
62             synchronized (offsets) {
63                 // 若是offsets位移不包含partition這個key信息
64                 if (!offsets.containsKey(partition)) {
65                     // 就將位移信息設置到map集合裏面
66                     offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
67                 } else {
68                     // 不然,offsets位移包含partition這個key信息
69                     // 獲取到offsets的位置信息
70                     long curr = offsets.get(partition).offset();
71                     // 若是獲取到的位置信息小於等於上一次位移信息大小
72                     if (curr <= lastOffset + 1) {
73                         // 將這個partition的位置信息設置到map集合中。並保存到broker中。
74                         offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
75                     }
76                 }
77             }
78         }
79     }
80 
81 }
 1 package com.bie.kafka.kafkaWorker;
 2 
 3 /**
 4  * 
 5  * @Description TODO
 6  * @author biehl
 7  * @Date 2019年6月1日 下午4:13:25
 8  *
 9  *       一、單獨KafkaConsumer實例和多worker線程。
10  *       二、將獲取的消息和消息的處理解耦,將消息的處理放入單獨的工做者線程中,即工做線程中,
11  *       同時維護一個或者若各幹consumer實例執行消息獲取任務。
12  *       三、本例使用全局的KafkaConsumer實例執行消息獲取,而後把獲取到的消息集合交給線程池中的worker線程執行工做,
13  *       以後worker線程完成處理後上報位移狀態,由全局consumer提交位移。
14  * 
15  * 
16  */
17 
18 public class ConsumerMain {
19 
20     public static void main(String[] args) {
21         // broker列表
22         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
23         // 主題信息topic
24         String topic = "topic1";
25         // 消費者組信息group
26         String groupId = "group2";
27         // 根據ConsumerThreadHandler構造方法構造出消費者
28         final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
29         final int cpuCount = Runtime.getRuntime().availableProcessors();
30         System.out.println("cpuCount : " + cpuCount);
31         // 建立線程的匿名內部類
32         Runnable runnable = new Runnable() {
33 
34             @Override
35             public void run() {
36                 // 執行consume,在此線程中執行消費者消費消息。
37                 handler.consume(cpuCount);
38             }
39         };
40         // 直接調用runnable此線程,並運行
41         new Thread(runnable).start();
42 
43         try {
44             // 此線程休眠20000
45             Thread.sleep(20000L);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49         System.out.println("Starting to close the consumer...");
50         // 關閉消費者
51         handler.close();
52     }
53 
54 }

 

待續......apache

相關文章
相關標籤/搜索