通常來講每一個消費者消費以後,都會把本身消費到分區的位置(也就是offset提交給Kafka集羣),可是對於沒有消費過該分區的消費者,他以前並未提交給集羣自身偏移量的信息。html
Kafka消費者默認對於未訂閱的topic的offset的時候,也就是系統並無存儲該消費者的消費分區的記錄信息(offset),默認Kafka消費者的默認首次消費策略:latest。面試
配置項爲:auto.offset.reset=latestapache
能夠在官方文檔,找到對於各個配置項的解釋,例如 http://kafka.apache.org/20/documentation.html#brokerconfigs 能夠找到auto.offset.reset配置項。api
earliest - 自動將偏移量重置爲最先的偏移量服務器
latest - 自動將偏移量重置爲最新的偏移量網絡
none - 若是未找到消費者組的先前偏移量,則向消費者拋出異常dom
消費者的配置中增長凸顯默認配置,latest能夠換成earliest:異步
// 默認配置,若是系統中沒有該消費組的偏移量,該消費者組讀取最新的偏移量 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 配置earliest,若是集羣沒有該消費者組的偏移量,系統會讀取該分區最先的偏移量開始消費 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
Kafka消費者在消費數據的時候默認會按期的提交消費的偏移量,這樣就能夠保證全部的消息至少能夠被消費者消費1次,用戶能夠經過如下兩個參數配置:分佈式
enable.auto.commit = true 默認ide
auto.commit.interval.ms = 5000 默認
若是用戶須要本身管理offset的自動提交,能夠關閉offset的自動提交,手動管理offset提交的偏移量,注意用戶提交的offset偏移量永遠都要比本次消費的偏移量+1,由於提交的offset是kafka消費者下一次抓取數據的位置。
// 消費者自動提交開啓 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 配置offset自動提交時間間隔,10秒自動提交offset props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);
自定義偏移量提交策略,先關閉偏移量自定提交配置後,每次消費完,提交偏移量信息給集羣:
public class KafkaConsumerDemo_02 { public static void main(String[] args) { //1.建立Kafka連接參數 Properties props=new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01"); // 關閉offset自動提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //2.建立Topic消費者 KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props); //3.訂閱topic開頭的消息隊列 consumer.subscribe(Pattern.compile("^topic.*$")); while (true){ ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator(); while (recordIterator.hasNext()){ ConsumerRecord<String, String> record = recordIterator.next(); String key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); // offset維護的Map Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>(); // 本身維護offset,每次提交當前信息的offset加1 offsets.put(new TopicPartition(record.topic(),partition),new OffsetAndMetadata(offset + 1)); // 異步提交偏移量給集羣,且回調打印 consumer.commitAsync(offsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { System.out.println("完成:"+offset+"提交!"); } }); System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset); } } } }
Kafka生產者在發送完一個的消息以後,要求Leader所在的Broker在規定的時間Ack應答,若是沒有在規定時間內應答,Kafka生產者會嘗試n次從新發送消息(超時重傳)。目的是確保咱們的消息,必定要發送的隊列中去
acks=1 默認
面試常問,kafka爲何存在數據的寫入丟失?其中一種狀況爲下面的第一點
一、acks=1表示:Leader會將Record寫到其本地日誌中,但會在不等待全部Follower的徹底確認的狀況下作出響應。在這種狀況下,若是Leader在確認記錄後當即失敗,但在Follower複製記錄以前失敗,則記錄將丟失,經常使用在不重要的日誌收集時。
二、acks=0表示:生產者根本不會等待服務器的任何確認。該記錄將當即添加到網絡套接字緩衝區中並視爲已發送。在這種狀況下,不能保證服務器已收到記錄。這種狀況是不可靠的,可是性能高
三、acks=all表示:這意味着Leader將等待全套同步副本確認記錄。這保證了只要至少一個同步副本仍處於活動狀態,記錄就不會丟失。這是最有力的保證。這等效於acks = -1設置。用在一些比較重要的系統,不容許丟數據。
若是生產者在規定的時間內,並無獲得Kafka的Leader的Ack應答,Kafka能夠開啓reties機制。
request.timeout.ms = 30000 默認(30s沒有收到leader的ack則重試)
retries = 2147483647 默認(重試次數爲Max_Value,默認一直重試)
public class KafkaProducerDemo_01{ public static void main(String[] args) { //1.建立連接參數 Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName()); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1); props.put(ProducerConfig.ACKS_CONFIG,"-1"); props.put(ProducerConfig.RETRIES_CONFIG,10); //2.建立生產者 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props); //3.封帳消息隊列 for(Integer i=0;i< 1;i++){ ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i); producer.send(record); } producer.close(); } }
能夠經過生產者自定義配置重複發送的次數:
// 不包括第一次發送,若是嘗試發送三次,失敗,則系統放棄發送 props.put(ProducerConfig.RETRIES_CONFIG, 3);
public class KafkaProducerDemo_02 { public static void main(String[] args) { //1.建立連接參數 Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName()); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1); props.put(ProducerConfig.ACKS_CONFIG,"-1"); props.put(ProducerConfig.RETRIES_CONFIG,3); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); //2.建立生產者 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props); //3.封帳消息隊列 for(Integer i=0;i< 1;i++){ ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i); producer.send(record); } producer.close(); } }
總結:應答和重試機制,能夠盡最大可能保證咱們把數據發送到Kafka集羣。但也會伴隨着一些問題,好比重複數據的產生。在一些訂單業務場景中,好比用戶下訂單的記錄,是絕對不能出現重複數據的。怎麼保證?Kafka提供了冪等和事務機制,來解決重複數據的問題
一、HTTP/1.1中對冪等性的定義是:一次和屢次請求某一個資源對於資源自己應該具備一樣的結果(網絡超時等問題除外)。也就是說,其任意屢次執行對資源自己所產生的影響均與一次執行的影響相同。
二、Kafka在0.11.0.0版本支持增長了對冪等的支持。冪等是針對生產者角度的特性。冪等能夠保證生產者發送的消息,不會丟失(底層retries重試機制支撐),並且不會重複(冪等去重機制保證)。實現冪等的關鍵點就是服務端能夠區分請求是否重複,過濾掉重複的請求。要區分請求是否重複的有兩點:
一、冪等又稱爲exactly once(精準一次)。要中止屢次處理消息,必須僅將其持久化到Kafka Topic中僅僅一次。在初始化期間,kafka會給生產者生成一個惟一的ID稱爲Producer ID或PID。
二、PID和序列號與消息捆綁在一塊兒,而後發送給Broker。因爲序列號從零開始而且單調遞增,所以,僅當消息的序列號比該PID / TopicPartition對中最後提交的消息正好大1時,Broker纔會接受該消息。若是不是這種狀況,則Broker認定是生產者從新發送該消息。
三、對應配置項:enable.idempotence= false 默認關閉,開啓設置爲true
四、注意:在使用冪等性的時候,要求必須開啓retries=true和acks=all(保證不丟)
五、max.in.flight.requests.per.connection配置項默認是5,若是咱們要保證嚴格有序,咱們能夠設置爲1。該配置項表達的意思爲:在發生阻塞以前,客戶端的一個鏈接上容許出現未確認請求的最大數量。
Tips: 精準一次的概念嚐嚐出如今流式處理中
代碼配置實現:
public class KafkaProducerDemo_02 { public static void main(String[] args) { //1.建立連接參數 Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName()); // 將檢測超時時間設置爲1ms,方便觸發看到重試機制 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1); // ACKS要設置爲all props.put(ProducerConfig.ACKS_CONFIG,"all"); // 重試3次 props.put(ProducerConfig.RETRIES_CONFIG,3); // 開啓冪等 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); // 開啓冪等,客戶端的一個鏈接上容許出現未確認請求的最大數量要大於1小於5。設置爲1能夠保證順序。 // 若是有一個發送不成功,就阻塞,一直等待發送成功爲止 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1); //2.建立生產者 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props); ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "idempotence", "test idempotence"); producer.send(record); producer.close(); } }
總結:開啓冪等會保證不會重複傳送消息到消息隊列,客戶端的一個鏈接上容許出現未確認請求的最大數量設置爲1的話,能夠保證順序不會亂。要須要使用冪等功能,kafka的版本須要保證在0.11.0.0版本以上
以上的保證順序,保證惟一,只是針對一個分區而言,若是kafka有多個分區,那麼就須要用Kafka的事務來控制原子性,事務能控制不重複,但沒法控制多分區全局有序
一、Kafka的冪等性,只能保證一條記錄的在分區發送的原子性,可是若是要保證多條記錄(多分區不重複,但多分區沒法有序,參照第一章)之間的完整性,這個時候就須要開啓kafk的事務操做。事務通常是把消費者和生產者綁定,中間業務系統對下游Kafka的生產失敗了,中間業務系統消費過上游的Kafka偏移量不提交
二、在Kafka0.11.0.0除了引入的冪等性的概念,同時也引入了事務的概念。一般Kafka的事務分爲 生產者事務Only、消費者&生產者事務。通常來講默認消費者消費的消息的級別是read_uncommited數據,這有可能讀取到事務失敗的數據,全部在開啓生產者事務以後,須要用戶設置消費者的事務隔離級別。
三、默認配置項爲:isolation.level = read_uncommitted
四、該選項有兩個值read_committed|read_uncommitted,若是開始事務控制,消費端必須將事務的隔離級別設置爲read_committed,可以保證在回滾後清除kafka中存儲的該條發送信息
五、開啓的生產者事務的時候,只須要指定transactional.id屬性便可,一旦開啓了事務,默認生產者就已經開啓了冪等性。可是要求"transactional.id"的取值必須是惟一的,同一時刻只能有一個"transactional.id"存儲在,其餘的將會被關閉。
一、生產者
public class KafkaProducerDemo02 { public static void main(String[] args) { //1.生產者&消費者的配置項 KafkaProducer<String,String> producer=buildKafkaProducer(); producer.initTransactions();//一、初始化事務 try{ while(true){ //二、開啓事務控制 producer.beginTransaction(); for(i=0; i<10; i++) { if(i == 8) { // 異常 int j = 10/0; } //建立Record ProducerRecord<String,String> producerRecord= new ProducerRecord<String,String>("topic01","transation","error......"); producer.send(producerRecord); // 事務終止前,把以前數據刷入kafka隊列 ptoducer.flush(); } //三、提交事務 producer.sendOffsetsToTransaction(offsets,"group01"); producer.commitTransaction(); } }catch (Exception e){ producer.abortTransaction();//四、終止事務 }finally { producer.close(); } } // 生產者在生產環境的一些常規配置 public static KafkaProducer<String,String> buildKafkaProducer(){ Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 必須配置事務id,且惟一 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id" + UUID.randomUUID().toString()); // 配置批處理大小,達到1024字節須要提交 props.put(ProducerConfig.BATCH_SIZE_CONFIG,1024); // 當沒達到1024字節,可是時間達到了5ms,也須要提交給集羣的topic props.put(ProducerConfig.LINGER_MS_CONFIG,5); // 配置冪等,和重試 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); // ack props.put(ProducerConfig.ACKS_CONFIG,"all"); // 請求超時重發時間20ms props.put(ProducerConfig.REQUERT_TIMEOUT_MS_CONFIG,20000); return new KafkaProducer<String, String>(props); } public static KafkaConsumer<String,String> buildKafkaConsumer(String group){ Properties props=new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG,group); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed"); return new KafkaConsumer<String, String>(props); } }
二、消費者
public class KafkaConsumerDemo { public static void main(String[] args) { //1.建立Kafka連接參數 Properties props=new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); // 消費者所屬的消費組 props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01"); // 設置消費者消費事務的隔離級別read_committed,消費者不可能讀到未提交的數據 props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed"); //2.建立Topic消費者 KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props); //3.訂閱topic開頭的消息隊列 consumer.subscribe(Pattern.compile("topic01")); while (true){ ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator(); while (recordIterator.hasNext()){ ConsumerRecord<String, String> record = recordIterator.next(); String key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset); } } } }
public class KafkaProducerDemo02 { public static void main(String[] args) { //1.生產者&消費者 KafkaProducer<String,String> producer=buildKafkaProducer(); KafkaConsumer<String, String> consumer = buildKafkaConsumer("group01"); // 消費者先訂閱消費topic數據 consumer.subscribe(Arrays.asList("topic01")); producer.initTransactions();//初始化事務 try{ while(true){ // 消費者1秒鐘拉取一次數據 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 獲取的消息迭代 Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator(); //開啓事務控制 producer.beginTransaction(); // 維護偏移量 Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>(); // 消費者讀取到消息後進行業務處理 while (consumerRecordIterator.hasNext()){ ConsumerRecord<String, String> record = consumerRecordIterator.next(); //業務處理,這裏建立Record,經過消費topic01的消息記錄,發送到topic02 ProducerRecord<String,String> producerRecord= new ProducerRecord<String,String>("topic02",record.key(),record.value()+"to topic02"); producer.send(producerRecord); //記錄元數據下次須要提交的便宜量 offsets.put(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset()+1)); } //提交事務,先提交消費者的偏移量,須要指定消費者組 producer.sendOffsetsToTransaction(offsets,"group01"); // 提交事務,再提交生產者的偏移量 producer.commitTransaction(); } }catch (Exception e){ // 消費者端業務處理邏輯出現錯誤,要捕獲回滾 producer.abortTransaction();//終止事務 }finally { producer.close(); } } public static KafkaProducer<String,String> buildKafkaProducer(){ Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 必須配置事務id,且惟一 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id" + UUID.randomUUID().toString()); // 配置批處理大小,達到1024字節須要提交 props.put(ProducerConfig.BATCH_SIZE_CONFIG,1024); // 當沒達到1024字節,可是時間達到了5ms,也須要提交給集羣的topic props.put(ProducerConfig.LINGER_MS_CONFIG,5); // 配置冪等,和重試 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); // ack props.put(ProducerConfig.ACKS_CONFIG,"all"); // 請求超時重發時間20ms props.put(ProducerConfig.REQUERT_TIMEOUT_MS_CONFIG,20000); return new KafkaProducer<String, String>(props); } public static KafkaConsumer<String,String> buildKafkaConsumer(String group){ Properties props=new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG,group); // 消費者自動提交offset策略,關閉,必須設置 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 設置消費者消費事務的隔離級別read_committed,消費者不可能讀到未提交的數據 props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed"); return new KafkaConsumer<String, String>(props); } }
總結:消費端掌握偏移量控制,生產者端掌握超時重傳應答重試和分區冪等。實際的生產開發過程當中,要熟練掌握事務控制,包括生產者only和生產者&消費者事務控制。kafka事務在分佈式微服務的開發中,有比較強的應用。