1、java
一、Kafka的消費並行度依賴Topic配置的分區數,如分區數爲10,那麼最多10臺機器來並行消費(每臺機器只能開啓一個線程),或者一臺機器消費(10個線程並行消費)。即消費並行度和分區數一致。多線程
二、(1)若是指定了某個分區,會只講消息發到這個分區上 ide
(2)若是同時指定了某個分區和key,則也會將消息發送到指定分區上,key不起做用 spa
(3)若是沒有指定分區和key,那麼將會隨機發送到topic的分區中線程
(4)若是指定了key,那麼將會以hash<key>的方式發送到分區中 blog
2、多線程消費實例get
paritition 爲3,broker爲3,節點爲3kafka
一、生產者隨機分區提交數據hash
這也是一個比較關鍵步驟,只有隨機提交到不一樣的分區才能實現多分區消費;
自定義隨機分區:it
public class MyPartition implements Partitioner{ private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int partitionNum = 0; try { partitionNum = Integer.parseInt((String) key); } catch (Exception e) { partitionNum = key.hashCode() ; } // System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value); return Math.abs(partitionNum % numPartitions); } }
而後在初始化kafka生產者配置的時候修改以下配置
props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));
這樣就實現了kafka生產者隨機分區提交數據。
二、消費者
最後一步就是消費者,修改單線程模式爲多線程,這裏的多線程實現方式有不少,本例知識用了最簡單的固定線程模式:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { fixedThreadPool.execute(new Runnable() { @Override public void run() { kafkaConsumerService.getInstance(); } }); }
在消費方面得注意,這裏得遍歷全部分區,不然仍是隻消費了一個區:
ConsumerRecords<String, String> records = consumer.poll(1000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records .records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println( "message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset() + " 分區:" + record.partition()); if (record.value() == null || record.key() == null) { consumer.commitSync(); } else { // dealMessage KafkaServer.dealMessage(record.key(),record.value(),consumer); // consumer.commitSync(); } } }
注意上面的線程爲啥只有3個,這裏得跟上面kafka的分區個數相對應起來,不然若是線程超過度區數量,那麼只會浪費線程,由於即便使用3個以上的線程也只會消費三個分區,而少了則沒法消費徹底。因此這裏必須更上面的對應起來。