While recently working through Li Yue's backend storage practice course on Geek Time, I came across something quite interesting: using Kafka to store click-stream data and process it repeatedly. In my past usage, Kafka was only a message transport — once a message was consumed, it could not be consumed again. This new knowledge clashed with my impression, hence this article: how Kafka data can be consumed repeatedly.
First, I went back to the official site to correct my overall understanding of Kafka.

The official description of Kafka is: a distributed streaming platform. Clearly I hadn't studied it well enough.
Next, I revisited how a Kafka consumer consumes: the consumer fetches messages by polling, and after processing it commits the consumption either manually or automatically; the Kafka server then decides, based on that commit, whether to advance the committed offset.
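As a quick illustration of that flow, here is a minimal sketch of a consumer loop with manual commits. The broker address `localhost:9092`, group id `demo-group`, and topic name `clicks` are all placeholders of my own, and running it requires the kafka-clients dependency and a live broker:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");               // placeholder group
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");          // commit by hand
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("clicks")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            // commit only after processing succeeds: if the process crashes
            // before this line, the same records are delivered again
            consumer.commitSync();
        }
    }
}
```

The point of the manual commit is exactly what makes repeated consumption possible: the server only moves the committed offset when told to.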
A Kafka consumer decides where to read from by its offset. So if I can manually reset the offset to the starting position, repeated consumption should be achievable? Sounds promising.

How to set the offset manually is the key.
The crux of the code is the calls to the offset-setting APIs; the rest is nothing special.

Note that the code below calls several offset-setting methods with different effects, purely for demonstration; pick whichever you need.

Finally, when consuming messages, I poll only once with the default fetch size; adjust as needed.
```java
/**
 * repeat kafka message
 * @param host kafka host
 * @param groupId kafka consumer group id
 * @param autoCommit whether auto commit consume
 * @param topic consume topic
 * @param consumeTimeOut consume time out
 */
private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut) {
    // form a properties to new consumer
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    // subscribe to the incoming topic
    consumer.subscribe(Collections.singletonList(topic));

    // collect the partitions this topic has
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }

    // poll data from kafka server first, because seek is a lazy operation
    consumer.poll(Duration.ofSeconds(consumeTimeOut));

    // reset offset to the beginning
    consumer.seekToBeginning(topicPartitions);

    // reset a designated partition to a designated offset
    int offset = 20;
    consumer.seek(topicPartitions.get(0), offset);

    // reset offset to the end
    consumer.seekToEnd(topicPartitions);

    // consume messages as usual
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info("consume data: {}", record.value());
    }
}
```
While setting the offset manually, I ran into an exception:

```
java.lang.IllegalStateException: No current assignment for partition test-0
```
After digging through Stack Overflow and the official docs, I learned that setting the offset is a lazy operation. The official explanation is as follows.
> Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
So I perform one poll first, and only then set the offset.
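An alternative to polling first (my own variation, not from the course) is to do the seek inside a rebalance callback, which the client invokes only once partitions have actually been assigned, so the "No current assignment" exception cannot occur. A sketch, with `consumer` and `topic` assumed to be set up as in the earlier listing:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class SeekOnAssign {
    static void subscribeFromBeginning(KafkaConsumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to clean up in this sketch
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // runs only after the group coordinator has assigned partitions,
                // so seeking here is always against a current assignment
                consumer.seekToBeginning(partitions);
            }
        });
        // the seek takes effect on the next poll
        consumer.poll(Duration.ofSeconds(1));
    }
}
```

This also keeps the reset correct across rebalances, since the callback fires again whenever partitions are reassigned.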
This article was first published on cartoon's blog.

Please credit the source when reposting: https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka數據如何被重複消費/