轉自 https://blog.csdn.net/qq_18581221/article/details/89766073node
在使用kafka時,大多數場景對於數據少許的不一致(重複或者丟失)並不關注,好比日誌,由於不會影響最終的使用或者分析,可是在某些應用場景(好比業務數據),須要對任何一條消息都要作到精確一次的消費,才能保證系統的正確性,kafka並不提供準確一致的消費API,須要咱們在實際使用時借用外部的一些手段來保證消費的精確性,下面咱們介紹如何實現sql
這篇文章KafkaConsumer使用介紹、參數配置介紹瞭如何kafka具備兩種提交offset(消費偏移量)方式,咱們在Kafka簡介以及安裝和使用可知每一個分區具有一offset記錄消費位置,若是消費者一直處於正常的運行轉態,那麼offset將沒有什麼用處,由於正常消費時,consumer記錄了本次消費的offset和下一次將要進行poll數據的offset起始位置,可是若是消費者發生崩潰或者有新的消費者加入消費者組,就會觸發再均衡Rebalance,Rebalance以後,每一個消費者將會分配到新的分區,而消費者對於新的分區應該從哪裏進行起始消費,這時候提交的offset信息就起做用了,提交的offset信息包括消費者組全部分區的消費進度,這時候消費者能夠根據消費進度繼續消費,提交offset提交自動提交是最不具肯定性的,因此要使用手動提交來控制offset數據庫
/** * 手動提交offset * 實現至少一次的消費語義 at least once * 當手動提交位移失敗,會重複消費數據 */ @Test public void testCommitOffset() { String topic = "first-topic"; String group = "g1"; Properties props = new Properties(); props.put("bootstrap.servers", "node00:9092,node03:9092"); //required props.put("group.id", group); //required props.put("enable.auto.commit", "false"); // 關閉自動提交 props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "latest"); //從最先的消息開始讀取 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); //訂閱topic final int minBatchSize = 10; // 緩存 List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); records.forEach(buffer::add); // 緩存滿了纔對數據進行處理 if (buffer.size() >= minBatchSize) { // 業務邏輯--插入數據庫 // insertIntoDb(buffer); // 等數據插入數據庫以後,再異步提交位移 // 經過異步的方式提交位移 consumer.commitAsync(((offsets, exception) -> { if (exception == null) { offsets.forEach((topicPartition, metadata) -> { System.out.println(topicPartition + " -> offset=" + metadata.offset()); }); } else { exception.printStackTrace(); // 若是出錯了,同步提交位移 consumer.commitSync(offsets); } })); // 若是提交位移失敗了,那麼重啓consumer後會重複消費以前的數據,再次插入到數據庫中 // 清空緩衝區 buffer.clear(); } } } finally { consumer.close(); } }
代碼實現apache
/** * 實現最多一次語義 * 在消費前提交位移,當後續業務出現異常時,可能丟失數據 */ @Test public void testAtMostOnce() { Properties props = new Properties(); props.put("enable.auto.commit", "false"); KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props); kafkaConsumer.subscribe(Arrays.asList("first-topic")); try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(500); // 處理業務以前就提交位移 kafkaConsumer.commitAsync(); // 下面是業務邏輯 records.forEach(record -> { System.out.println(record.value() + ", offset=" + record.offset()); }); } } catch (Exception e) { } finally { kafkaConsumer.close(); } }
從kafka的消費機制,咱們能夠獲得是否可以精確的消費關鍵在消費進度信息的準確性,若是可以保證消費進度的準確性,也就保證了消費數據的準確性bootstrap
這裏簡單說明一下實現思路
1) 利用consumer api的seek方法能夠指定offset進行消費,在啓動消費者時查詢數據庫中記錄的offset信息,若是是第一次啓動,那麼數據庫中將沒有offset信息,須要進行消費的元數據插入,而後從offset=0開始消費api
2) 關係型數據庫具有事務的特性,當數據入庫時,同時也將offset信息更新,借用關係型數據庫事務的特性保證數據入庫和修改offset記錄這兩個操做是在同一個事務中進行緩存
3) 使用ConsumerRebalanceListener來完成在分配分區時和Relalance時做出相應的處理邏輯異步
4) 要弄清楚的是,咱們在消費的時候,關閉了自動提交,咱們也沒有經過consumer.commitAsync()手動提交咱們的位移信息,而是在每次啓動一個新的consumer的時候,觸發rebalance時,讀取數據庫中的位移信息,從該位移中開始讀取partition的信息(初始化的時候爲0),在沒有出現異常的狀況下,咱們的consumer會不斷從producer讀取信息,這個位移是最新的那個消息位移,並且會同時把這個位移更新到數據庫中,可是,當出現了rebalance時,那麼consumer就會從數據庫中讀取開始的位移。ide
表設計ui
create table kafka_info( topic_group_partition varchar(32) primary key, //主題+組名+分區號 這裏冗餘設計方便經過這個主鍵進行更新提高效率 topic_group varchar(30), //主題和組名 partition_num tinyint,//分區號 offsets bigint default 0 //offset信息 );
代碼
/** * @Description: 實現Kafka的精確一次消費 * @author: HuangYn * @date: 2019/10/15 21:10 */ public class ExactlyOnceConsume { private final KafkaConsumer<String, String> consumer; private Map<TopicPartition, Long> tpOffsetMap; private List<ConsumerRecord> list; private JDBCHelper jdbcHelper = JDBCHelper.getInstance(); private String groupId; private String topic; public ExactlyOnceConsume(Properties props, String topic, String groupId) { this.consumer = KafkaFactory.buildConsumer(props); this.list = new ArrayList<>(100); this.tpOffsetMap = new HashMap<>(); this.groupId = groupId; this.topic = topic; this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance()); } public void receiveMsg() { try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); if (!records.isEmpty()) { // 處理每一個partition的記錄 records.partitions().forEach(tp -> { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); // 記錄加到緩存中 tpRecords.forEach(record -> { System.out.println("partition=" + record.partition() + ", offset= " + record.offset() + ", value=" + record.value()); list.add(record); }); // 將partition對應的offset加到map中, 獲取partition中最後一個元素的offset, // +1 就是下一次讀取的位移,就是本次須要提交的位移 tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1); }); } // 緩存中有數據 if (!list.isEmpty()) { // 將數據插入數據庫,而且將位移信息也插入數據庫 // 所以,每次讀取到數據,都要更新本consumer在數據庫中的位移信息 boolean success = insertIntoDB(list, tpOffsetMap); if (success) { list.clear(); tpOffsetMap.clear(); } } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } private boolean insertIntoDB(List<ConsumerRecord> list, Map<TopicPartition, Long> tpOffsetMap) { // 這裏應該是在同一個事務中進行的 // 爲了方便就省略了 try { // TODO 將數據入庫,這裏省略了 // 將partition位移更新 String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?"; List<Object[]> params = new ArrayList<>(tpOffsetMap.size()); tpOffsetMap.forEach((tp, offset) -> { Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()}; params.add(param); }); jdbcHelper.batchExecute(sql, params); return true; } catch (Exception e) { // 回滾事務 } } /** * rebalance觸發的處理器 */ private class HandleRebalance implements ConsumerRebalanceListener { // rebalance以前觸發 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //發生Rebalance時,只須要將list中數據和記錄offset信息清空便可 //這裏爲何要清除數據,應爲在Rebalance的時候有可能還有一批緩存數據在內存中沒有進行入庫, //而且offset信息也沒有更新,若是不清除,那麼下一次還會從新poll一次這些數據,將會致使數據重複 System.out.println("==== onPartitionsRevoked ===== "); list.clear(); tpOffsetMap.clear(); } // rebalance後調用,consumer抓取數據以前觸發 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("== onPartitionsAssigned =="); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); // 從數據庫讀取當前partition的信息 Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size()); // 在分配分區時指定消費位置 for (TopicPartition partition : partitions) { // 指定consumer在每一個partition上的消費開始位置 // 若是在數據庫中有對應partition的信息則使用,不然將默認從offset=0開始消費 if (partitionOffsetMapFromDB.get(partition) != null) { consumer.seek(partition, partitionOffsetMapFromDB.get(partition)); } else { consumer.seek(partition, 0L); } } } } /** * 從數據庫讀取offset信息 * * @param size * @return */ private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) { Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>(); String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?"; jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> { int partition_num = -1; long offsets = -1; while (resultSet.next()) { partition_num = resultSet.getInt("partition_num"); offsets = resultSet.getLong("offsets"); System.out.println("partition_num=" + partition_num + ", offset=" + offsets); partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets); } System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size()); //判斷數據庫是否存在全部的分區的信息,若是沒有,則須要進行初始化 if (partitionOffsetMapFromDB.size() < size) { String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)"; List<Object[]> params = new ArrayList<>(); for (int p_num = 0; p_num < size; p_num++) { Object[] param = new Object[]{ topic + "_" + groupId + "_" + p_num, topic + "_" + groupId, p_num }; params.add(param); } jdbcHelper.batchExecute(insert, params); } }); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return partitionOffsetMapFromDB; } }
數據庫中記錄