[TOC]
The earlier post 《Kafka筆記整理(一)》 (Kafka Notes, Part 1) touched on how Kafka consumers consume messages; a summary follows:
1. Each consumer belongs to a consumer group, identified by `group.id`.
2. Consumption model:
   - Within a group: the consumers in a group share one copy of the data. At any moment, a partition of a topic can be consumed by at most one consumer in the group, while a single consumer may consume messages from multiple partitions. Therefore, for a given topic, a single group should not have more consumers than the topic has partitions; otherwise some consumers will receive no messages.
   - Across groups: each consumer group independently consumes the full data, without affecting the others.
3. When one consumer process runs multiple threads, each thread effectively acts as one consumer. For example: with 3 partitions, if one consumer starts 3 consuming threads, a consumer that joins the same group later will have nothing to consume.
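The within-group rule above can be sketched with a toy simulation (this is an illustration assuming a simple round-robin assignment, not Kafka's actual partition assignor): with 3 partitions and 4 consumers in one group, at least one consumer ends up idle.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupAssignmentSketch {
    // Assign each partition to a consumer in round-robin order.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions shared by 4 same-group consumers: the 4th gets nothing.
        List<List<Integer>> assignment = assign(3, 4);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> partitions " + assignment.get(c));
        }
    }
}
```

Running this prints an empty partition list for `consumer-3`, mirroring the "3 partitions, 4 consumers" situation verified below.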
The sections below verify this consumption model. Note that in the consumer program code we can set the consumer's `group.id` explicitly (we will do so in a configuration file below).
When using Kafka's shell commands, a configuration file can also be supplied to set the consumer's `group.id`; if none is given, Kafka generates a random one (in `kafka-console-consumer.sh`, the `kafka.tools.ConsoleConsumer` class generates a random `group.id` when none is specified).
In the program code below, four consuming threads share the same `group.id` (the topic we created has 3 partitions); we then start one more consumer from a terminal via the Kafka shell, which lets us verify Kafka's consumption model.
The topic used in the tests is as follows:
```
$ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: hadoop   Partition: 0    Leader: 103    Replicas: 103,101,102    Isr: 103,101,102
    Topic: hadoop   Partition: 1    Leader: 101    Replicas: 101,102,103    Isr: 101,102,103
    Topic: hadoop   Partition: 2    Leader: 102    Replicas: 102,103,101    Isr: 102,103,101
```
That is, there are 3 partitions, and the replication factor is also 3.
```java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * KafkaProducerOps produces data into a Kafka topic.
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * Load the configuration file (key=value format).
         * Avoid hard-coding values in the code; keep them configurable.
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader()
                .getResourceAsStream("producer.properties");
        properties.load(in);

        /**
         * Two type parameters:
         * the first is the type of a record's key,
         * the second is the type of a record's value.
         */
        String[] girls = new String[]{"姚慧瑩", "劉向前", "周 新", "楊柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        Random random = new Random();
        int start = 1;
        for (int i = start; i <= start + 20; i++) {
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = i + "";
            String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
```
```java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * Consumes data from a Kafka topic.
 */
public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        // Thread pool with 4 threads
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        System.out.println("外部開始時間:" + System.currentTimeMillis());
        for (int i = 0; i < 4; i++) {
            ScheduledFuture<?> schedule = service.schedule(
                    new ConsumerThread(), 5L, TimeUnit.SECONDS);
        }
    }
}

class ConsumerThread implements Runnable {
    public void run() {
        System.out.println("線程ID:" + Thread.currentThread().getId()
                + "線程開始時間:" + System.currentTimeMillis());
        /**
         * Two type parameters:
         * the first is the type of a record's key,
         * the second is the type of a record's value.
         */
        Properties properties = new Properties();
        try {
            properties.load(KafkaConsumerOps.class.getClassLoader()
                    .getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // Subscribe to the topic
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // Pull data from the topic
            consumerRecords = consumer.poll(1000);
            // Iterate over each record
            for (ConsumerRecord consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                int partition = consumerRecord.partition();
                System.out.println("CurrentThreadID: " + Thread.currentThread().getId()
                        + "\toffset: " + offset
                        + "\tpartition: " + partition
                        + "\tkey: " + key
                        + "\tvalue: " + value);
            }
        }
    }
}
```
```java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * A custom partitioner that routes records by key.
 * <p>
 * The partition can be derived from the hashCode of the key or value,
 * or from any business-specific rule that spreads data across partitions.
 * Requirement here: partition = hashCode(key) mod partition count.
 */
public class MyKafkaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {
    }

    /**
     * Choose a partition for the given record.
     *
     * @param topic      topic name
     * @param key        record key
     * @param keyBytes   serialized key
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            targetPartition = new Random().nextInt(10000) % partitionNums;
        } else {
            // Mask off the sign bit so a negative hashCode cannot yield a negative partition.
            int hashCode = key.hashCode();
            targetPartition = (hashCode & Integer.MAX_VALUE) % partitionNums;
            System.out.println("key: " + key + ", value: " + value
                    + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
```
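One subtlety of hashCode-based partitioning: Java's `%` operator preserves the sign of the dividend, so a key with a negative `hashCode()` taken modulo the partition count yields a negative number, which is not a valid partition. A safe variant masks the sign bit first. A minimal sketch (the helper names are ours, for illustration only):

```java
public class ModSketch {
    // Java's % keeps the dividend's sign, so a negative hashCode
    // produces a negative "partition" if used directly.
    static int naivePartition(int hashCode, int numPartitions) {
        return hashCode % numPartitions;
    }

    // Masking the sign bit keeps the result in [0, numPartitions).
    static int safePartition(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int negativeHash = -7; // stands in for a key whose hashCode is negative
        System.out.println(naivePartition(negativeHash, 3)); // prints -1: invalid partition
        System.out.println(safePartition(negativeHash, 3));  // prints 1: valid partition
    }
}
```

With the string keys "1" to "21" used in this test, all hashCodes are positive, so masking does not change which partition a record lands on.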
```java
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * Property key for the producer's target topic.
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
```
```
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

##### custom topic
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```
```
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181

bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# consumer group id
group.id=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
The main dependency is kafka-clients:
```xml
<dependencies>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
</dependencies>
```
First start a consumer in a terminal; note that since no configuration file is specified, its group.id is randomly generated:
```
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
```
Next, run the consumer code and then the producer code, and observe the output in each terminal.
The producer program's terminal output is as follows:
```
key: 1, value: 今天的<--劉向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--劉向前-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--劉向前-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--楊柳-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--楊柳-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--劉向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1567, partition: 1
key: 11, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1568, partition: 2
key: 12, value: 今天的<--周 新-->很美很美哦~, hashCode: 1569, partition: 0
key: 13, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1570, partition: 1
key: 14, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1571, partition: 2
key: 15, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1572, partition: 0
key: 16, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1573, partition: 1
key: 17, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1574, partition: 2
key: 18, value: 今天的<--劉向前-->很美很美哦~, hashCode: 1575, partition: 0
key: 19, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1576, partition: 1
key: 20, value: 今天的<--姚慧瑩-->很美很美哦~, hashCode: 1598, partition: 2
key: 21, value: 今天的<--楊柳-->很美很美哦~, hashCode: 1599, partition: 0
```
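The hashCode values in this log can be checked directly: for a single-character key like "1", `String.hashCode()` is just the character's code point (49 for '1'), and for a two-character key it is 31 * first + second (31 * 49 + 48 = 1567 for "10"). A quick check, mirroring the partitioner's hashCode mod 3 arithmetic:

```java
public class HashCheck {
    public static void main(String[] args) {
        // "1" -> '1' == 49; 49 % 3 == 1, matching "partition: 1" in the log.
        System.out.println("1".hashCode() + " -> partition " + ("1".hashCode() % 3));
        // "10" -> 31 * '1' + '0' == 31 * 49 + 48 == 1567; 1567 % 3 == 1, also matching the log.
        System.out.println("10".hashCode() + " -> partition " + ("10".hashCode() % 3));
    }
}
```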
The consumer program's terminal output is as follows:
```
外部開始時間:1521991118178
線程ID:20線程開始時間:1521991123182
線程ID:21線程開始時間:1521991123182
線程ID:23線程開始時間:1521991123182
線程ID:22線程開始時間:1521991123182
CurrentThreadID: 22	offset: 78	partition: 1	key: 1	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22	offset: 79	partition: 1	key: 4	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22	offset: 80	partition: 1	key: 7	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22	offset: 81	partition: 1	key: 10	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 22	offset: 82	partition: 1	key: 13	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23	offset: 81	partition: 0	key: 3	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23	offset: 82	partition: 0	key: 6	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23	offset: 83	partition: 0	key: 9	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 23	offset: 84	partition: 0	key: 12	value: 今天的<--周 新-->很美很美哦~
CurrentThreadID: 23	offset: 85	partition: 0	key: 15	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23	offset: 86	partition: 0	key: 18	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22	offset: 83	partition: 1	key: 16	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 23	offset: 87	partition: 0	key: 21	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21	offset: 78	partition: 2	key: 2	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 22	offset: 84	partition: 1	key: 19	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21	offset: 79	partition: 2	key: 5	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21	offset: 80	partition: 2	key: 8	value: 今天的<--劉向前-->很美很美哦~
CurrentThreadID: 21	offset: 81	partition: 2	key: 11	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21	offset: 82	partition: 2	key: 14	value: 今天的<--姚慧瑩-->很美很美哦~
CurrentThreadID: 21	offset: 83	partition: 2	key: 17	value: 今天的<--楊柳-->很美很美哦~
CurrentThreadID: 21	offset: 84	partition: 2	key: 20	value: 今天的<--姚慧瑩-->很美很美哦~
```
The shell consumer's terminal output is as follows:
```
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--周 新-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--楊柳-->很美很美哦~
今天的<--姚慧瑩-->很美很美哦~
今天的<--劉向前-->很美很美哦~
今天的<--楊柳-->很美很美哦~
```
Because the shell consumer's group.id is randomly generated, it is guaranteed to consume messages from every partition of the topic; this is consumption across groups.
In the consumer program, all four threads use the same group.id (they all load the consumer.properties file). As the theory predicts, since topic hadoop has only 3 partitions, at most 3 threads, i.e. 3 consumers, can receive messages. The thread IDs in the output show that indeed only three threads consumed messages from the topic, which verifies Kafka's within-group consumption model.
Reference: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing
Kafka's installation directory ships a performance-testing tool, bin/kafka-producer-perf-test.sh. It reports four main metrics: total data sent (MB), send throughput (MB/second), total number of records sent, and record throughput (records/second).
A test run:
```
[uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 100
49972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.
50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.
50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.
50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.
50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.
50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.
50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.
1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.
```
The parameters are explained below:
- `--num-records 1000000`: total number of records to produce
- `--throughput 10000`: target number of records produced per second
- `--record-size 100`: size of each record, in bytes
```
[uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
```
The test above consumes one million messages; its output columns are explained below:
- `start.time` / `end.time`: start and end time of the test
- `data.consumed.in.MB`: total data consumed, in MB
- `MB.sec`: consumption throughput, in MB per second
- `data.consumed.in.nMsg`: total number of messages consumed
- `nMsg.sec`: messages consumed per second
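The throughput figures follow directly from the timestamps: the run lasted 22:458 - 21:185 = 1273 ms, and 1020325 messages / 1.273 s ≈ 801512 msg/s, which matches the reported nMsg.sec. A quick arithmetic check:

```java
public class PerfCheck {
    public static void main(String[] args) {
        // Elapsed time between start.time 05:17:21:185 and end.time 05:17:22:458.
        double seconds = (22458 - 21185) / 1000.0;   // 1.273 s
        double msgPerSec = 1020325 / seconds;        // matches nMsg.sec ≈ 801512.18
        double mbPerSec = 97.3055 / seconds;         // matches MB.sec ≈ 76.438
        System.out.printf("%.4f msg/s, %.4f MB/s%n", msgPerSec, mbPerSec);
    }
}
```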