一、首先啓動本身的kafka集羣喲。java
1 啓動zk: 2 bin/zkServer.sh start conf/zoo.cfg。 3 驗證zk是否啓動成功: 4 bin/zkServer.sh status conf/zoo.cfg。 5 啓動kafka: 6 bin/kafka-server-start.sh -daemon config/server.properties。
二、生產者生產消息,模擬生產一百條數據。apache
1 package com.bie.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.Producer; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 import org.apache.kafka.clients.producer.RecordMetadata; 10 import org.apache.kafka.common.serialization.StringSerializer; 11 12 /** 13 * 14 * @Description TODO 15 * @author biehl 16 * @Date 2019年5月25日 下午2:34:46 17 * 18 */ 19 public class KafKaProducerHeart { 20 21 public static void main(String[] args) { 22 Properties props = new Properties(); 23 props.put("bootstrap.servers", "slaver1:9092,slaver2:9092,slaver3:9092"); 24 props.put("acks", "-1"); 25 props.put("retries", 3); 26 props.put("batch.size", 323840); 27 props.put("linger.ms", 10); 28 props.put("buffer.memory", 33554432); 29 props.put("max.block.ms", 3000); 30 StringSerializer keySerializer = new StringSerializer(); 31 StringSerializer valueSerializer = new StringSerializer(); 32 Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer); 33 String topic = "topic1"; 34 String value = " biehl 💗 wj 1314 "; 35 /* 36 * for (int i = 0; i < 100; i++) { 37 * //topic-key-value三元組肯定消息所在位置 38 * producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), 39 * value)); 40 * } 41 */ 42 43 // 異步發送 44 /*for (int i = 0; i < 100; i++) { 45 ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value); 46 // 異步發送 47 producer.send(record, new Callback() { 48 49 @Override 50 public void onCompletion(RecordMetadata recordmetadata, Exception exception) { 51 if (exception == null) { 52 System.out.println("消息發送成功"); 53 System.out.println( 54 "topic: " + recordmetadata.topic() + ", partition分區: " + recordmetadata.partition()); 55 } else { 56 System.out.println("消息發送失敗"); 57 } 58 } 59 }); 60 }*/ 61 62 // 同步發送 63 for (int i = 0; i < 100; i++) { 64 ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value); 65 try { 66 RecordMetadata recordMetadata = producer.send(record).get(); 67 System.out.println( 68 "topic: " + recordMetadata.topic() + ", partition分區: " + recordMetadata.partition()); 69 } catch (InterruptedException e) { 70 e.printStackTrace(); 71 } catch (ExecutionException e) { 72 e.printStackTrace(); 73 } 74 } 75 76 producer.close(); 77 } 78 79 }
三、kafka中消費者消費消息之每一個線程維護一個KafkaConsumer實例:
bootstrap
ConsumerRunnable,消費線程類,執行真正的消費任務安全
1 package com.bie.kafka.kafkaThrea; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.ConsumerRecord; 8 import org.apache.kafka.clients.consumer.ConsumerRecords; 9 import org.apache.kafka.clients.consumer.KafkaConsumer; 10 11 /** 12 * 13 * @Description TODO 14 * @author biehl 15 * @Date 2019年6月1日 上午11:48:53 16 * 17 * 一、KafkaConsumer是非線程安全的,KafkaProducer是線程安全的。 18 * 二、該案例是每一個線程維護一個KafkaConsumer實例 19 * 用戶建立多個線程消費topic數據,每一個線程都會建立專屬該線程的KafkaConsumer實例 20 * 三、ConsumerRunnable,消費線程類,執行真正的消費任務 21 */ 22 public class ConsumerRunnable implements Runnable { 23 24 // 每一個線程維護私有的kafkaConsumer實例 25 private final KafkaConsumer<String, String> consumer; 26 27 /** 28 * 默認每一個消費者的配置參數初始化 29 * 30 * @param brokerList 31 * @param groupId 32 * @param topic 33 */ 34 public ConsumerRunnable(String brokerList, String groupId, String topic) { 35 // 帶參數的構造方法 36 Properties props = new Properties(); 37 // kafka的列表 38 props.put("bootstrap.servers", brokerList); 39 // 消費者組編號 40 props.put("group.id", groupId); 41 // 自動提交 42 props.put("enable.auto.commit", true); 43 // 提交提交每一個一秒鐘 44 props.put("auto.commit.interval.ms", "1000"); 45 // 反序列化key 46 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 47 // 反序列化value 48 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 49 // 將配置信息進行初始化操做 50 this.consumer = new KafkaConsumer<>(props); 51 // 定義響應的主題信息topic 52 consumer.subscribe(Arrays.asList(topic)); 53 } 54 55 /** 56 * 57 */ 58 @Override 59 public void run() { 60 // 消費者保持一直消費的狀態 61 while (true) { 62 // 將獲取到消費的信息 63 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200)); 64 // 遍歷出每一個消費的消息 65 for (ConsumerRecord<String, String> record : records) { 66 // 輸出打印消息 67 System.out.println( 68 "當前線程名稱 : " + Thread.currentThread().getName() + ", 主題名稱 :" + record.topic() + ", 分區名稱 :" 69 + record.partition() + ", 位移名稱 :" + record.offset() + ", value :" + record.value()); 70 } 71 } 72 } 73 74 }
消費線程管理類,建立多個線程類執行消費任務:多線程
1 package com.bie.kafka.kafkaThrea; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 /** 7 * 8 * @Description TODO 9 * @author biehl 10 * @Date 2019年6月1日 上午11:56:42 11 * 12 * 一、消費線程管理類,建立多個線程類執行消費任務 13 */ 14 public class ConsumerGroup { 15 16 // 消費者羣組,多消費者。 17 private List<ConsumerRunnable> consumers; 18 19 /** 20 * 21 * @param consumerNum 22 * @param groupId 23 * @param topic 24 * @param brokerList 25 */ 26 public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) { 27 // 初始化消費者組 28 consumers = new ArrayList<>(consumerNum); 29 // 初始化消費者,建立多少個消費者 30 for (int i = 0; i < consumerNum; i++) { 31 // 根據消費者構造方法,建立消費者實例 32 ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic); 33 // 將建立的消費者實例添加到消費者組中 34 consumers.add(consumerRunnable); 35 } 36 } 37 38 /** 39 * 40 */ 41 public void execute() { 42 // 將消費者組裏面的消費者遍歷出來 43 for (ConsumerRunnable task : consumers) { 44 // 建立一個消費者線程,而且啓動該線程 45 new Thread(task).start(); 46 } 47 } 48 49 }
1 package com.bie.kafka.kafkaThrea; 2 3 /** 4 * 5 * @Description TODO 6 * @author biehl 7 * @Date 2019年6月1日 下午2:19:52 8 * 9 */ 10 public class ConsumerMain { 11 12 public static void main(String[] args) { 13 // kafka即broker列表 14 String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092"; 15 // group組名稱 16 String groupId = "group1"; 17 // topic主題名稱 18 String topic = "topic1"; 19 // 消費者的數量 20 int consumerNum = 3; 21 // 經過構造器建立出一個對象 22 ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList); 23 // 執行execute的方法,建立出ConsumerRunnable消費者實例。多線程多消費者實例 24 consumerGroup.execute(); 25 } 26 27 }
效果以下所示:異步
生產者生產消息的案例:ide
消費者消費消息的案例:this
待續......spa