《Kafka筆記》三、Kafka高級API

1 Kafka高級API特性

1.1 Offset的自動控制

1.1.1 消費者offset初始策略

通常來講每一個消費者消費以後,都會把本身消費到分區的位置(也就是offset提交給Kafka集羣),可是對於沒有消費過該分區的消費者,他以前並未提交給集羣自身偏移量的信息。html

Kafka消費者默認對於未訂閱的topic的offset的時候,也就是系統並無存儲該消費者的消費分區的記錄信息(offset),默認Kafka消費者的默認首次消費策略:latest。面試

配置項爲:auto.offset.reset=latestapache

能夠在官方文檔,找到對於各個配置項的解釋,例如 http://kafka.apache.org/20/documentation.html#brokerconfigs 能夠找到auto.offset.reset配置項。api

  • earliest - 自動將偏移量重置爲最先的偏移量服務器

  • latest - 自動將偏移量重置爲最新的偏移量網絡

  • none - 若是未找到消費者組的先前偏移量,則向消費者拋出異常dom

消費者的配置中增長凸顯默認配置,latest能夠換成earliest:異步

// 默認配置,若是系統中沒有該消費組的偏移量,該消費者組讀取最新的偏移量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

// 配置earliest,若是集羣沒有該消費者組的偏移量,系統會讀取該分區最先的偏移量開始消費
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

1.1.2 消費者offset自動提交策略

Kafka消費者在消費數據的時候默認會按期的提交消費的偏移量,這樣就能夠保證全部的消息至少能夠被消費者消費1次,用戶能夠經過如下兩個參數配置:分佈式

enable.auto.commit = true 默認ide

auto.commit.interval.ms = 5000 默認

若是用戶須要本身管理offset的自動提交,能夠關閉offset的自動提交,手動管理offset提交的偏移量,注意用戶提交的offset偏移量永遠都要比本次消費的偏移量+1,由於提交的offset是kafka消費者下一次抓取數據的位置。

// 消費者自動提交開啓
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 配置offset自動提交時間間隔,10秒自動提交offset
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);

自定義偏移量提交策略,先關閉偏移量自定提交配置後,每次消費完,提交偏移量信息給集羣:

public class KafkaConsumerDemo_02 {
    public static void main(String[] args) {
        //1.建立Kafka連接參數
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");
        // 關閉offset自動提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //2.建立Topic消費者
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        //3.訂閱topic開頭的消息隊列
        consumer.subscribe(Pattern.compile("^topic.*$"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                
                // offset維護的Map
                Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();

                // 本身維護offset,每次提交當前信息的offset加1
                offsets.put(new TopicPartition(record.topic(),partition),new OffsetAndMetadata(offset + 1));
                // 異步提交偏移量給集羣,且回調打印
                consumer.commitAsync(offsets, new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        System.out.println("完成:"+offset+"提交!");
                    }
                });
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);

            }
        }
    }
}

1.2 Acks & Retries(應答和重試)

Kafka生產者在發送完一個的消息以後,要求Leader所在的Broker在規定的時間Ack應答,若是沒有在規定時間內應答,Kafka生產者會嘗試n次從新發送消息(超時重傳)。目的是確保咱們的消息,必定要發送的隊列中去

acks=1 默認

面試常問,kafka爲何存在數據的寫入丟失?其中一種狀況爲下面的第一點

一、acks=1表示:Leader會將Record寫到其本地日誌中,但會在不等待全部Follower的徹底確認的狀況下作出響應。在這種狀況下,若是Leader在確認記錄後當即失敗,但在Follower複製記錄以前失敗,則記錄將丟失,經常使用在不重要的日誌收集時

二、acks=0表示:生產者根本不會等待服務器的任何確認。該記錄將當即添加到網絡套接字緩衝區中並視爲已發送。在這種狀況下,不能保證服務器已收到記錄。這種狀況是不可靠的,可是性能高

三、acks=all表示:這意味着Leader將等待全套同步副本確認記錄。這保證了只要至少一個同步副本仍處於活動狀態,記錄就不會丟失。這是最有力的保證。這等效於acks = -1設置。用在一些比較重要的系統,不容許丟數據

若是生產者在規定的時間內,並無獲得Kafka的Leader的Ack應答,Kafka能夠開啓reties機制。

request.timeout.ms = 30000 默認(30s沒有收到leader的ack則重試)

retries = 2147483647 默認(重試次數爲Max_Value,默認一直重試)

超時重試

public class KafkaProducerDemo_01{
    public static void main(String[] args) {
        //1.建立連接參數
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
        props.put(ProducerConfig.ACKS_CONFIG,"-1");
        props.put(ProducerConfig.RETRIES_CONFIG,10);

        //2.建立生產者
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);

        //3.封帳消息隊列
        for(Integer i=0;i< 1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }

        producer.close();
    }
}

能夠經過生產者自定義配置重複發送的次數:

// 不包括第一次發送,若是嘗試發送三次,失敗,則系統放棄發送
props.put(ProducerConfig.RETRIES_CONFIG, 3);
public class KafkaProducerDemo_02 {
    public static void main(String[] args) {
        //1.建立連接參數
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
        props.put(ProducerConfig.ACKS_CONFIG,"-1");
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

        //2.建立生產者
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);

        //3.封帳消息隊列
        for(Integer i=0;i< 1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }

        producer.close();
    }
}

總結:應答和重試機制,能夠盡最大可能保證咱們把數據發送到Kafka集羣。但也會伴隨着一些問題,好比重複數據的產生。在一些訂單業務場景中,好比用戶下訂單的記錄,是絕對不能出現重複數據的。怎麼保證?Kafka提供了冪等和事務機制,來解決重複數據的問題

1.3 Kafka冪等寫機制

1.3.1 Kafka冪等概念

一、HTTP/1.1中對冪等性的定義是:一次和屢次請求某一個資源對於資源自己應該具備一樣的結果(網絡超時等問題除外)。也就是說,其任意屢次執行對資源自己所產生的影響均與一次執行的影響相同。

二、Kafka在0.11.0.0版本支持增長了對冪等的支持。冪等是針對生產者角度的特性。冪等能夠保證生產者發送的消息,不會丟失(底層retries重試機制支撐),並且不會重複(冪等去重機制保證)。實現冪等的關鍵點就是服務端能夠區分請求是否重複,過濾掉重複的請求。要區分請求是否重複的有兩點:

  • 惟一標識:要想區分請求是否重複,請求中就得有惟一標識。例如支付請求中,訂單號就是惟一標識
  • 記錄下已處理過的請求標識:光有惟一標識還不夠,還須要記錄下那些請求是已經處理過的,這樣當收到新的請求時,用新請求中的標識和處理記錄進行比較,若是處理記錄中有相同的標識,說明是重複記錄,拒絕掉。

1.3.2 Kafka冪等實現策略

一、冪等又稱爲exactly once(精準一次)。要中止屢次處理消息,必須僅將其持久化到Kafka Topic中僅僅一次。在初始化期間,kafka會給生產者生成一個惟一的ID稱爲Producer ID或PID。

二、PID和序列號與消息捆綁在一塊兒,而後發送給Broker。因爲序列號從零開始而且單調遞增,所以,僅當消息的序列號比該PID / TopicPartition對中最後提交的消息正好大1時,Broker纔會接受該消息。若是不是這種狀況,則Broker認定是生產者從新發送該消息。

三、對應配置項:enable.idempotence= false 默認關閉,開啓設置爲true

四、注意:在使用冪等性的時候,要求必須開啓retries=true和acks=all(保證不丟)

五、max.in.flight.requests.per.connection配置項默認是5,若是咱們要保證嚴格有序,咱們能夠設置爲1。該配置項表達的意思爲:在發生阻塞以前,客戶端的一個鏈接上容許出現未確認請求的最大數量。

Tips: 精準一次的概念嚐嚐出如今流式處理中

冪等機制

代碼配置實現:

public class KafkaProducerDemo_02 {
    public static void main(String[] args) {
        //1.建立連接參數
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName());
        // 將檢測超時時間設置爲1ms,方便觸發看到重試機制
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
        // ACKS要設置爲all
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        // 重試3次
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        // 開啓冪等
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        
        // 開啓冪等,客戶端的一個鏈接上容許出現未確認請求的最大數量要大於1小於5。設置爲1能夠保證順序。
        // 若是有一個發送不成功,就阻塞,一直等待發送成功爲止
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);

        //2.建立生產者
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);


        ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "idempotence", "test idempotence");
        producer.send(record);
   
        producer.close();
    }
}

總結:開啓冪等會保證不會重複傳送消息到消息隊列,客戶端的一個鏈接上容許出現未確認請求的最大數量設置爲1的話,能夠保證順序不會亂。要須要使用冪等功能,kafka的版本須要保證在0.11.0.0版本以上

以上的保證順序,保證惟一,只是針對一個分區而言,若是kafka有多個分區,那麼就須要用Kafka的事務來控制原子性,事務能控制不重複,但沒法控制多分區全局有序

1.4 Kafka的事務控制

一、Kafka的冪等性,只能保證一條記錄的在分區發送的原子性,可是若是要保證多條記錄(多分區不重複,但多分區沒法有序,參照第一章)之間的完整性,這個時候就須要開啓kafk的事務操做。事務通常是把消費者和生產者綁定,中間業務系統對下游Kafka的生產失敗了,中間業務系統消費過上游的Kafka偏移量不提交

二、在Kafka0.11.0.0除了引入的冪等性的概念,同時也引入了事務的概念。一般Kafka的事務分爲 生產者事務Only、消費者&生產者事務。通常來講默認消費者消費的消息的級別是read_uncommited數據,這有可能讀取到事務失敗的數據,全部在開啓生產者事務以後,須要用戶設置消費者的事務隔離級別。

三、默認配置項爲:isolation.level = read_uncommitted

四、該選項有兩個值read_committed|read_uncommitted,若是開始事務控制,消費端必須將事務的隔離級別設置爲read_committed,可以保證在回滾後清除kafka中存儲的該條發送信息

五、開啓的生產者事務的時候,只須要指定transactional.id屬性便可,一旦開啓了事務,默認生產者就已經開啓了冪等性。可是要求"transactional.id"的取值必須是惟一的,同一時刻只能有一個"transactional.id"存儲在,其餘的將會被關閉。

1.4.1 生產者事務only使用場景

一、生產者

public class KafkaProducerDemo02 {
    public static void main(String[] args) {

        //1.生產者&消費者的配置項
        KafkaProducer<String,String> producer=buildKafkaProducer();

        producer.initTransactions();//一、初始化事務

        try{
            while(true){
           
                //二、開啓事務控制
                producer.beginTransaction();
                for(i=0; i<10; i++) {
                    if(i == 8) {
                        // 異常
                        int j = 10/0;
                    }
                    //建立Record
                    ProducerRecord<String,String> producerRecord=
                    new ProducerRecord<String,String>("topic01","transation","error......");
                    
                    producer.send(producerRecord);
                    // 事務終止前,把以前數據刷入kafka隊列
                    ptoducer.flush();
                }
                //三、提交事務
                producer.sendOffsetsToTransaction(offsets,"group01");
                producer.commitTransaction();
            }
        }catch (Exception e){
            producer.abortTransaction();//四、終止事務
        }finally {
            producer.close();
        }
    }
    
    // 生產者在生產環境的一些常規配置
    public static KafkaProducer<String,String> buildKafkaProducer(){
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 必須配置事務id,且惟一
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id" + UUID.randomUUID().toString());
        // 配置批處理大小,達到1024字節須要提交
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,1024);
        // 當沒達到1024字節,可是時間達到了5ms,也須要提交給集羣的topic
        props.put(ProducerConfig.LINGER_MS_CONFIG,5);
        // 配置冪等,和重試
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        // ack
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        // 請求超時重發時間20ms
        props.put(ProducerConfig.REQUERT_TIMEOUT_MS_CONFIG,20000);
        return new KafkaProducer<String, String>(props);
    }
    
    public static KafkaConsumer<String,String> buildKafkaConsumer(String group){
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

        return new KafkaConsumer<String, String>(props);
    }
}

二、消費者

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        //1.建立Kafka連接參數
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 消費者所屬的消費組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");
        // 設置消費者消費事務的隔離級別read_committed,消費者不可能讀到未提交的數據
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

        //2.建立Topic消費者
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        //3.訂閱topic開頭的消息隊列
        consumer.subscribe(Pattern.compile("topic01"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);
            }
        }
    }
}

1.4.1 生產者消費者事務

public class KafkaProducerDemo02 {
    public static void main(String[] args) {

        //1.生產者&消費者
        KafkaProducer<String,String> producer=buildKafkaProducer();
        KafkaConsumer<String, String> consumer = buildKafkaConsumer("group01");
        
        // 消費者先訂閱消費topic數據
        consumer.subscribe(Arrays.asList("topic01"));
        producer.initTransactions();//初始化事務

        try{
            while(true){
                // 消費者1秒鐘拉取一次數據
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                // 獲取的消息迭代
                Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                //開啓事務控制
                producer.beginTransaction();
                // 維護偏移量
                Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();
                // 消費者讀取到消息後進行業務處理
                while (consumerRecordIterator.hasNext()){
                    ConsumerRecord<String, String> record = consumerRecordIterator.next();
                    //業務處理,這裏建立Record,經過消費topic01的消息記錄,發送到topic02
                    ProducerRecord<String,String> producerRecord=
                    new ProducerRecord<String,String>("topic02",record.key(),record.value()+"to topic02");
                    producer.send(producerRecord);
                    //記錄元數據下次須要提交的便宜量
                    offsets.put(new TopicPartition(record.topic(),record.partition()),
                    new OffsetAndMetadata(record.offset()+1));
                }
                //提交事務,先提交消費者的偏移量,須要指定消費者組
                producer.sendOffsetsToTransaction(offsets,"group01");
                // 提交事務,再提交生產者的偏移量
                producer.commitTransaction();
            }
        }catch (Exception e){
            // 消費者端業務處理邏輯出現錯誤,要捕獲回滾
            producer.abortTransaction();//終止事務
        }finally {
            producer.close();
        }
    }
    public static KafkaProducer<String,String> buildKafkaProducer(){
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 必須配置事務id,且惟一
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id" + UUID.randomUUID().toString());
        // 配置批處理大小,達到1024字節須要提交
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,1024);
        // 當沒達到1024字節,可是時間達到了5ms,也須要提交給集羣的topic
        props.put(ProducerConfig.LINGER_MS_CONFIG,5);
        // 配置冪等,和重試
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        // ack
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        // 請求超時重發時間20ms
        props.put(ProducerConfig.REQUERT_TIMEOUT_MS_CONFIG,20000);
        return new KafkaProducer<String, String>(props);
    }
    public static KafkaConsumer<String,String> buildKafkaConsumer(String group){
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        // 消費者自動提交offset策略,關閉,必須設置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        // 設置消費者消費事務的隔離級別read_committed,消費者不可能讀到未提交的數據
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");

        return new KafkaConsumer<String, String>(props);
    }
}

總結:消費端掌握偏移量控制,生產者端掌握超時重傳應答重試和分區冪等。實際的生產開發過程當中,要熟練掌握事務控制,包括生產者only和生產者&消費者事務控制。kafka事務在分佈式微服務的開發中,有比較強的應用。

相關文章
相關標籤/搜索