KafkaConsumer實現精確一次消費

轉自 https://blog.csdn.net/qq_18581221/article/details/89766073node

簡介

在使用kafka時,大多數場景對於數據少許的不一致(重複或者丟失)並不關注,好比日誌,由於不會影響最終的使用或者分析,可是在某些應用場景(好比業務數據),須要對任何一條消息都要作到精確一次的消費,才能保證系統的正確性,kafka並不提供準確一致的消費API,須要咱們在實際使用時借用外部的一些手段來保證消費的精確性,下面咱們介紹如何實現sql

kafka消費機制

這篇文章KafkaConsumer使用介紹、參數配置介紹瞭如何kafka具備兩種提交offset(消費偏移量)方式,咱們在Kafka簡介以及安裝和使用可知每一個分區具有一offset記錄消費位置,若是消費者一直處於正常的運行轉態,那麼offset將沒有什麼用處,由於正常消費時,consumer記錄了本次消費的offset和下一次將要進行poll數據的offset起始位置,可是若是消費者發生崩潰或者有新的消費者加入消費者組,就會觸發再均衡Rebalance,Rebalance以後,每一個消費者將會分配到新的分區,而消費者對於新的分區應該從哪裏進行起始消費,這時候提交的offset信息就起做用了,提交的offset信息包括消費者組全部分區的消費進度,這時候消費者能夠根據消費進度繼續消費,提交offset提交自動提交是最不具肯定性的,因此要使用手動提交來控制offset數據庫

消費時出現幾種異常狀況

自動提交

  • 重複消費:當數據已經被處理,而後自動提交offset時消費者出現故障或者有新消費者加入組致使再均衡,這時候offset提交失敗,致使這批已經處理的數據的信息沒有記錄,後續會重複消費一次
  • 丟失數據:若是業務處理時間較長一點,這時候數據處理業務還未完成,offset信息已經提交了,可是在後續處理數據過程當中程序發生了崩潰,致使這批數據未正常消費,這時候offset已經提交,消費者後續將不在消費這批數據,致使這批數據將會丟失

手動提交

  • 重複消費(最少一次消費語義實現):消費數據處理業務完成後進行offset提交,能夠保證數據最少一次消費,由於在提交offset的過程當中可能出現提交失敗的狀況,致使數據重複消費
/**
 * 手動提交offset
 * 實現至少一次的消費語義 at least once
 * 當手動提交位移失敗,會重複消費數據
 */
@Test
public void testCommitOffset() {
    String topic = "first-topic";
    String group = "g1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "node00:9092,node03:9092");   //required
    props.put("group.id", group);   //required
    props.put("enable.auto.commit", "false"); // 關閉自動提交
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest");     //從最先的消息開始讀取
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //required
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));       //訂閱topic
    final int minBatchSize = 10;
    // 緩存
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize);
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(buffer::add);

            // 緩存滿了纔對數據進行處理
            if (buffer.size() >= minBatchSize) {

                // 業務邏輯--插入數據庫
                // insertIntoDb(buffer);
                // 等數據插入數據庫以後,再異步提交位移

                // 經過異步的方式提交位移
                consumer.commitAsync(((offsets, exception) -> {
                    if (exception == null) {
                        offsets.forEach((topicPartition, metadata) -> {
                            System.out.println(topicPartition + " -> offset=" + metadata.offset());
                        });
                    } else {
                        exception.printStackTrace();
                        // 若是出錯了,同步提交位移
                        consumer.commitSync(offsets);
                    }
                }));

               
                // 若是提交位移失敗了,那麼重啓consumer後會重複消費以前的數據,再次插入到數據庫中
                // 清空緩衝區
                buffer.clear();
            }
        }
    } finally {
        consumer.close();
    }
}

 

 

  • 丟失數據(最多一次消費語義實現):在消費數據業務處理前進行offset提交,能夠保證最多一次消費,在後續數據業務處理程序出現故障,將致使數據丟失

代碼實現apache

/**
 * 實現最多一次語義
 * 在消費前提交位移,當後續業務出現異常時,可能丟失數據
 */
@Test
public void testAtMostOnce() {
    Properties props = new Properties();
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props);
    kafkaConsumer.subscribe(Arrays.asList("first-topic"));
    try {

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
            // 處理業務以前就提交位移
            kafkaConsumer.commitAsync();
            // 下面是業務邏輯
            records.forEach(record -> {
                System.out.println(record.value() + ", offset=" + record.offset());
            });
        }
    } catch (Exception e) {

    } finally {
        kafkaConsumer.close();
    }

}

精確一次消費實現

從kafka的消費機制,咱們能夠獲得是否可以精確的消費關鍵在消費進度信息的準確性,若是可以保證消費進度的準確性,也就保證了消費數據的準確性bootstrap

  • 數據有狀態:能夠根據數據信息進行確認數據是否重複消費,這時候可使用手動提交的最少一次消費語義實現,即便消費的數據有重複,能夠經過狀態進行數據去重,以達到冪等的效果
  • 存儲數據容器具有冪等性:在數據存入的容器具有自然的冪等(好比ElasticSearch的put操做具有冪等性,相同的數據屢次執行Put操做和一次執行Put操做的結果是一致的),這樣的場景也可使用手動提交的最少一次消費語義實現,由存儲數據端來進行數據去重
  • 數據無狀態,而且存儲容器不具有冪等:這種場景須要自行控制offset的準確性,今天文章主要說明這種場景下的處理方式,這裏數據不具有狀態,存儲使用關係型數據庫,好比MySQL

這裏簡單說明一下實現思路
1) 利用consumer api的seek方法能夠指定offset進行消費,在啓動消費者時查詢數據庫中記錄的offset信息,若是是第一次啓動,那麼數據庫中將沒有offset信息,須要進行消費的元數據插入,而後從offset=0開始消費api

2) 關係型數據庫具有事務的特性,當數據入庫時,同時也將offset信息更新,借用關係型數據庫事務的特性保證數據入庫和修改offset記錄這兩個操做是在同一個事務中進行緩存

3) 使用ConsumerRebalanceListener來完成在分配分區時和Relalance時做出相應的處理邏輯異步

4) 要弄清楚的是,咱們在消費的時候,關閉了自動提交,咱們也沒有經過consumer.commitAsync()手動提交咱們的位移信息,而是在每次啓動一個新的consumer的時候,觸發rebalance時,讀取數據庫中的位移信息,從該位移中開始讀取partition的信息(初始化的時候爲0),在沒有出現異常的狀況下,咱們的consumer會不斷從producer讀取信息,這個位移是最新的那個消息位移,並且會同時把這個位移更新到數據庫中,可是,當出現了rebalance時,那麼consumer就會從數據庫中讀取開始的位移。ide

表設計ui

create table kafka_info(
    topic_group_partition varchar(32) primary key, //主題+組名+分區號 這裏冗餘設計方便經過這個主鍵進行更新提高效率 
    topic_group varchar(30), //主題和組名
    partition_num tinyint,//分區號
    offsets bigint default 0 //offset信息
);

 

代碼

/**
 * @Description: 實現Kafka的精確一次消費
 * @author: HuangYn
 * @date: 2019/10/15 21:10
 */
public class ExactlyOnceConsume {

    private final KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, Long> tpOffsetMap;
    private List<ConsumerRecord> list;
    private JDBCHelper jdbcHelper = JDBCHelper.getInstance();
    private String groupId;
    private String topic;

    public ExactlyOnceConsume(Properties props, String topic, String groupId) {
        this.consumer = KafkaFactory.buildConsumer(props);
        this.list = new ArrayList<>(100);
        this.tpOffsetMap = new HashMap<>();
        this.groupId = groupId;
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance());
    }

    public void receiveMsg() {
        try {

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (!records.isEmpty()) {
                    // 處理每一個partition的記錄
                    records.partitions().forEach(tp -> {
                        List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                        // 記錄加到緩存中
                        tpRecords.forEach(record -> {
                            System.out.println("partition=" + record.partition() +
                                    ", offset= " + record.offset() +
                                    ", value=" + record.value());
                            list.add(record);
                        });
                        // 將partition對應的offset加到map中, 獲取partition中最後一個元素的offset,
                        // +1 就是下一次讀取的位移,就是本次須要提交的位移
                        tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1);
                    });
                }
                // 緩存中有數據
                if (!list.isEmpty()) {
                    // 將數據插入數據庫,而且將位移信息也插入數據庫
                    // 所以,每次讀取到數據,都要更新本consumer在數據庫中的位移信息
                    boolean success = insertIntoDB(list, tpOffsetMap);
                    if (success) {
                        list.clear();
                        tpOffsetMap.clear();
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private boolean insertIntoDB(List<ConsumerRecord> list,
                                 Map<TopicPartition, Long> tpOffsetMap) {

        // 這裏應該是在同一個事務中進行的
        // 爲了方便就省略了

        try {
            // TODO 將數據入庫,這裏省略了

            // 將partition位移更新
            String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?";
            List<Object[]> params = new ArrayList<>(tpOffsetMap.size());
            tpOffsetMap.forEach((tp, offset) -> {
                Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()};
                params.add(param);
            });
            jdbcHelper.batchExecute(sql, params);
            return true;
        } catch (Exception e) {
            // 回滾事務
        }
    }

    /**
     * rebalance觸發的處理器
     */
    private class HandleRebalance implements ConsumerRebalanceListener {

        // rebalance以前觸發
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //發生Rebalance時,只須要將list中數據和記錄offset信息清空便可
            //這裏爲何要清除數據,應爲在Rebalance的時候有可能還有一批緩存數據在內存中沒有進行入庫,
            //而且offset信息也沒有更新,若是不清除,那麼下一次還會從新poll一次這些數據,將會致使數據重複
            System.out.println("==== onPartitionsRevoked ===== ");
            list.clear();
            tpOffsetMap.clear();
        }

        // rebalance後調用,consumer抓取數據以前觸發
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("== onPartitionsAssigned ==");

            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            // 從數據庫讀取當前partition的信息
            Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size());

            // 在分配分區時指定消費位置
            for (TopicPartition partition : partitions) {
                // 指定consumer在每一個partition上的消費開始位置
                // 若是在數據庫中有對應partition的信息則使用,不然將默認從offset=0開始消費
                if (partitionOffsetMapFromDB.get(partition) != null) {
                    consumer.seek(partition, partitionOffsetMapFromDB.get(partition));
                } else {
                    consumer.seek(partition, 0L);
                }
            }
        }
    }

    /**
     * 從數據庫讀取offset信息
     *
     * @param size
     * @return
     */
    private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {
        Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();
        String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?";
        jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> {

            int partition_num = -1;
            long offsets = -1;
            while (resultSet.next()) {
                partition_num = resultSet.getInt("partition_num");
                offsets = resultSet.getLong("offsets");
                System.out.println("partition_num=" + partition_num + ", offset=" + offsets);
                partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets);
            }

            System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size());

            //判斷數據庫是否存在全部的分區的信息,若是沒有,則須要進行初始化
            if (partitionOffsetMapFromDB.size() < size) {
                String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)";
                List<Object[]> params = new ArrayList<>();
                for (int p_num = 0; p_num < size; p_num++) {
                    Object[] param = new Object[]{
                            topic + "_" + groupId + "_" + p_num,
                            topic + "_" + groupId,
                            p_num
                    };
                    params.add(param);
                }
                jdbcHelper.batchExecute(insert, params);
            }

        });
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return partitionOffsetMapFromDB;
    }

}

 

 數據庫中記錄

相關文章
相關標籤/搜索