kafka的消費者API提供從kafka服務端拉取消息的能力,kafka引入了消費者組的概念,不一樣消費者組之間互不影響,獨自擁有一份數據,而同一個消費者組內的消費者則有以下規律:apache
分區數=消費者數:一個消費者拉取一個分區的數據bootstrap
分區數>消費者數:同一個消費者可能拉取不一樣分區的數據oop
分區數<消費者數:一個消費者拉取一個分區的數據,多餘的消費者不參與工做,當正在工做的消費者掛了之 後,這些閒着的消費者會頂替它幹活,但會出現重複消費數據的狀況spa
全部提交的offset都在kafka內建的一個消息隊列中存在的,有50個分區,可使用以下命令查看.net
查看全部topicdebug
./kafka-topics.sh --zookeeper hadoop01:2181 --listcode
查看某個消費者組訂閱的topic的當前offset和滯後進度server
./kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_groupblog
1.偏移量-自動提交隊列
/* 消費者拉取數據以後自動提交偏移量,不關心後續對消息的處理是否正確 優勢:消費快,適用於數據一致性弱的業務場景 缺點:消息很容易丟失 */ @Test public void autoCommit() { Properties props = new Properties(); //設置kafka集羣的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //設置消費者組,組名字自定義,組名字相同的消費者在一個組 props.put("group.id", "my_group"); //開啓offset自動提交 props.put("enable.auto.commit", "true"); //自動提交時間間隔 props.put("auto.commit.interval.ms", "1000"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //實例化一個消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消費者訂閱主題,能夠訂閱多個主題 consumer.subscribe(Arrays.asList("mytopic1")); //死循環不停的從broker中拿數據 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
運行上面的程序輸出結果:
使用以下命令查看offset提交後當前位置
./kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_group
比較上面兩張圖,最後一次消費的OFFSET=216493,下一個要消費的OFFSET=216494
一般從Kafka拿到的消息是要作業務處理,並且業務處理完成纔算真正消費成功,因此須要客戶端控制offset提交時間
@Test public void munualCommit() { Properties props = new Properties(); //設置kafka集羣的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //設置消費者組,組名字自定義,組名字相同的消費者在一個組 props.put("group.id", "my_group"); //開啓offset自動提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //實例化一個消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消費者訂閱主題,能夠訂閱多個主題 consumer.subscribe(Arrays.asList("mytopic1")); final int minBatchSize = 50; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { //insertIntoDb(buffer); for (ConsumerRecord bf : buffer) { System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value()); } consumer.commitSync(); buffer.clear(); } } }
在munualCommit的基礎上更細粒度的提交數據,按照每一個分區手動提交偏移量
這裏實現了按照分區取數據,所以能夠從分區入手,不一樣的分區能夠作不一樣的操做,能夠靈活實現一些功能
爲了驗證手動提交偏移量,有兩種方式:
1.debug的時候,在poll數據以後,手動提交前偏移量以前終止程序,再次啓動看數據是否重複被拉取 2.debug的時候,在poll數據以後,手動提交前偏移量以前終止程序,登陸Linux 主機執行以下命令:
/kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --describe --group my_group
命令的輸出結果能夠看到當前topic每一個區分被提交後的當前偏移量、還未被消費的最大偏移量、二者之間的差等信息
@Test public void munualCommitByPartition() { Properties props = new Properties(); //設置kafka集羣的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //設置消費者組,組名字自定義,組名字相同的消費者在一個組 props.put("group.id", "my_group"); //開啓offset自動提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //實例化一個消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消費者訂閱主題,能夠訂閱多個主題 consumer.subscribe(Arrays.asList("mytopic3")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); /* 提交的偏移量應該始終是您的應用程序將要讀取的下一條消息的偏移量。所以,在調用commitSync()時, offset應該是處理的最後一條消息的偏移量加1 爲何這裏要加上面不加喃?由於上面Kafka可以自動幫咱們維護全部分區的偏移量設置,有興趣的同窗能夠看看SubscriptionState.allConsumed()就知道 */ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } }
消費只讀取特定分區數據,這種方式比上面的更加靈活,在實際應用場景中會常常使用
由於分區的數據是有序的,利用這個特性能夠用於數據到達有前後順序的業務,好比一個用戶將訂單提交,緊接着又取消訂單,那麼取消的訂單必定要後於提交的訂單到達某一個分區,這樣保證業務處理的正確性
一旦指定了分區,要注意如下兩點:
a.kafka提供的消費者組內的協調功能就再也不有效
b.這樣的寫法可能出現不一樣消費者分配了相同的分區,爲了不偏移量提交衝突,每一個消費者實例的group_id要不重複
@Test public void munualPollByPartition() { Properties props = new Properties(); //設置kafka集羣的地址 props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //設置消費者組,組名字自定義,組名字相同的消費者在一個組 props.put("group.id", "my_group"); //開啓offset自動提交 props.put("enable.auto.commit", "false"); //序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //實例化一個消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消費者訂閱主題,並設置要拉取的分區 TopicPartition partition0 = new TopicPartition("mytopic3", 0); //TopicPartition partition1 = new TopicPartition("mytopic2", 1); //consumer.assign(Arrays.asList(partition0, partition1)); consumer.assign(Arrays.asList(partition0)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } }