kafka 多線程消費

1、java

   一、Kafka的消費並行度依賴Topic配置的分區數,如分區數爲10,那麼最多10臺機器來並行消費(每臺機器只能開啓一個線程),或者一臺機器消費(10個線程並行消費)。即消費並行度和分區數一致。多線程

   二、(1)若是指定了某個分區,會只講消息發到這個分區上 ide

       (2)若是同時指定了某個分區和key,則也會將消息發送到指定分區上,key不起做用 spa

       (3)若是沒有指定分區和key,那麼將會隨機發送到topic的分區中線程

       (4)若是指定了key,那麼將會以hash<key>的方式發送到分區中 blog

2、多線程消費實例get

     paritition 爲3,broker爲3,節點爲3kafka

一、生產者隨機分區提交數據hash

    這也是一個比較關鍵步驟,只有隨機提交到不一樣的分區才能實現多分區消費; 
自定義隨機分區:it

public class MyPartition implements Partitioner{
     private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); 
    @Override
    public void configure(Map<String, ?> arg0) {
        // TODO Auto-generated method stub
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
            byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partitionNum = 0;
        try {
            partitionNum = Integer.parseInt((String) key);
        } catch (Exception e) {
            partitionNum = key.hashCode() ;
        }
//        System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value);
        return Math.abs(partitionNum  % numPartitions);
    }
}  

    而後在初始化kafka生產者配置的時候修改以下配置

props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));

這樣就實現了kafka生產者隨機分區提交數據。

 二、消費者

最後一步就是消費者,修改單線程模式爲多線程,這裏的多線程實現方式有不少,本例知識用了最簡單的固定線程模式:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    kafkaConsumerService.getInstance();
                }
            });
        }

在消費方面得注意,這裏得遍歷全部分區,不然仍是隻消費了一個區:

ConsumerRecords<String, String> records = consumer.poll(1000);
        for (TopicPartition partition : records.partitions()) {  
            List<ConsumerRecord<String, String>> partitionRecords = records  
                    .records(partition); 
        for (ConsumerRecord<String, String> record : partitionRecords) {
            System.out.println(
                    "message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()
                    + " 分區:" + record.partition());
            if (record.value() == null || record.key() == null) {
                consumer.commitSync();
            } else {
                // dealMessage
                KafkaServer.dealMessage(record.key(),record.value(),consumer);
//              consumer.commitSync();
            }
         }
        }

  注意上面的線程爲啥只有3個,這裏得跟上面kafka的分區個數相對應起來,不然若是線程超過度區數量,那麼只會浪費線程,由於即便使用3個以上的線程也只會消費三個分區,而少了則沒法消費徹底。因此這裏必須更上面的對應起來。 

相關文章
相關標籤/搜索