kafka中消費者消費消息之每一個線程維護一個KafkaConsumer實例

一、首先啓動本身的kafka集羣喲。java

1 啓動zk:
2 bin/zkServer.sh start conf/zoo.cfg。
3 驗證zk是否啓動成功:
4 bin/zkServer.sh status conf/zoo.cfg。
5 啓動kafka:
6 bin/kafka-server-start.sh -daemon config/server.properties。

二、生產者生產消息,模擬生產一百條數據。apache

 1 package com.bie.kafka.producer;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.ExecutionException;
 5 
 6 import org.apache.kafka.clients.producer.KafkaProducer;
 7 import org.apache.kafka.clients.producer.Producer;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11 
12 /**
13  * 
14  * @Description TODO
15  * @author biehl
16  * @Date 2019年5月25日 下午2:34:46
17  *
18  */
19 public class KafKaProducerHeart {
20 
21     public static void main(String[] args) {
22         Properties props = new Properties();
23         props.put("bootstrap.servers", "slaver1:9092,slaver2:9092,slaver3:9092");
24         props.put("acks", "-1");
25         props.put("retries", 3);
26         props.put("batch.size", 323840);
27         props.put("linger.ms", 10);
28         props.put("buffer.memory", 33554432);
29         props.put("max.block.ms", 3000);
30         StringSerializer keySerializer = new StringSerializer();
31         StringSerializer valueSerializer = new StringSerializer();
32         Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
33         String topic = "topic1";
34         String value = " biehl 💗   wj 1314 ";
35         /*
36          * for (int i = 0; i < 100; i++) { 
37          *     //topic-key-value三元組肯定消息所在位置
38          *     producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i),
39          *     value)); 
40          * }
41          */
42 
43         // 異步發送
44         /*for (int i = 0; i < 100; i++) {
45             ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
46             // 異步發送
47             producer.send(record, new Callback() {
48 
49                 @Override
50                 public void onCompletion(RecordMetadata recordmetadata, Exception exception) {
51                     if (exception == null) {
52                         System.out.println("消息發送成功");
53                         System.out.println(
54                                 "topic: " + recordmetadata.topic() + ", partition分區: " + recordmetadata.partition());
55                     } else {
56                         System.out.println("消息發送失敗");
57                     }
58                 }
59             });
60         }*/
61         
62         // 同步發送
63         for (int i = 0; i < 100; i++) {
64             ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
65             try {
66                 RecordMetadata recordMetadata = producer.send(record).get();
67                 System.out.println(
68                         "topic: " + recordMetadata.topic() + ", partition分區: " + recordMetadata.partition());
69             } catch (InterruptedException e) {
70                 e.printStackTrace();
71             } catch (ExecutionException e) {
72                 e.printStackTrace();
73             }
74         }
75 
76         producer.close();
77     }
78 
79 }

三、kafka中消費者消費消息之每一個線程維護一個KafkaConsumer實例:
bootstrap

ConsumerRunnable,消費線程類,執行真正的消費任務安全

 1 package com.bie.kafka.kafkaThrea;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.ConsumerRecord;
 8 import org.apache.kafka.clients.consumer.ConsumerRecords;
 9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 
11 /**
12  * 
13  * @Description TODO
14  * @author biehl
15  * @Date 2019年6月1日 上午11:48:53
16  * 
17  *       一、KafkaConsumer是非線程安全的,KafkaProducer是線程安全的。
18  *       二、該案例是每一個線程維護一個KafkaConsumer實例
19  *       用戶建立多個線程消費topic數據,每一個線程都會建立專屬該線程的KafkaConsumer實例
20  *       三、ConsumerRunnable,消費線程類,執行真正的消費任務
21  */
22 public class ConsumerRunnable implements Runnable {
23 
24     // 每一個線程維護私有的kafkaConsumer實例
25     private final KafkaConsumer<String, String> consumer;
26 
27     /**
28      * 默認每一個消費者的配置參數初始化
29      * 
30      * @param brokerList
31      * @param groupId
32      * @param topic
33      */
34     public ConsumerRunnable(String brokerList, String groupId, String topic) {
35         // 帶參數的構造方法
36         Properties props = new Properties();
37         // kafka的列表
38         props.put("bootstrap.servers", brokerList);
39         // 消費者組編號
40         props.put("group.id", groupId);
41         // 自動提交
42         props.put("enable.auto.commit", true);
43         // 提交提交每一個一秒鐘
44         props.put("auto.commit.interval.ms", "1000");
45         // 反序列化key
46         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
47         // 反序列化value
48         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
49         // 將配置信息進行初始化操做
50         this.consumer = new KafkaConsumer<>(props);
51         // 定義響應的主題信息topic
52         consumer.subscribe(Arrays.asList(topic));
53     }
54 
55     /**
56      * 
57      */
58     @Override
59     public void run() {
60         // 消費者保持一直消費的狀態
61         while (true) {
62             // 將獲取到消費的信息
63             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
64             // 遍歷出每一個消費的消息
65             for (ConsumerRecord<String, String> record : records) {
66                 // 輸出打印消息
67                 System.out.println(
68                         "當前線程名稱 : " + Thread.currentThread().getName() + ", 主題名稱 :" + record.topic() + ", 分區名稱 :"
69                                 + record.partition() + ", 位移名稱 :" + record.offset() + ", value :" + record.value());
70             }
71         }
72     }
73 
74 }

消費線程管理類,建立多個線程類執行消費任務:多線程

 1 package com.bie.kafka.kafkaThrea;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 /**
 7  * 
 8  * @Description TODO
 9  * @author biehl
10  * @Date 2019年6月1日 上午11:56:42
11  * 
12  *       一、消費線程管理類,建立多個線程類執行消費任務
13  */
14 public class ConsumerGroup {
15 
16     // 消費者羣組,多消費者。
17     private List<ConsumerRunnable> consumers;
18 
19     /**
20      * 
21      * @param consumerNum
22      * @param groupId
23      * @param topic
24      * @param brokerList
25      */
26     public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
27         // 初始化消費者組
28         consumers = new ArrayList<>(consumerNum);
29         // 初始化消費者,建立多少個消費者
30         for (int i = 0; i < consumerNum; i++) {
31             // 根據消費者構造方法,建立消費者實例
32             ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
33             // 將建立的消費者實例添加到消費者組中
34             consumers.add(consumerRunnable);
35         }
36     }
37 
38     /**
39      * 
40      */
41     public void execute() {
42         // 將消費者組裏面的消費者遍歷出來
43         for (ConsumerRunnable task : consumers) {
44             // 建立一個消費者線程,而且啓動該線程
45             new Thread(task).start();
46         }
47     }
48 
49 }
 1 package com.bie.kafka.kafkaThrea;
 2 
 3 /**
 4  * 
 5  * @Description TODO
 6  * @author biehl
 7  * @Date 2019年6月1日 下午2:19:52
 8  *
 9  */
10 public class ConsumerMain {
11 
12     public static void main(String[] args) {
13         // kafka即broker列表
14         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
15         // group組名稱
16         String groupId = "group1";
17         // topic主題名稱
18         String topic = "topic1";
19         // 消費者的數量
20         int consumerNum = 3;
21         // 經過構造器建立出一個對象
22         ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
23         // 執行execute的方法,建立出ConsumerRunnable消費者實例。多線程多消費者實例
24         consumerGroup.execute();
25     }
26 
27 }

效果以下所示:異步

生產者生產消息的案例:ide

 消費者消費消息的案例:this

 

待續......spa

相關文章
相關標籤/搜索