1.由於producer發送消息多是分佈式事務,因此引入了經常使用的2PC,因此有事務協調者(Transaction Coordinator)。Transaction Coordinator和以前爲了解決腦裂和驚羣問題引入的Group Coordinator在選舉和failover上面相似。html
2.事務管理中事務日誌是必不可少的,kafka使用一個內部topic來保存事務日誌,這個設計和以前使用內部topic保存位點的設計保持一致。事務日誌是Transaction Coordinator管理的狀態的持久化,由於不須要回溯事務的歷史狀態,因此事務日誌只用保存最近的事務狀態。
3.由於事務存在commit和abort兩種操做,而客戶端又有read committed和read uncommitted兩種隔離級別,因此消息隊列必須能標識事務狀態,這個被稱做Control Message。
4.producer掛掉重啓或者漂移到其它機器須要能關聯的以前的未完成事務因此須要有一個惟一標識符來進行關聯,這個就是TransactionalId,一個producer掛了,另外一個有相同TransactionalId的producer可以接着處理這個事務未完成的狀態。注意不要把TransactionalId和數據庫事務中常見的transaction id搞混了,kafka目前沒有引入全局序,因此也沒有transaction id,這個TransactionalId是用戶提早配置的。
5. TransactionalId能關聯producer,也須要避免兩個使用相同TransactionalId的producer同時存在,因此引入了producer epoch來保證對應一個TransactionalId只有一個活躍的producer epochjava
事務可以保證Kafka topic下每一個分區的原子寫入。事務中全部的消息都將被成功寫入或者丟棄。例如,處理過程當中發生了異常並致使事務終止,這種狀況下,事務中的消息都不會被Consumer讀取。如今咱們來看下Kafka是如何實現原子的「讀取-處理-寫入」過程的。數據庫
首先,咱們來考慮一下原子「讀取-處理-寫入」週期是什麼意思。簡而言之,這意味着若是某個應用程序在某個topic tp0的偏移量X處讀取到了消息A,而且在對消息A進行了一些處理(如B = F(A))以後將消息B寫入topic tp1,則只有當消息A和B被認爲被成功地消費並一塊兒發佈,或者徹底不發佈時,整個讀取過程寫入操做是原子的。apache
如今,只有當消息A的偏移量X被標記爲消耗時,消息A才被認爲是從topic tp0消耗的,消費到的數據偏移量(record offset)將被標記爲提交偏移量(Committing offset)。在Kafka中,咱們經過寫入一個名爲offsets topic的內部Kafka topic來記錄offset commit。消息僅在其offset被提交給offsets topic時才被認爲成功消費。bootstrap
因爲offset commit只是對Kafkatopic的另外一次寫入,而且因爲消息僅在提交偏移量時被視爲成功消費,因此跨多個主題和分區的原子寫入也啓用原子「讀取-處理-寫入」循環:提交偏移量X到offset topic和消息B到tp1的寫入將是單個事務的一部分,因此整個步驟都是原子的。api
咱們經過爲每一個事務Producer分配一個稱爲transactional.id的惟一標識符來解決殭屍實例的問題。在進程從新啓動時可以識別相同的Producer實例。緩存
API要求事務性Producer的第一個操做應該是在Kafka集羣中顯示註冊transactional.id。 當註冊的時候,Kafka broker用給定的transactional.id檢查打開的事務而且完成處理。 Kafka也增長了一個與transactional.id相關的epoch。Epoch存儲每一個transactional.id內部元數據。session
一旦這個epoch被觸發,任何具備相同的transactional.id和更舊的epoch的Producer被視爲殭屍,並被圍起來, Kafka會拒絕來自這些Procedure的後續事務性寫入。框架
如今,讓咱們把注意力轉向數據讀取中的事務一致性。分佈式
Kafka Consumer只有在事務實際提交時纔會將事務消息傳遞給應用程序。也就是說,Consumer不會提交做爲整個事務一部分的消息,也不會提交屬於停止事務的消息。
值得注意的是,上述保證不足以保證整個消息讀取的原子性,當使用Kafka consumer來消費來自topic的消息時,應用程序將不知道這些消息是否被寫爲事務的一部分,所以他們不知道事務什麼時候開始或結束;此外,給定的Consumer不能保證訂閱屬於事務一部分的全部Partition,而且沒法發現這一點,最終難以保證做爲事務中的全部消息被單個Consumer處理。
簡而言之:Kafka保證Consumer最終只能提供非事務性消息或提交事務性消息。它將保留來自未完成事務的消息,並過濾掉已停止事務的消息。
producer提供了五個事務方法:
在一個原子操做中,根據包含的操做類型,能夠分爲三種狀況,前兩種狀況是事務引入的場景,最後一種狀況沒有使用價值。
一、建立消費者代碼,須要:
二、建立生成者,代碼以下,須要:
package com.example.demo.transaction; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; public class TransactionProducer { private static Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "47.52.199.53:9092"); props.put("retries", 2); // 重試次數 props.put("batch.size", 100); // 批量發送大小 props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置 props.put("linger.ms", 1000); // 發送頻率,知足任務一個條件發送 props.put("client.id", "producer-syn-2"); // 發送端id,便於統計 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("transactional.id","producer-1"); // 每臺機器惟一 props.put("enable.idempotence",true); // 設置冪等性 return props; } public static void main(String[] args) { KafkaProducer<String, String> producer = new KafkaProducer<>(getProps()); // 初始化事務 producer.initTransactions();try { Thread.sleep(2000); // 開啓事務 producer.beginTransaction(); // 發送消息到producer-syn producer.send(new ProducerRecord<String, String>("producer-syn","test3")); // 發送消息到producer-asyn Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>("producer-asyn","test4")); // 提交事務 producer.commitTransaction(); }catch (Exception e){ e.printStackTrace(); // 終止事務 producer.abortTransaction(); } } }
package com.example.demo.transaction; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; public class consumeTransformProduce { private static Properties getProducerProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "47.52.199.51:9092"); props.put("retries", 3); // 重試次數 props.put("batch.size", 100); // 批量發送大小 props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置 props.put("linger.ms", 1000); // 發送頻率,知足任務一個條件發送 props.put("client.id", "producer-syn-2"); // 發送端id,便於統計 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("transactional.id","producer-2"); // 每臺機器惟一 props.put("enable.idempotence",true); // 設置冪等性 return props; } private static Properties getConsumerProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "47.52.199.51:9092"); props.put("group.id", "test_3"); props.put("session.timeout.ms", 30000); // 若是其超時,將會可能觸發rebalance並認爲已經死去,從新選舉Leader props.put("enable.auto.commit", "false"); // 開啓自動提交 props.put("auto.commit.interval.ms", "1000"); // 自動提交時間 props.put("auto.offset.reset","earliest"); // 從最先的offset開始拉取,latest:從最近的offset開始消費 props.put("client.id", "producer-syn-1"); // 發送端id,便於統計 props.put("max.poll.records","100"); // 每次批量拉取條數 props.put("max.poll.interval.ms","1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("isolation.level","read_committed"); // 設置隔離級別 return props; } public static void main(String[] args) { // 建立生產者 KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps()); // 建立消費者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps()); // 初始化事務 producer.initTransactions(); // 訂閱主題 consumer.subscribe(Arrays.asList("consumer-tran")); for(;;){ // 開啓事務 producer.beginTransaction(); // 接受消息 ConsumerRecords<String, String> records = consumer.poll(500); // 處理邏輯 try { Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>(); for(ConsumerRecord record : records){ // 處理消息 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); // 記錄提交的偏移量 commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset())); // 產生新消息 Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<>("consumer-send",record.value()+"send")); } // 提交偏移量 producer.sendOffsetsToTransaction(commits,"group0323"); // 事務提交 producer.commitTransaction(); }catch (Exception e){ e.printStackTrace(); producer.abortTransaction(); } } } }
在一個事務中,既有生產消息操做又有消費消息操做,即常說的Consume-tansform-produce模式。以下實例代碼
在Kafka 0.11.0中與事務API一塊兒引入的組件是上圖右側的事務Coordinator和事務日誌。
事務Coordinator是每一個KafkaBroker內部運行的一個模塊。事務日誌是一個內部的Kafka Topic。每一個Coordinator擁有事務日誌所在分區的子集,即, 這些borker中的分區都是Leader。
每一個transactional.id都經過一個簡單的哈希函數映射到事務日誌的特定分區,事務日誌文件__transaction_state-0。這意味着只有一個Broker擁有給定的transactional.id。
經過這種方式,咱們利用Kafka可靠的複製協議和Leader選舉流程來確保事務協調器始終可用,而且全部事務狀態都可以持久存儲。
值得注意的是,事務日誌只保存事務的最新狀態而不是事務中的實際消息。消息只存儲在實際的Topic的分區中。事務能夠處於諸如「Ongoing」,「prepare commit」和「Completed」之類的各類狀態中。正是這種狀態和關聯的元數據存儲在事務日誌中。
數據流在抽象層面上有四種不一樣的類型。
執行事務時,Producer向事務協調員發出以下請求:
隨着事務的進行,Producer發送上面的請求來更新Coordinator上事務的狀態。事務Coordinator會在內存中保存每一個事務的狀態,而且把這個狀態寫到事務日誌中(這是以三種方式複製的,所以是持久保存的)。
事務Coordinator是讀寫事務日誌的惟一組件。若是一個給定的Borker故障了,一個新的Coordinator會被選爲新的事務日誌的Leader,這個事務日誌分割了這個失效的代理,它從傳入的分區中讀取消息並在內存中重建狀態。
在Coordinator的事務中註冊新的分區後,Producer將數據正常地發送到真實數據所在分區。這與producer.send流程徹底相同,但有一些額外的驗證,以確保Producer不被隔離。
一、ransactional.id.timeout.ms:
在ms中,事務協調器在生產者TransactionalId提早過時以前等待的最長時間,而且沒有從該生產者TransactionalId接收到任何事務狀態更新。默認是604800000(7天)。這容許每週一次的生產者做業維護它們的id
二、max.transaction.timeout.ms
事務容許的最大超時。若是客戶端請求的事務時間超過此時間,broke將在InitPidRequest中返回InvalidTransactionTimeout錯誤。這能夠防止客戶機超時過大,從而致使用戶沒法從事務中包含的主題讀取內容。
默認值爲900000(15分鐘)。這是消息事務須要發送的時間的保守上限。
三、transaction.state.log.replication.factor
事務狀態topic的副本數量。默認值:3
四、transaction.state.log.num.partitions
事務狀態主題的分區數。默認值:50
五、transaction.state.log.min.isr
事務狀態主題的每一個分區ISR最小數量。默認值:2
一、enable.idempotence:開啓冪等
二、transaction.timeout.ms:事務超時時間
事務協調器在主動停止正在進行的事務以前等待生產者更新事務狀態的最長時間。
這個配置值將與InitPidRequest一塊兒發送到事務協調器。若是該值大於max.transaction.timeout。在broke中設置ms時,請求將失敗,並出現InvalidTransactionTimeout錯誤。
默認是60000。這使得交易不會阻塞下游消費超過一分鐘,這在實時應用程序中一般是容許的。
三、transactional.id
用於事務性交付的TransactionalId。這支持跨多個生產者會話的可靠性語義,由於它容許客戶端確保使用相同TransactionalId的事務在啓動任何新事務以前已經完成。若是沒有提供TransactionalId,則生產者僅限於冪等交付。
一、isolation.level
讓咱們把注意力轉向事務如何執行。首先,事務只形成中等的寫入放大。
額外的寫入在於:
咱們能夠看到,開銷與做爲事務一部分寫入的消息數量無關。因此擁有更高吞吐量的關鍵是每一個事務包含更多的消息。
實際上,對於Producer以最大吞吐量生產1KB記錄,每100ms提交消息致使吞吐量僅下降3%。較小的消息或較短的事務提交間隔會致使更嚴重的降級。
增長事務時間的主要折衷是增長了端到端延遲。回想一下,Consum閱讀事務消息不會傳遞屬於公開傳輸的消息。所以,提交之間的時間間隔越長,消耗的應用程序就越須要等待,從而增長了端到端的延遲。
Consumer在開啓事務的場景比Producer簡單得多,它須要作的是:
所以,當以read_committed模式讀取事務消息時,事務Consumer的吞吐量沒有下降。這樣作的主要緣由是咱們在讀取事務消息時保持零拷貝讀取。
此外,Consumer不須要任何緩衝等待事務完成。相反,Broker不容許提早抵消包括公開事務。
所以,Consumer是很是輕巧和高效的。感興趣的讀者能夠在本文檔(連接2)中瞭解Consumer設計的細節。
咱們剛剛講述了Apache Kafka中事務的表面。 幸運的是,幾乎全部的設計細節都保存在在線文檔中。 相關文件是:
最初的Kafka KIP(連接3):它提供了關於數據流的設計細節,而且詳細介紹了公共接口,特別是與事務相關的配置選項。
原始設計文檔(連接4):不是爲了內核,這是源代碼以外的權威地方 - 瞭解每一個事務性RPC如何處理,如何維護事務日誌,如何清除事務性數據等等。
KafkaProducerjavadocs(連接5):這是學習如何使用新API的好地方。頁面開始處的示例以及send方法的文檔是很好的起點。
在這篇文章中,咱們瞭解了ApacheKafka中關於事務API的關鍵設計目標,咱們理解了事務API的語義,並對API的實際工做有了更高層次的理解。
若是咱們考慮「讀取-處理-寫入」週期,這篇文章主要介紹了讀寫路徑,處理自己就是一個黑盒子。事實是,在處理階段中能夠作不少事情,使得一次處理不可能保證單獨使用事務API。例如,若是處理對其餘存儲系統有反作用,則這裏覆蓋的API不足以保證exactly once。
Kafka Streams框架使用事務API向上移動整個價值鏈,併爲各類各樣的流處理應用提供exactly once,甚至可以在處理期間更新某些附加狀態並進行存儲。
後續的博客文章將介紹KafkaStreams如何提供一次處理語義,以及如何編寫利用它的應用程序。
最後,對於那些渴望瞭解上述API實現細節的人,咱們將會有另外一篇博客文章,其中涵蓋了這裏描述的一些更有趣的解決方案。
1. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
2. https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc/edit?usp=sharing
4. https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit?usp=sharing
6. https://my.oschina.net/xiaominmin/blog/1816437
7. https://blog.csdn.net/ransom0512/article/details/78840042
8. https://blog.csdn.net/mlljava1111/article/details/81180351