How Kafka data can be consumed repeatedly

While working through Li Yue's backend-storage practice course on Geek Time recently, I came across something interesting: using Kafka to store click-stream data and processing it repeatedly. In all my previous use, Kafka was just a transport for messages; once a message was consumed, it could not be consumed again. This new knowledge clashed with my impression, and hence this article: how Kafka data can be consumed repeatedly.

Background theory

First, I went to the official website to correct my overall understanding of Kafka.

The official site describes Kafka as a distributed streaming platform. I can only blame my own shallow learning.

Next, I revisited how a Kafka consumer actually consumes: the consumer fetches messages by polling (Kafka consumers use a pull model, not push), processes them, and then commits the consumed offsets either manually or automatically. Kafka records the committed offset, and that committed position determines where the consumer group resumes reading.
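To make that concrete, here is a minimal sketch of the poll-then-commit loop. The broker address, group id, and topic name are placeholders I made up for illustration:

 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.serialization.StringDeserializer;

 // minimal poll-then-commit loop; broker/group/topic names are placeholders
 Properties props = new Properties();
 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually below
 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
     consumer.subscribe(Collections.singletonList("demo-topic"));
     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
     for (ConsumerRecord<String, String> record : records) {
         System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
     }
     // the committed offset is where this group resumes on its next poll
     consumer.commitSync();
 }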

Settling on an approach

A Kafka consumer's read position is determined by its offset. So if I could manually reset the offset back to an earlier position, I could consume the same data again? That sounds workable.

The key question is how to set the offset manually.

show me the code

The crux of the code is the calls to the offset-seeking APIs; the rest is nothing special.

Note that the code calls several seek variants with different effects (seekToBeginning, seek, seekToEnd). They are shown side by side purely for demonstration; pick the one you need.

Finally, when the consumer reads messages, I poll just once with the default fetch size; adjust this as needed.

 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;

 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;

 /**
  * Repeat kafka messages.
  * @param host           kafka host
  * @param groupId        kafka consumer group id
  * @param autoCommit     whether to auto-commit consumed offsets
  * @param topic          topic to consume
  * @param consumeTimeOut consume timeout in seconds
  */
 private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut) {
     // build properties for a new consumer
     Properties properties = new Properties();
     properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
     // subscribe to the incoming topic
     consumer.subscribe(Collections.singletonList(topic));
     // collect all partitions of the topic
     List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
     List<TopicPartition> topicPartitions = new ArrayList<>();
     for (PartitionInfo partitionInfo : partitionInfos) {
         topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
     }
     // poll once first: seek() is lazy and requires an actual partition assignment
     consumer.poll(Duration.ofSeconds(consumeTimeOut));
     // reset the offset to the beginning of every partition
     consumer.seekToBeginning(topicPartitions);
     // or reset one designated partition to a designated offset
     long offset = 20;
     consumer.seek(topicPartitions.get(0), offset);
     // or reset the offset to the end of every partition
     consumer.seekToEnd(topicPartitions);
     // note: only the position in effect at the next poll() matters;
     // the three calls above are shown together purely for demonstration
     // consume messages as usual ("log" assumes an SLF4J logger on the enclosing class)
     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
     Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
     while (iterator.hasNext()) {
         ConsumerRecord<String, String> record = iterator.next();
         log.info("consume data: {}", record.value());
     }
 }
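Relatedly, the number of records a single poll returns is capped by max.poll.records, which defaults to 500. If you want a different batch size, set it alongside the other properties; the value below is arbitrary:

 // max.poll.records caps how many records one poll() returns (default 500)
 properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");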
Run result

Things to note

While setting the offset manually, I ran into an exception:

 java.lang.IllegalStateException: No current assignment for partition test-0

After digging through Stack Overflow and the official documentation, I learned that seeking is a lazy operation. The official explanation:

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

So I perform one poll first, and only then set the offset.
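For what it's worth, there is also a way to avoid the warm-up poll entirely: use assign() instead of subscribe(). Assignment takes effect immediately, so seek() works right away. A sketch reusing consumer and topicPartitions from the listing above (and dropping the subscribe() call):

 // assign() gives the consumer its partitions immediately, so no warm-up poll is needed
 consumer.assign(topicPartitions);
 consumer.seekToBeginning(topicPartitions);
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

Note that a consumer must use either subscribe() or assign(), not both; with assign() you give up the consumer group's automatic rebalancing.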

This article was first published on cartoon's blog.
Please credit the source when reposting: https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka數據如何被重複消費/
