This blog series distills and shares cases drawn from real production environments and offers hands-on guidance for Spark business applications; stay tuned. Copyright notice: this Spark business-application series belongs to the author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.
In one sentence: when multiple consumers subscribe to the same topic, the partitions they own are redistributed among them according to the configured partition assignment strategy.
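For reference, the assignment strategy is a pluggable consumer configuration; a minimal sketch of selecting it (the broker address and group id are placeholders; RangeAssignor is the default, while RoundRobinAssignor spreads partitions more evenly across consumers):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Swap the default RangeAssignor for round-robin assignment
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");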
The algorithm for locating the Coordinator is the same as the algorithm for locating the target partition of __consumer_offsets.
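A minimal sketch of that computation, assuming the broker default offsets.topic.num.partitions = 50:

// The group's offsets land in this partition of __consumer_offsets;
// the broker hosting that partition's leader acts as the group's Coordinator.
int offsetsTopicPartitionCount = 50; // broker config offsets.topic.num.partitions
int targetPartition = Math.abs("test".hashCode()) % offsetsTopicPartitionCount;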
The overall rebalance flow is shown in the figure below; a few points worth emphasizing:
A rebalance listener handles the case where users commit offsets to external storage: both saving offsets and repositioning to them are implemented inside the listener.
onPartitionsRevoked: called before a new round of rebalancing starts; typically used for manual offset commits and for auditing.
onPartitionsAssigned: called after rebalancing completes; typically used to set up consumption logic, e.g., seeking back to saved offsets.
// Imports needed by this snippet:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Total time spent in rebalances
final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
// Timestamp at which the current rebalance started
final AtomicLong rebalanceStart = new AtomicLong(0L);
1. Register the rebalance listener
consumer.subscribe(Arrays.asList("test-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // 1. Save the current position to external storage
            // (saveToExternalStore is a user-supplied method)
            saveToExternalStore(consumer.position(tp));
            // 2. Or commit offsets manually (see the sketch after this block
            // for building toCommit):
            // consumer.commitSync(toCommit);
        }
        rebalanceStart.set(System.currentTimeMillis());
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - rebalanceStart.get());
        for (TopicPartition tp : partitions) {
            // Re-seek to the offset saved in external storage
            // (readFromExternalStore is a user-supplied method returning the saved offset)
            consumer.seek(tp, readFromExternalStore(tp));
        }
    }
});
2. Message processing
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
final int minBatchSize = 200; // illustrative batch-size threshold
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // user-supplied persistence method
        consumer.commitSync(); // commit only after the batch is safely stored
        buffer.clear();
    }
}
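For completeness, the commented-out consumer.commitSync(toCommit) call above expects an explicit offset map; a minimal sketch of building one inside onPartitionsRevoked (toCommit is an illustrative variable, not part of the Kafka API; requires java.util.Map and java.util.HashMap):

Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : partitions) {
    // position() is the offset of the next record to be fetched
    toCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
consumer.commitSync(toCommit);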
Example: a multi-threaded consumer group, with one KafkaConsumer instance per thread.
public class Main {
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup1";
String topic = "test-topic";
int consumerNum = 3;
// Core wrapper exposed to callers
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
}
}
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; ++i) {
ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
consumers.add(consumerThread);
}
}
public void execute() {
for (ConsumerRunnable task : consumers) {
new Thread(task).start();
}
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerRunnable implements Runnable {
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true"); //本例使用自動提交位移
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic)); // this example relies on automatic partition assignment
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
"th message with offset: " + record.offset());
}
}
}
}
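A note on this design: KafkaConsumer is not thread-safe, which is why each thread owns its own consumer instance here. Also, threads in excess of the topic's partition count will sit idle, since each partition is assigned to at most one consumer within a group.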
Example: a single consumer paired with a pool of worker threads, decoupling message fetching from message processing.
A further optimization over the thread-per-consumer model above: one KafkaConsumer pulls messages and hands them off to an ExecutorService, so fetch throughput and processing parallelism can be tuned independently.
public class Main {
public static void main(String[] args) {
String brokerList = "localhost:9092,localhost:9093,localhost:9094";
String groupId = "group2";
String topic = "test-topic";
int workerNum = 5;
ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
consumers.execute(workerNum);
try {
Thread.sleep(1000000);
} catch (InterruptedException ignored) {}
consumers.shutdown();
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumerHandler {
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
public ConsumerHandler(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum) {
    executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    // Note: this loop blocks the calling thread; for shutdown() to take effect
    // it must be invoked from another thread.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (final ConsumerRecord<String, String> record : records) {
            executors.submit(new Processor(record));
        }
    }
}
}
public void shutdown() {
    if (consumer != null) {
        // KafkaConsumer is not thread-safe: since poll() runs on another thread,
        // a production implementation should call consumer.wakeup() here and
        // close the consumer on the polling thread instead.
        consumer.close();
    }
    if (executors != null) {
        executors.shutdown();
        try {
            if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Timeout.... Ignore for this case");
            }
        } catch (InterruptedException ignored) {
            System.out.println("Other thread interrupted this shutdown, ignore for this case.");
            Thread.currentThread().interrupt();
        }
    }
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class Processor implements Runnable {
private ConsumerRecord<String, String> consumerRecord;
public Processor(ConsumerRecord<String, String> record) {
this.consumerRecord = record;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
+ "th message with offset: " + consumerRecord.offset());
}
}
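One caveat with this design: enable.auto.commit is true while records are processed asynchronously in the pool, so offsets may be committed before a worker finishes, and a crash can then lose messages. A minimal sketch of a safer variant, with illustrative names, disables auto-commit and lets the polling thread commit only what the workers have completed:

// In ConsumerHandler: props.put("enable.auto.commit", "false");
// Shared map, updated by each Processor once a record is fully processed:
Map<TopicPartition, OffsetAndMetadata> offsets = new ConcurrentHashMap<>();
// In Processor.run(), after processing succeeds:
// offsets.put(new TopicPartition(record.topic(), record.partition()),
//         new OffsetAndMetadata(record.offset() + 1));
// In the poll loop, after submitting a batch:
if (!offsets.isEmpty()) {
    consumer.commitSync(new HashMap<>(offsets)); // re-committing an
    // already-committed offset is harmless
}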
Standalone Consumer: assign takes an explicit list of partitions to consume, and it is mutually exclusive with subscribe; you must pick one or the other.
Having multiple consumer instances consume one topic through group rebalancing is a natural fit, but when you need precise control over which partitions are consumed, assign is unavoidable.
// brokerList and groupId as defined in the earlier examples; needed imports:
// java.util.*, org.apache.kafka.clients.consumer.*,
// org.apache.kafka.common.PartitionInfo, org.apache.kafka.common.TopicPartition
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = new ArrayList<>();
// Look up all partitions of the topic and take them over directly,
// bypassing group-based assignment.
List<PartitionInfo> allPartitions = consumer.partitionsFor("kaiXinTopic");
if (allPartitions != null && !allPartitions.isEmpty()) {
    for (PartitionInfo partitionInfo : allPartitions) {
        partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }
    consumer.assign(partitions);
}
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
final int minBatchSize = 200; // illustrative batch-size threshold
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // user-supplied persistence method
        consumer.commitSync();
        buffer.clear();
    }
}
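Note that a standalone consumer bypasses group management entirely: no rebalance ever happens, ConsumerRebalanceListener callbacks never fire, and if the topic's partition count changes, the application must call partitionsFor and assign again by itself. Offset commits still work as long as group.id is set.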
This article synthesizes several Kafka books and blog posts; a great deal of material was consulted and the language reworked to produce it. Writing it took real effort, so please treat it kindly!
Qin Kaixin 2181119 2123