This technical column is the author's (Qin Kaixin) summary and distillation of everyday work. It draws cases from real commercial environments, and offers tuning advice for commercial applications and cluster capacity planning; please keep following this blog. Looking forward to joining the most combat-ready team of the IoT era. QQ email: 1120746959@qq.com; feel free to reach out for any academic exchange.
Transactions: atomic writes across partitions
Transactions allow a producer to send a batch of messages to different partitions such that the messages either all become visible to every consumer, or are visible to none. The feature also lets you process consumed data and commit consumer offsets within the same transaction, enabling end-to-end exactly-once semantics.
This mainly targets the case where messages are routed by the Partitioner to multiple partitions.
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same transactional.id has taken over; this instance must close.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so that none of the sends in this transaction become visible.
    producer.abortTransaction();
}
On the consumer side, there are two options for reading transactional messages, expressed through the "isolation.level" consumer configuration:
read_committed: in addition to messages that are not part of a transaction, read transactional messages only after their transaction has committed.
read_uncommitted: read all messages in offset order without waiting for transactions to commit. This option is similar to the current semantics of a Kafka consumer.
To use transactions, the consumer must be configured with the appropriate isolation level.
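As a minimal sketch (class name, group id, and broker address are illustrative), configuring a consumer for read_committed only requires one extra property:

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Build consumer properties so that only committed transactional messages are delivered.
    static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        // read_committed: skip records from aborted or still-open transactions
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("cg3").getProperty("isolation.level"));
    }
}
```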
Use the new producer API, and set the producer's "transactional.id" configuration to some unique ID. This unique ID is needed to provide continuity of transactional state across application restarts.
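A minimal sketch of the producer side, assuming a hypothetical transactional.id value; transactions also require idempotence and acks=all, spelled out explicitly here for clarity:

```java
import java.util.Properties;

public class TransactionalProducerConfig {

    // Build producer properties for transactional, exactly-once writes.
    static Properties build(String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // A unique, stable id: the broker uses it to fence zombie instances
        // and to recover transaction state across application restarts.
        props.put("transactional.id", transactionalId);
        // Transactions imply idempotence and acks=all; set explicitly for clarity.
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("my-app-tx-1").getProperty("transactional.id"));
    }
}
```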
Exactly-once semantics on the consumer side: the consumer registers with Kafka via the subscribe method. Exactly-once semantics require manual offset management, set up as follows:
1. Set enable.auto.commit = false;
2. Do not commit offsets manually after processing messages;
3. Register the consumer to a specific topic via the subscribe method;
4. Implement the ConsumerRebalanceListener interface and use consumer.seek(topicPartition, offset) to position the consumer at the stored offset for a given topic and partition;
5. Store the offset together with the processed message, ensuring atomicity; a transactional store is recommended.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ManualOffsetGuaranteedExactlyOnceReadingDynamicallyBalancedPartitionConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control offsets, but subscribe to the topic so partitions are assigned dynamically.
        // Inside MyConsumerRebalancerListener, consumer.seek(topicPartition, offset) restores the stored offset.
        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data returned per poll; make sure this value is larger than the maximum single record size.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Before losing a partition, persist its current position to the external store.
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    // After gaining a partition, seek to the offset recorded in the external store.
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    // Persist the offset of the last processed record for this topic/partition.
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    // Return the next offset to consume: the stored offset plus one.
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}
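The file-based OffsetManager above stores only the offset, so a crash between processing a record and saving its offset can still break exactly-once. A hedged sketch of step 5 (store the message and its offset together, atomically) using a temp-file write plus an atomic rename; the class and file-naming scheme are illustrative, not part of the original code:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicRecordStore {

    private final Path dir;

    public AtomicRecordStore(Path dir) throws IOException {
        this.dir = Files.createDirectories(dir);
    }

    // Write the processed value and its offset to a temp file, then atomically
    // rename it into place: a crash can never leave the offset ahead of the result.
    public void save(String topic, int partition, long offset, String value) throws IOException {
        Path tmp = Files.createTempFile(dir, "rec", ".tmp");
        Files.write(tmp, (offset + "\n" + value).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, dir.resolve(topic + "-" + partition), StandardCopyOption.ATOMIC_MOVE);
    }

    // Next offset to consume: the stored offset plus one, or 0 if nothing is stored yet.
    public long nextOffset(String topic, int partition) throws IOException {
        Path file = dir.resolve(topic + "-" + partition);
        if (!Files.exists(file)) {
            return 0L;
        }
        return Long.parseLong(Files.readAllLines(file).get(0)) + 1;
    }

    public static void main(String[] args) throws IOException {
        AtomicRecordStore store = new AtomicRecordStore(Files.createTempDirectory("storage2"));
        store.save("normal-topic", 0, 41L, "payload");
        System.out.println(store.nextOffset("normal-topic", 0));
    }
}
```

Because the temp file lives in the same directory as the final file, the rename stays within one filesystem, which is what makes the ATOMIC_MOVE option viable.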
Transactions are the standout feature of Kafka 0.11.0.0; the consumer-side exactly-once (EOS) semantics remain relatively weak and need further strengthening.
Qin Kaixin, in Shenzhen, 201812012146