一、搭建部署好zookeeper集羣和kafka集羣,這裏省略。java
1 啓動zk: 2 bin/zkServer.sh start conf/zoo.cfg。 3 驗證zk是否啓動成功: 4 bin/zkServer.sh status conf/zoo.cfg。 5 啓動kafka: 6 bin/kafka-server-start.sh -daemon config/server.properties。
二、生產者和消費者代碼以下所示:apache
1 package com.bie.kafka.producer; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 //import org.apache.kafka.clients.producer.ProducerConfig; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 10 /** 11 * 12 * @Description TODO 13 * @author biehl 14 * @Date 2019年4月6日 上午11:27:34 15 * 16 */ 17 public class ProducerTest { 18 19 public static void main(String[] args) { 20 // 構造一個java.util.Properties對象 21 Properties props = new Properties(); 22 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 23 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 24 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 25 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 26 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 27 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 28 // 將消息value部分轉換成字節數組。 29 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 30 //acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 31 props.put("acks", "-1"); 32 //props.put(ProducerConfig.ACKS_CONFIG, "1"); 33 //在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 34 props.put("retries", 3); 35 //props.put(ProducerConfig.RETRIES_CONFIG, 3); 36 //調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 37 props.put("batch.size", 323840); 38 //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 39 //控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 40 props.put("linger.ms", 10); 41 //props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 42 //指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 43 props.put("buffer.memory", 33554432); 44 //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 45 props.put("max.block.ms", 3000); 46 //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 47 //設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 48 //props.put("compression.type", "none"); 49 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 50 //該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 51 //props.put("max.request.size", 10485760); 52 //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 53 //producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 54 //props.put("request.timeout.ms", 60000); 55 //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 56 57 58 // 使用上面建立的Properties對象構造KafkaProducer對象 59 //若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 60 // Serializer<String> keySerializer = new StringSerializer(); 61 // Serializer<String> valueSerializer = new StringSerializer(); 62 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 63 // keySerializer, valueSerializer); 64 Producer<String, String> producer = new KafkaProducer<>(props); 65 for (int i = 0; i < 100; i++) { 66 //構造好kafkaProducer實例之後,下一步就是構造消息實例。 67 producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i))); 68 // 構造待發送的消息對象ProduceRecord的對象,指定消息要發送到的topic主題,分區以及對應的key和value鍵值對。 69 // 注意,分區和key信息能夠不用指定,由kafka自行肯定目標分區。 70 //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-topic", 71 // Integer.toString(i), Integer.toString(i)); 72 // 調用kafkaProduce的send方法發送消息 73 //producer.send(producerRecord); 74 } 75 System.out.println("消息生產結束......"); 76 // 關閉kafkaProduce對象 77 producer.close(); 78 System.out.println("關閉生產者......"); 79 } 80 81 }
消費者代碼以下所示:bootstrap
1 package com.bie.kafka.consumer; 2 3 import java.util.Arrays; 4 import java.util.Properties; 5 6 import org.apache.kafka.clients.consumer.ConsumerRecord; 7 import org.apache.kafka.clients.consumer.ConsumerRecords; 8 import org.apache.kafka.clients.consumer.KafkaConsumer; 9 10 /** 11 * 12 * @Description TODO 13 * @author biehl 14 * @Date 2019年4月6日 下午8:12:28 15 * 16 */ 17 public class ConsumerTest { 18 19 public static void main(String[] args) { 20 String topicName = "topic1"; 21 String groupId = "group1"; 22 //構造java.util.Properties對象 23 Properties props = new Properties(); 24 // 必須指定屬性。 25 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 26 // 必須指定屬性。 27 props.put("group.id", groupId); 28 props.put("enable.auto.commit", "true"); 29 props.put("auto.commit.interval.ms", "1000"); 30 // 從最先的消息開始讀取 31 props.put("auto.offset.reset", "earliest"); 32 // 必須指定 33 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 34 // 必須指定 35 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 36 37 // 使用建立的Properties實例構造consumer實例 38 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 39 // 訂閱topic。調用kafkaConsumer.subscribe方法訂閱consumer group所需的topic列表 40 consumer.subscribe(Arrays.asList(topicName)); 41 try { 42 while (true) { 43 //循環調用kafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息。 44 ConsumerRecords<String, String> records = consumer.poll(1000); 45 //獲取到封裝在ConsumerRecords消息之後,處理獲取到ConsumerRecord對象。 46 for (ConsumerRecord<String, String> record : records) { 47 //簡單的打印輸出 48 System.out.println( 49 "offset = " + record.offset() 50 + ",key = " + record.key() 51 + ",value =" + record.value()); 52 } 53 } 54 } catch (Exception e) { 55 //關閉kafkaConsumer 56 System.out.println("消息消費結束......"); 57 consumer.close(); 58 } 59 System.out.println("關閉消費者......"); 60 } 61 }
遇到的坑,一開始報的錯誤莫名其妙,一開始覺得使用的jar包版本問題,又是報slf4j的錯誤,又是報log4j的錯誤,又是報空指針的異常。最後百度意外遇到了多是本地沒有將ip地址放到hosts文件裏面,果真是這個問題。數組
添加以下所示便可:緩存
而後就能夠開心的生產消息和消費消息了啊。開心。服務器
項目結構以下所示:app
三、生產者生產消息異步或者同步發送消息的案例使用:dom
Synchronization 同步異步
1 package com.bie.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.Future; 6 7 import org.apache.kafka.clients.producer.KafkaProducer; 8 import org.apache.kafka.clients.producer.Producer; 9 //import org.apache.kafka.clients.producer.ProducerConfig; 10 import org.apache.kafka.clients.producer.ProducerRecord; 11 import org.apache.kafka.clients.producer.RecordMetadata; 12 13 /** 14 * 15 * @Description TODO 16 * @author biehl 17 * @Date 2019年4月6日 上午11:27:34 18 * 同步發送 19 * Synchronization 同步 20 */ 21 public class ProducerSynchronization { 22 23 public static void main(String[] args) { 24 // 構造一個java.util.Properties對象 25 Properties props = new Properties(); 26 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 27 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 28 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 29 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 30 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 31 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 32 // 將消息value部分轉換成字節數組。 33 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 34 //acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 35 props.put("acks", "-1"); 36 //props.put(ProducerConfig.ACKS_CONFIG, "1"); 37 //在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 38 props.put("retries", 3); 39 //props.put(ProducerConfig.RETRIES_CONFIG, 3); 40 //調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 41 props.put("batch.size", 323840); 42 //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 43 //控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 44 props.put("linger.ms", 10); 45 //props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 46 //指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 47 props.put("buffer.memory", 33554432); 48 //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 49 props.put("max.block.ms", 3000); 50 //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 51 //設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 52 //props.put("compression.type", "none"); 53 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 54 //該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 55 //props.put("max.request.size", 10485760); 56 //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 57 //producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 58 //props.put("request.timeout.ms", 60000); 59 //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 60 61 62 // 使用上面建立的Properties對象構造KafkaProducer對象 63 //若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 64 // Serializer<String> keySerializer = new StringSerializer(); 65 // Serializer<String> valueSerializer = new StringSerializer(); 66 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 67 // keySerializer, valueSerializer); 68 Producer<String, String> producer = new KafkaProducer<>(props); 69 for (int i = 0; i < 100; i++) { 70 //構造好kafkaProducer實例之後,下一步就是構造消息實例。 71 // producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i))); 72 73 try { 74 Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1",Integer.toString(i),"biehl 💗 wj " + Integer.toString(i) + " 萬年呢!!!")); 75 //同步發送,調用get()方法無限等待返回結果 76 RecordMetadata recordMetadata = future.get(); 77 //成功返回RecordMetadata實例(包含發送的元數據信息) 78 System.out.println("第 " + i + " 條, " + recordMetadata.toString()); 79 } catch (InterruptedException e) { 80 e.printStackTrace(); 81 } catch (ExecutionException e) { 82 e.printStackTrace(); 83 } 84 85 // 構造待發送的消息對象ProduceRecord的對象,指定消息要發送到的topic主題,分區以及對應的key和value鍵值對。 86 // 注意,分區和key信息能夠不用指定,由kafka自行肯定目標分區。 87 //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-topic", 88 // Integer.toString(i), Integer.toString(i)); 89 // 調用kafkaProduce的send方法發送消息 90 //producer.send(producerRecord); 91 } 92 System.out.println("消息生產結束......"); 93 // 關閉kafkaProduce對象 94 producer.close(); 95 System.out.println("關閉生產者......"); 96 } 97 98 }
Asynchronous 異步:
1 package com.bie.kafka.producer; 2 3 import java.util.Properties; 4 import java.util.concurrent.Future; 5 6 import org.apache.kafka.clients.producer.Callback; 7 import org.apache.kafka.clients.producer.KafkaProducer; 8 import org.apache.kafka.clients.producer.Producer; 9 //import org.apache.kafka.clients.producer.ProducerConfig; 10 import org.apache.kafka.clients.producer.ProducerRecord; 11 import org.apache.kafka.clients.producer.RecordMetadata; 12 import org.apache.kafka.common.errors.RetriableException; 13 14 /** 15 * 16 * @Description TODO 17 * @author biehl 18 * @Date 2019年4月6日 上午11:27:34 19 * 異常發送 20 * Asynchronous 異步 21 */ 22 public class ProducerAsynchronous { 23 24 public static void main(String[] args) { 25 // 構造一個java.util.Properties對象 26 Properties props = new Properties(); 27 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 28 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 29 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 30 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 31 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 32 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 33 // 將消息value部分轉換成字節數組。 34 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 35 //acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 36 props.put("acks", "-1"); 37 //props.put(ProducerConfig.ACKS_CONFIG, "1"); 38 //在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 39 props.put("retries", 3); 40 //props.put(ProducerConfig.RETRIES_CONFIG, 3); 41 //調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 42 props.put("batch.size", 323840); 43 //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 44 //控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 45 props.put("linger.ms", 10); 46 //props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 47 //指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 48 props.put("buffer.memory", 33554432); 49 //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 50 props.put("max.block.ms", 3000); 51 //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 52 //設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 53 //props.put("compression.type", "none"); 54 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 55 //該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 56 //props.put("max.request.size", 10485760); 57 //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 58 //producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 59 //props.put("request.timeout.ms", 60000); 60 //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 61 62 63 // 使用上面建立的Properties對象構造KafkaProducer對象 64 //若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 65 // Serializer<String> keySerializer = new StringSerializer(); 66 // Serializer<String> valueSerializer = new StringSerializer(); 67 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 68 // keySerializer, valueSerializer); 69 Producer<String, String> producer = new KafkaProducer<>(props); 70 for (int i = 0; i < 100; i++) { 71 //構造好kafkaProducer實例之後,下一步就是構造消息實例。 72 // producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i))); 73 74 //Future<RecordMetadata> send = producer.send(new ProducerRecord<>("topic1",Integer.toString(i),"biehl 💗 wj " + i + " 萬年呢!!!")); 75 ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic1", Integer.toString(i), "biehl 💗 wj "+ i +" 萬年呢!!!"); 76 //異步發送 77 producer.send(record, new Callback() { 78 79 @Override 80 public void onCompletion(RecordMetadata metadata, Exception exception) { 81 if(exception == null) { 82 //exception == null表明消息發送成功 83 System.out.println("消息發送成功......"); 84 }else { 85 //消息發送失敗,執行錯誤的邏輯 86 System.out.println("消息發送失敗......"); 87 if(exception instanceof RetriableException) { 88 //處理可重試瞬時異常 89 //... 90 }else { 91 //處理不可重試異常 92 //... 93 } 94 95 } 96 } 97 }); 98 99 100 // 構造待發送的消息對象ProduceRecord的對象,指定消息要發送到的topic主題,分區以及對應的key和value鍵值對。 101 // 注意,分區和key信息能夠不用指定,由kafka自行肯定目標分區。 102 //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-topic", 103 // Integer.toString(i), Integer.toString(i)); 104 // 調用kafkaProduce的send方法發送消息 105 //producer.send(producerRecord); 106 } 107 System.out.println("消息生產結束......"); 108 // 關閉kafkaProduce對象 109 producer.close(); 110 System.out.println("關閉生產者......"); 111 } 112 113 }
四、kafka自定義分區的使用。能夠根據分區,將指定的key消息發送到指定的分區,或者將value消息發送到指定的分區。ide
1 package com.bie.kafka.partitioner; 2 3 import java.util.Iterator; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Random; 7 8 import org.apache.kafka.clients.producer.Partitioner; 9 import org.apache.kafka.common.Cluster; 10 import org.apache.kafka.common.PartitionInfo; 11 12 /** 13 * 14 * @Description TODO 15 * @author biehl 16 * @Date 2019年5月18日 上午10:28:48 一、自定義分區 能夠將指定消息發送到指定的分區 17 */ 18 public class LovePartitioner implements Partitioner { 19 20 // 隨機數 21 private Random random; 22 23 @Override 24 public void configure(Map<String, ?> configs) { 25 // 該方法實現必要資源的初始化工做 26 random = new Random(); 27 } 28 29 @Override 30 public void close() { 31 //該方法實現必要的清理工做 32 } 33 34 @Override 35 public int partition(String topic, Object keyObject, byte[] keyBytes, Object value, byte[] valueBytes, 36 Cluster cluster) { 37 // 獲取到key 38 String key = (String) keyObject; 39 // 打印輸出key信息 40 System.out.println("key : " + key); 41 // 獲取到集羣的元數據信息 42 List<PartitionInfo> partitionsForTopic = cluster.availablePartitionsForTopic(topic); 43 //遍歷分區元數據信息 44 Iterator<PartitionInfo> it = partitionsForTopic.iterator(); 45 while(it.hasNext()) { 46 PartitionInfo partitionInfo = it.next(); 47 System.out.println("topic消息信息: " + partitionInfo.topic() + " , partition分區信息: " + partitionInfo.partition() 48 + " ,leader信息: " + partitionInfo.leader() + " , replicas備份信息: " + partitionInfo.replicas()); 49 } 50 // 獲取到分區的數量 51 int partitionCount = partitionsForTopic.size(); 52 // 獲取到最後一個分區 53 int lovePartition = partitionCount - 1; 54 //若是key不爲空且不是love消息,就將隨機分發到除最後一個分區的其餘分區,不然,分發到最後一個分區。 55 return key == null || key.isEmpty() || !key.contains("love") 56 ? random.nextInt(partitionCount - 1) 57 : lovePartition; 58 } 59 60 }
1 package com.bie.kafka.partitioner; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.Producer; 8 //import org.apache.kafka.clients.producer.ProducerConfig; 9 import org.apache.kafka.clients.producer.ProducerRecord; 10 import org.apache.kafka.clients.producer.RecordMetadata; 11 12 /** 13 * 14 * @Description TODO 15 * @author biehl 16 * @Date 2019年4月6日 上午11:27:34 17 * 異常發送 18 * Asynchronous 異步 19 */ 20 public class ProducerPartitions { 21 22 public static void main(String[] args) { 23 // 構造一個java.util.Properties對象 24 Properties props = new Properties(); 25 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 26 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 27 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 28 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 29 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 30 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 31 // 將消息value部分轉換成字節數組。 32 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 33 //acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 34 props.put("acks", "-1"); 35 //props.put(ProducerConfig.ACKS_CONFIG, "1"); 36 //在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 37 props.put("retries", 3); 38 //props.put(ProducerConfig.RETRIES_CONFIG, 3); 39 //調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 40 props.put("batch.size", 323840); 41 //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 42 //控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 43 props.put("linger.ms", 10); 44 //props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 45 //指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 46 props.put("buffer.memory", 33554432); 47 //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 48 props.put("max.block.ms", 3000); 49 //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 50 //設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 51 //props.put("compression.type", "none"); 52 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 53 //該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 54 //props.put("max.request.size", 10485760); 55 //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 56 //producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 57 //props.put("request.timeout.ms", 60000); 58 //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 59 60 //指定自定義分區 61 props.put("partitioner.class", "com.bie.kafka.partitioner.LovePartitioner"); 62 //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.bie.kafka.partitioner.LovePartitioner"); 63 64 // 使用上面建立的Properties對象構造KafkaProducer對象 65 //若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 66 // Serializer<String> keySerializer = new StringSerializer(); 67 // Serializer<String> valueSerializer = new StringSerializer(); 68 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 69 // keySerializer, valueSerializer); 70 71 String topic = "topic1"; 72 Producer<Object, String> producer = new KafkaProducer<Object, String>(props); 73 //無key消息 74 ProducerRecord<Object, String> nonKeyRecord = new ProducerRecord<Object, String>(topic, "non-key record"); 75 //love消息 76 ProducerRecord<Object, String> loveRecord = new ProducerRecord<Object, String>(topic, "love","biehl 💗 wj"); 77 //非love消息 78 ProducerRecord<Object, String> nonLoveRecord = new ProducerRecord<Object, String>(topic, "other", "non-love record"); 79 80 //生產消息 81 try { 82 //發送無key消息 83 RecordMetadata recordMetadata1 = producer.send(nonKeyRecord).get(); 84 System.out.println(recordMetadata1.toString()); 85 86 //發送key不爲love的消息 87 RecordMetadata recordMetadata2 = producer.send(nonLoveRecord).get(); 88 System.out.println(recordMetadata2.toString()); 89 90 91 //發送key爲love的消息 92 RecordMetadata recordMetadata3 = producer.send(loveRecord).get(); 93 System.out.println(recordMetadata3.toString()); 94 95 //發送無key消息 96 RecordMetadata recordMetadata11 = producer.send(nonKeyRecord).get(); 97 System.out.println(recordMetadata11.toString()); 98 99 //發送key不爲love的消息 100 RecordMetadata recordMetadata22 = producer.send(nonLoveRecord).get(); 101 System.out.println(recordMetadata22.toString()); 102 103 } catch (InterruptedException e) { 104 e.printStackTrace(); 105 } catch (ExecutionException e) { 106 e.printStackTrace(); 107 } 108 System.out.println("消息生產結束......"); 109 // 關閉kafkaProduce對象 110 producer.close(); 111 System.out.println("關閉生產者......"); 112 } 113 114 }
執行完生產者生產消息之後,可使用命令進行查看:
1 [root@slaver1 kafka_2.11-2.1.0]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slaver1:9092 --topic topic1
五、kafka自定義序列化value消息:
1 package com.bie.kafka.serializer; 2 3 /** 4 * 5 * @Description TODO 6 * @author biehl 7 * @Date 2019年5月18日 上午11:34:32 8 * 9 */ 10 public class User { 11 12 private int id; 13 private String name; 14 private String number; 15 private int age; 16 17 public int getId() { 18 return id; 19 } 20 21 public void setId(int id) { 22 this.id = id; 23 } 24 25 public String getName() { 26 return name; 27 } 28 29 public void setName(String name) { 30 this.name = name; 31 } 32 33 public String getNumber() { 34 return number; 35 } 36 37 public void setNumber(String number) { 38 this.number = number; 39 } 40 41 public int getAge() { 42 return age; 43 } 44 45 public void setAge(int age) { 46 this.age = age; 47 } 48 49 @Override 50 public String toString() { 51 return "User [id=" + id + ", name=" + name + ", number=" + number + ", age=" + age + "]"; 52 } 53 54 public User(int id, String name, String number, int age) { 55 super(); 56 this.id = id; 57 this.name = name; 58 this.number = number; 59 this.age = age; 60 } 61 62 public User() { 63 super(); 64 } 65 66 }
1 package com.bie.kafka.serializer; 2 3 import java.io.IOException; 4 import java.util.Map; 5 6 import org.apache.kafka.common.serialization.Serializer; 7 import org.apache.log4j.Logger; 8 import org.codehaus.jackson.map.ObjectMapper; 9 10 /** 11 * 12 * @Description TODO 13 * @author biehl 14 * @Date 2019年5月18日 上午11:36:57 一、自定義序列化操做 15 * 16 */ 17 public class UserSerializer implements Serializer<Object> { 18 19 private Logger logger = Logger.getLogger(UserSerializer.class); 20 21 // 22 private ObjectMapper objectMapper; 23 24 @Override 25 public void close() { 26 27 } 28 29 @Override 30 public void configure(Map<String, ?> configs, boolean isKey) { 31 objectMapper = new ObjectMapper(); 32 } 33 34 @Override 35 public byte[] serialize(String topic, Object data) { 36 byte[] ret = null; 37 try { 38 ret = objectMapper.writeValueAsString(data).getBytes("utf-8"); 39 } catch (IOException e) { 40 e.printStackTrace(); 41 logger.warn("自定義序列化失敗告警.....請緊急處理", e); 42 } 43 return ret; 44 } 45 46 }
1 package com.bie.kafka.serializer; 2 3 import java.util.Properties; 4 import java.util.concurrent.ExecutionException; 5 6 import org.apache.kafka.clients.producer.KafkaProducer; 7 import org.apache.kafka.clients.producer.Producer; 8 //import org.apache.kafka.clients.producer.ProducerConfig; 9 import org.apache.kafka.clients.producer.ProducerRecord; 10 import org.apache.kafka.clients.producer.RecordMetadata; 11 12 /** 13 * 14 * @Description TODO 15 * @author biehl 16 * @Date 2019年4月6日 上午11:27:34 異常發送 Asynchronous 異步 17 */ 18 public class ProducerPartitions { 19 20 public static void main(String[] args) { 21 // 構造一個java.util.Properties對象 22 Properties props = new Properties(); 23 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 24 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 25 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 26 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 27 // props.put("key.serializer", 28 // "org.apache.kafka.common.serialization.StringSerializer"); 29 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 30 // 將消息value部分轉換成字節數組。 31 // props.put("value.serializer", 32 // "org.apache.kafka.common.serialization.StringSerializer"); 33 // acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 34 props.put("acks", "-1"); 35 // props.put(ProducerConfig.ACKS_CONFIG, "1"); 36 // 在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 37 props.put("retries", 3); 38 // props.put(ProducerConfig.RETRIES_CONFIG, 3); 39 // 調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 40 props.put("batch.size", 323840); 41 // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 42 // 控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 43 props.put("linger.ms", 10); 44 // props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 45 // 指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 46 props.put("buffer.memory", 33554432); 47 // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 48 props.put("max.block.ms", 3000); 49 // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 50 // 設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 51 // props.put("compression.type", "none"); 52 // props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 53 // 該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 54 // props.put("max.request.size", 10485760); 55 // props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 56 // producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 57 // props.put("request.timeout.ms", 60000); 58 // props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 59 60 // 自定義序列化操做 61 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 62 // 自定義序列化value的值 63 props.put("value.serializer", "com.bie.kafka.serializer.UserSerializer"); 64 65 // 使用上面建立的Properties對象構造KafkaProducer對象 66 // 若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 67 // Serializer<String> keySerializer = new StringSerializer(); 68 // Serializer<String> valueSerializer = new StringSerializer(); 69 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 70 // keySerializer, valueSerializer); 71 72 User user = new User(1,"張三","41076788898765444",25); 73 74 String topic = "topic1"; 75 Producer<String, User> producer = new KafkaProducer<String, User>(props); 76 //指定序列化生產的消息 77 ProducerRecord<String, User> userRecord = new ProducerRecord<String, User>(topic, user); 78 79 // 生產消息 80 try { 81 // 發送無key消息 82 RecordMetadata recordMetadata = producer.send(userRecord).get(); 83 System.out.println("topic主題:" + recordMetadata.topic() + ", partition分區" + recordMetadata.partition()); 84 } catch (InterruptedException e) { 85 e.printStackTrace(); 86 } catch (ExecutionException e) { 87 e.printStackTrace(); 88 } 89 System.out.println("消息生產結束......"); 90 // 關閉kafkaProduce對象 91 producer.close(); 92 System.out.println("關閉生產者......"); 93 } 94 95 }
六、kafka生產者攔截器鏈的使用。
1 package com.bie.kafka.interceptor; 2 3 import java.util.Map; 4 5 import org.apache.kafka.clients.producer.ProducerInterceptor; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 import org.apache.kafka.clients.producer.RecordMetadata; 8 9 /** 10 * 11 * @Description TODO 12 * @author biehl 13 * @Date 2019年5月18日 下午5:06:37 一、攔截器: 能夠將攔截器組成攔截器鏈。 14 * 15 */ 16 public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> { 17 18 @Override 19 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { 20 return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), 21 System.currentTimeMillis() + "," + record.value().toLowerCase()); 22 } 23 24 @Override 25 public void configure(Map<String, ?> arg0) { 26 27 } 28 29 @Override 30 public void close() { 31 32 } 33 34 @Override 35 public void onAcknowledgement(RecordMetadata arg0, Exception arg1) { 36 37 } 38 39 }
1 package com.bie.kafka.interceptor; 2 3 import java.util.Map; 4 5 import org.apache.kafka.clients.producer.ProducerInterceptor; 6 import org.apache.kafka.clients.producer.ProducerRecord; 7 import org.apache.kafka.clients.producer.RecordMetadata; 8 9 /** 10 * 11 * @Description TODO 12 * @author biehl 13 * @Date 2019年5月18日 下午5:15:04 14 * 15 */ 16 public class CounterInterceptor implements ProducerInterceptor<String, String> { 17 18 private int errorCount = 0;// 失敗的消息數量 19 private int successCount = 0;// 成功的消息數量 20 21 @Override 22 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { 23 24 return record; 25 } 26 27 @Override 28 public void configure(Map<String, ?> arg0) { 29 30 } 31 32 @Override 33 public void close() { 34 // 35 System.out.println("success count is : " + successCount); 36 System.out.println("error count is : " + errorCount); 37 } 38 39 @Override 40 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { 41 if (exception == null) { 42 successCount++; 43 } else { 44 errorCount++; 45 } 46 } 47 48 }
1 package com.bie.kafka.interceptor; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Properties; 6 import java.util.concurrent.ExecutionException; 7 8 import org.apache.kafka.clients.producer.KafkaProducer; 9 import org.apache.kafka.clients.producer.Producer; 10 import org.apache.kafka.clients.producer.ProducerConfig; 11 //import org.apache.kafka.clients.producer.ProducerConfig; 12 import org.apache.kafka.clients.producer.ProducerRecord; 13 import org.apache.kafka.clients.producer.RecordMetadata; 14 15 /** 16 * 17 * @Description TODO 18 * @author biehl 19 * @Date 2019年4月6日 上午11:27:34 攔截器鏈的使用 20 * 21 */ 22 public class ProducerInterceptor { 23 24 public static void main(String[] args) { 25 // 構造一個java.util.Properties對象 26 Properties props = new Properties(); 27 // 指定bootstrap.servers屬性。必填,無默認值。用於建立向kafka broker服務器的鏈接。 28 props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092"); 29 // 指定key.serializer屬性。必填,無默認值。被髮送到broker端的任何消息的格式都必須是字節數組。 30 // 所以消息的各個組件都必須首先作序列化,而後才能發送到broker。該參數就是爲消息的key作序列化只用的。 31 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 32 // 指定value.serializer屬性。必填,無默認值。和key.serializer相似。此被用來對消息體即消息value部分作序列化。 33 // 將消息value部分轉換成字節數組。 34 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 35 // acks參數用於控制producer生產消息的持久性(durability)。參數可選值,0、一、-1(all)。 36 props.put("acks", "-1"); 37 // props.put(ProducerConfig.ACKS_CONFIG, "1"); 38 // 在producer內部自動實現了消息從新發送。默認值0表明不進行重試。 39 props.put("retries", 3); 40 // props.put(ProducerConfig.RETRIES_CONFIG, 3); 41 // 調優producer吞吐量和延時性能指標都有很是重要做用。默認值16384即16KB。 42 props.put("batch.size", 323840); 43 // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840); 44 // 控制消息發送延時行爲的,該參數默認值是0。表示消息須要被當即發送,無須關係batch是否被填滿。 45 props.put("linger.ms", 10); 46 // props.put(ProducerConfig.LINGER_MS_CONFIG, 10); 47 // 指定了producer端用於緩存消息的緩衝區的大小,單位是字節,默認值是33554432即32M。 48 props.put("buffer.memory", 33554432); 49 // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 50 props.put("max.block.ms", 3000); 51 // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); 52 // 設置producer段是否壓縮消息,默認值是none。即不壓縮消息。GZIP、Snappy、LZ4 53 // props.put("compression.type", "none"); 54 // props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); 55 // 該參數用於控制producer發送請求的大小。producer端可以發送的最大消息大小。 56 // props.put("max.request.size", 10485760); 57 // props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); 58 // producer發送請求給broker後,broker須要在規定時間範圍內將處理結果返還給producer。默認30s 59 // props.put("request.timeout.ms", 60000); 60 // props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); 61 62 // 構造鏈接器鏈 63 List<String> interceptors = new ArrayList<String>(); 64 interceptors.add("com.bie.kafka.interceptor.CounterInterceptor"); 65 interceptors.add("com.bie.kafka.interceptor.TimeStampPrependerInterceptor"); 66 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); 67 68 // 使用上面建立的Properties對象構造KafkaProducer對象 69 // 若是採用這種方式建立producer,那麼就不須要顯示的在Properties中指定key和value序列化類了呢。 70 // Serializer<String> keySerializer = new StringSerializer(); 71 // Serializer<String> valueSerializer = new StringSerializer(); 72 // Producer<String, String> producer = new KafkaProducer<String, String>(props, 73 // keySerializer, valueSerializer); 74 75 String topic = "topic1"; 76 Producer<String, String> producer = new KafkaProducer<String, String>(props); 77 for (int i = 0; i < 100; i++) { 78 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "biehl 💗 wj " + i + " 年喲"); 79 try { 80 RecordMetadata recordMetadata = producer.send(record).get(); 81 System.out.println("topic : " + recordMetadata.topic() + ", partition" + recordMetadata.partition()); 82 } catch (InterruptedException | ExecutionException e) { 83 e.printStackTrace(); 84 } 85 } 86 87 System.out.println("消息生產結束......"); 88 // 關閉kafkaProduce對象 89 producer.close(); 90 System.out.println("關閉生產者......"); 91 } 92 93 }
運行生產者能夠看到能夠統計正確或者錯誤消息的格式,運行消費者能夠看到已經將時間戳攔截器的時間戳加到了消息頭上面。
待續.....