kafka消費者Consumer參數設置及參數調優建議-kafka 商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套博客。版權聲明:本套Spark商業應用實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。apache

1 消息的接收->基於Consumer Group

Consumer Group 主要用於實現高伸縮性,高容錯性的Consumer機制。所以,消息的接收是基於Consumer Group 的。組內多個Consumer實例能夠同時讀取Kafka消息,同一時刻一條消息只能被一個消費者消費,並且一旦某一個consumer "掛了", Consumer Group 會當即將已經崩潰的Consumer負責的分區轉交給其餘Consumer來負責。從而保證 Consumer Group 可以正常工做。bootstrap

2 位移保存->基於Consumer Group

說來奇怪,位移保存是基於Consumer Group,同時引入檢查點模式,按期實現offset的持久化。session

3 位移提交->拋棄ZooKeeper

Consumer會按期向kafka集羣彙報本身消費數據的進度,這一過程叫作位移的提交。這一過程已經拋棄Zookeeper,由於Zookeeper只是一個協調服務組件,不能做爲存儲組件,高併發的讀取勢必形成Zk的壓力。架構

  • 新版本位移提交是在kafka內部維護了一個內部Topic(_consumer_offsets)。
  • 在kafka內部日誌目錄下面,總共有50個文件夾,每個文件夾包含日誌文件和索引文件。日誌文件主要是K-V結構,(group.id,topic,分區號)。
  • 假設線上有不少的consumer和ConsumerGroup,經過對group.id作Hash求模運算,這50個文件夾就能夠分散同時位移提交的壓力。

4 官方案例

4.1 自動提交位移

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
複製代碼

4.2 手動提交位移

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
複製代碼

5 kafka Consumer參數設置


  • consumer.poll(1000) 重要參數
  • 新版本的Consumer的Poll方法使用了相似於Select I/O機制,所以全部相關事件(包括reblance,消息獲取等)都發生在一個事件循環之中。
  • 1000是一個超時時間,一旦拿到足夠多的數據(參數設置),consumer.poll(1000)會當即返回 ConsumerRecords<String, String> records。
  • 若是沒有拿到足夠多的數據,會阻塞1000ms,但不會超過1000ms就會返回。

  • session. timeout. ms <= coordinator檢測失敗的時間
  • 默認值是10s
  • 該參數是 Consumer Group 主動檢測 (組內成員comsummer)崩潰的時間間隔。若設置10min,那麼Consumer Group的管理者(group coordinator)可能須要10分鐘才能感覺到。太漫長了是吧。

  • max. poll. interval. ms <= 處理邏輯最大時間
  • 這個參數是0.10.1.0版本後新增的,可能不少地方看不到喔。這個參數須要根據實際業務處理時間進行設置,一旦Consumer處理不過來,就會被踢出Consumer Group 。
  • 注意:若是業務平均處理邏輯爲1分鐘,那麼max. poll. interval. ms須要設置稍微大於1分鐘便可,可是session. timeout. ms能夠設置小一點(如10s),用於快速檢測Consumer崩潰。

  • auto.offset.reset
  • 該屬性指定了消費者在讀取一個沒有偏移量後者偏移量無效(消費者長時間失效當前的偏移量已通過時而且被刪除了)的分區的狀況下,應該做何處理,默認值是latest,也就是從最新記錄讀取數據(消費者啓動以後生成的記錄),另外一個值是earliest,意思是在偏移量無效的狀況下,消費者從起始位置開始讀取數據。

  • enable.auto.commit
  • 對於精確到一次的語義,最好手動提交位移

  • fetch.max.bytes
  • 單次獲取數據的最大消息數。

  • max.poll.records <= 吞吐量
  • 單次poll調用返回的最大消息數,若是處理邏輯很輕量,能夠適當提升該值。
  • 一次從kafka中poll出來的數據條數,max.poll.records條數據須要在在session.timeout.ms這個時間內處理完
  • 默認值爲500

  • heartbeat. interval. ms <= 竟然拖家帶口
  • heartbeat心跳主要用於溝通交流,及時返回請求響應。這個時間間隔真是越快越好。由於一旦出現reblance,那麼就會將新的分配方案或者通知從新加入group的命令放進心跳響應中。

  • connection. max. idle. ms <= socket鏈接
  • kafka會按期的關閉空閒Socket鏈接。默認是9分鐘。若是不在意這些資源開銷,推薦把這些參數值爲-1,即不關閉這些空閒鏈接。

  • request. timeout. ms
  • 這個配置控制一次請求響應的最長等待時間。若是在超時時間內未獲得響應,kafka要麼重發這條消息,要麼超太重試次數的狀況下直接置爲失敗。
  • 消息發送的最長等待時間.需大於session.timeout.ms這個時間

  • fetch.min.bytes
  • server發送到消費端的最小數據,如果不知足這個數值則會等待直到知足指定大小。默認爲1表示當即接收。

  • fetch.wait.max.ms
  • 如果不知足fetch.min.bytes時,等待消費端請求的最長等待時間

  • 0.11 新功能
  • 空消費組延時rebalance,主要在server.properties文件配置
  • group.initial.rebalance.delay.ms <=牛逼了,個人kafka,防止成員加入請求後本應當即開啓的rebalance
  • 對於用戶來講,這個改進最直接的效果就是新增了一個broker配置:group.initial.rebalance.delay.ms
  • 默認是3秒鐘。
  • 主要做用是讓coordinator推遲空消費組接收到成員加入請求後本應當即開啓的rebalance。在實際使用時,假設你預估你的全部consumer組成員加入須要在10s內完成,那麼你就能夠設置該參數=10000。

6 線上採坑

org.apache.kafka.clients.consumer.CommitFailedException:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [com.bonc.framework.server.kafka.consumer.ConsumerLoop]
複製代碼
基於最新版本10,注意此版本session. timeout. ms 與max.poll.interval.ms進行功能分離了。
  • 能夠發現頻繁reblance,並伴隨者重複性消費,這是一個很嚴重的問題,就是處理邏輯太重,max.poll. interval.ms 太小致使。發生的緣由就是 poll()的循環調用時間過長,出現了處理超時。此時只用調大max.poll. interval.ms ,調小max.poll.records便可,同時要把request. timeout. ms設置大於max.poll. interval.ms

7 總結

優化會繼續,暫時把核心放在request. timeout. ms, max. poll. interval. ms,max.poll.records 上,避免由於處理邏輯太重,致使Consumer被頻繁的踢出Consumer group。併發

秦凱新 於深圳運維

相關文章
相關標籤/搜索