This article looks at the relationship between topicCountMap and a topic's partitions in the Kafka 0.8 API.
Physically, a topic is split into one or more partitions; each partition corresponds to a directory on disk, which stores all of that partition's messages and index files.
When a Kafka producer sends a message, if the message has a key, the key is hashed and the message is routed to the corresponding partition; if there is no key, a counter is used to pick the partition.
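As a minimal sketch of the keyed case (this mirrors the hash-modulo idea behind the 0.8 default partitioner rather than its exact source; the class and method names here are illustrative):

// Sketch only: hash the key, then take it modulo the topic's partition count.
public class PartitionSketch {
    public static int partitionFor(Object key, int numPartitions) {
        // mask the sign bit instead of Math.abs, which breaks on Integer.MIN_VALUE
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("some-key", 6)); // a partition id in [0, 6)
    }
}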
Adding or removing consumers, brokers, or partitions triggers a rebalance, and after the rebalance the partitions assigned to each consumer may change. For example, if one consumer is removed, the remaining consumers' partition assignments are remapped once the rebalance completes.
The topicCountMap tells Kafka how many threads our consumer will use to consume each topic: its key is the topic name, and its value is the number of threads for that topic.
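In miniature, that looks like the following (a sketch assuming connector is an already-created kafka.javaapi.consumer.ConsumerConnector; the complete NativeConsumer class below builds one from scratch):

// Ask the high-level consumer for 3 KafkaStreams (i.e. 3 consuming threads)
// on topic "topic20170921"; connector is an existing ConsumerConnector.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic20170921", 3);
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(topicCountMap);
// streams.get("topic20170921") now holds 3 KafkaStreams, one per thread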
Suppose a topic has 6 partitions and we start two consumers, each with a topicCount of 3: we will observe that every consumer thread in both instances is busy consuming. If each consumer's topicCount is raised to 4 instead, all 4 threads of the consumer started first keep running, while only 2 threads of the consumer started later run; the other 2 are blocked.
In other words, from the consumer's point of view, the actual number of consuming threads = number of consumer instances × topicCount per consumer. If this value is greater than the number of partitions, the surplus threads have nothing to consume and block. If it is less than or equal to the number of partitions, every thread is consuming. Therefore, when deploying consumers in a distributed fashion, keep (number of consumer instances × topicCount per consumer) ≤ the topic's partition count.
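Spelled out with the numbers from the example above (a trivial sketch, nothing Kafka-specific):

// Thread-vs-partition arithmetic for the 6-partition scenario.
public class ThreadMath {
    public static void main(String[] args) {
        int partitions = 6;
        int consumerInstances = 2;
        int topicCount = 4;                                   // threads per consumer instance
        int totalThreads = consumerInstances * topicCount;    // 8
        int active = Math.min(totalThreads, partitions);      // 6 threads actually consume
        int blocked = Math.max(0, totalThreads - partitions); // 2 threads sit blocked
        System.out.println("active=" + active + ", blocked=" + blocked);
    }
}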
Create a test topic with 6 partitions:

sh kafka-topics.sh --create --topic topic20170921 --replication-factor 1 --partitions 6 --zookeeper localhost:2181
Check the group's partition assignments and offsets with the ConsumerOffsetChecker tool:

sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-group-0921
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class NativeConsumer {

    ExecutorService pool = Executors.newFixedThreadPool(10);

    public void exec(String topic, String zk, int consumerCount, String group) throws UnsupportedEncodingException {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk);
        // props.put("auto.offset.reset", "smallest");
        props.put("group.id", group);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

        ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        // topicCountMap: topic name -> number of consuming threads (KafkaStreams) for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);

        // one dedicated thread per stream
        consumerMap.get(topic).stream().forEach(stream -> {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName() + ":" + new String(it.next().message()));
                    }
                }
            });
        });
    }
}
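Note that each KafkaStream returned by createMessageStreams is meant to be drained by exactly one thread, and a stream with no partition assigned simply blocks in it.hasNext(); that is what the "blocked" threads in the 6-partition experiment above are doing.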
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NativeProducer {

    public void produce(String topic, String brokerAddr) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddr);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            int totalCountOfSendedMessages = 0;
            long totalSendTime = 0;
            long timeOfLastUpdate = 0;
            int countOfMessagesInSec = 0;

            for (int i = 0; i < 1000000; i++) {
                // TODO: keys must differ; with a single key every record lands in the
                // same partition and the consumers cannot scale out
                byte[] dataKey = SerializationUtils.serialize(UUID.randomUUID().toString());
                byte[] dataValue = SerializationUtils.serialize(UUID.randomUUID().toString());
                ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, dataKey, dataValue);

                long sendingStartTime = System.currentTimeMillis();
                // sync send
                producer.send(producerRecord).get();
                Thread.sleep(100);

                long currentTime = System.currentTimeMillis();
                long sendTime = currentTime - sendingStartTime;
                totalSendTime += sendTime;
                totalCountOfSendedMessages++;
                countOfMessagesInSec++;

                if (currentTime - timeOfLastUpdate > TimeUnit.SECONDS.toMillis(1)) {
                    // cast before dividing so the average keeps its fractional part
                    System.out.println("Average send time: " + (double) totalSendTime / totalCountOfSendedMessages + " ms.");
                    System.out.println("Count of messages in second: " + countOfMessagesInSec);
                    timeOfLastUpdate = currentTime;
                    countOfMessagesInSec = 0;
                }
            }
        }
    }
}
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

// JUnit harness for the two classes above (the wrapper class name is arbitrary)
public class TopicCountMapTest {

    String zkAddr = "localhost:2181";
    String topic = "topic20170921"; // created with 6 partitions
    String brokerAddr = "localhost:9092";
    String group = "test-group-0921";

    @Test
    public void testConsumer1() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic, zkAddr, 4, group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testConsumer2() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic, zkAddr, 4, group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testProducer() throws UnsupportedEncodingException, InterruptedException {
        NativeProducer producer = new NativeProducer();
        try {
            producer.produce(topic, brokerAddr);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}