This blog series distills and shares cases drawn from real commercial environments and gives hands-on guidance for Spark business applications; stay tuned. Copyright notice: this Spark business-application series belongs to its author, 秦凱新 (Qin Kaixin); reproduction is prohibited, and readers are welcome to learn from it.
The Consumer Group exists to give the consumer side high scalability and high fault tolerance, so message consumption in Kafka is organized around Consumer Groups. Several consumer instances in one group can read Kafka messages in parallel, yet at any moment a given message is consumed by only one member of the group. If a consumer crashes, the Consumer Group immediately hands the partitions the failed consumer was responsible for over to the other members, keeping the group as a whole running.
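That handover is called a rebalance, and a consumer can observe it by registering a ConsumerRebalanceListener when subscribing. The following is only a minimal sketch: the broker address, group id, and topic name foo are placeholder assumptions.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("group.id", "test");                    // placeholder group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("foo"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Fired before a rebalance takes partitions away from this member,
        // e.g. because another consumer in the group crashed or joined.
        System.out.println("partitions revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Fired after the rebalance with this member's new partition set.
        System.out.println("partitions assigned: " + partitions);
    }
});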
Curiously, offset storage is likewise organized per Consumer Group, with a checkpoint-style mechanism that persists the offsets at regular intervals.
A consumer periodically reports its own consumption progress to the Kafka cluster; this process is called committing offsets. Offset commits have abandoned Zookeeper: Zookeeper is a coordination component, not a storage component, and a high volume of concurrent reads would inevitably put it under pressure. Committed offsets live in Kafka's internal __consumer_offsets topic on the brokers instead.
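Because the group coordinator stores the committed offsets, they can also be read back through the consumer API. A minimal sketch, assuming an already-constructed KafkaConsumer named consumer and partition 0 of topic foo (both placeholders carried over from the examples below):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Query the coordinator for the last committed offset of foo-0.
// The value is served from the internal __consumer_offsets topic,
// not from Zookeeper.
TopicPartition tp = new TopicPartition("foo", 0);
OffsetAndMetadata committed = consumer.committed(tp);
if (committed != null) {
    System.out.println("last committed offset = " + committed.offset());
}

The two examples below show the two commit strategies in turn. The first enables enable.auto.commit, so the client commits offsets in the background every auto.commit.interval.ms: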
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Automatic offset commits, flushed in the background every second.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll(long) is the legacy millisecond-timeout overload; offsets for
    // the returned records are committed by the auto-commit task.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
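Automatic commits are convenient, but an offset may be committed before the corresponding record has actually been processed; if the consumer fails inside that window, the record is lost to the group. When the commit must be tied to successful processing, turn auto-commit off and commit manually once the work is done, as in the following batch-and-insert example: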
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Turn auto-commit off; offsets are committed manually below.
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        // insertIntoDb() stands for the application's own persistence
        // routine; commit offsets only after the batch is safely stored.
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
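Manual commits bring their own production pitfall: if processing a batch keeps the thread away from poll() for longer than the group's timeout allows, the coordinator assumes the member has died, rebalances its partitions away, and the next commitSync() fails with: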
org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [com.bonc.framework.server.kafka.consumer.ConsumerLoop]
Optimization will continue; for now the focus is on request.timeout.ms, max.poll.interval.ms, and max.poll.records, to keep heavyweight processing logic from getting the consumer repeatedly kicked out of the Consumer Group.
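A sketch of that tuning direction follows; the concrete numbers are placeholder assumptions that must be sized against the real per-batch processing time, not recommended values.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Hand fewer records to each poll() so a batch finishes quickly.
props.put("max.poll.records", "50");          // placeholder value
// Allow more time between poll() calls before the member is evicted.
props.put("max.poll.interval.ms", "600000");  // placeholder: 10 minutes
// In some client versions the JoinGroup request can block up to
// max.poll.interval.ms, so keep request.timeout.ms above it.
props.put("request.timeout.ms", "610000");    // placeholder value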
秦凱新 (Qin Kaixin), Shenzhen