注意:配置基於Kafka 0.8.2.1html
broker配置java
#非負整數,用於惟一標識broker
broker.id 0算法
#kafka持久化數據存儲的路徑,能夠指定多個,以逗號分隔
log.dirs /tmp/kafka-logssql
#broker接收鏈接請求的端口
port 9092apache
#指定zk鏈接字符串,[hostname:port]以逗號分隔
zookeeper.connectbootstrap
#單條消息最大大小控制,消費端的最大拉取大小須要略大於該值
message.max.bytes 1000000數組
#接收網絡請求的線程數
num.network.threads 3緩存
#用於執行請求的I/O線程數
num.io.threads 8服務器
#用於各類後臺處理任務(如文件刪除)的線程數
background.threads 10網絡
#待處理請求最大可緩衝的隊列大小
queued.max.requests 500
#配置該機器的IP地址
host.name
#默認分區個數
num.partitions 1
#分段文件大小,超事後會輪轉
log.segment.bytes 1024 * 1024 * 1024
#日誌沒達到大小,若是達到這個時間也會輪轉
log.roll.{ms,hours} 168
#日誌保留時間
log.retention.{ms,minutes,hours}
#不存在topic的時候是否自動建立
auto.create.topics.enable true
#partition默認的備份因子
default.replication.factor 1
#若是這個時間內follower沒有發起fetch請求,被認爲dead,從ISR移除
replica.lag.time.max.ms 10000
#若是follower相比leader落後這麼多以上消息條數,會被從ISR移除
replica.lag.max.messages 4000
#從leader能夠拉取的消息最大大小
replica.fetch.max.bytes 1024 * 1024
#從leader拉取消息的fetch線程數
num.replica.fetchers 1
#zk會話超時時間
zookeeper.session.timeout.ms 6000
#zk鏈接所用時間
zookeeper.connection.timeout.ms
#zk follower落後leader的時間
zookeeper.sync.time.ms 2000
#是否開啓topic能夠被刪除的方式
delete.topic.enable false
producer配置
#參與消息確認的broker數量控制,0表明不須要任何確認 1表明須要leader replica確認 -1表明須要ISR中全部進行確認
request.required.acks 0
#從發送請求到收到ACK確認等待的最長時間(超時時間)
request.timeout.ms 10000
#設置消息發送模式,默認是同步方式, async異步模式下容許消息累計到必定量或一段時間又另外線程批量發送,吞吐量好但丟失數據風險增大
producer.type sync
#消息序列化類實現方式,默認是byte[]數組形式
serializer.class kafka.serializer.DefaultEncoder
#kafka消息分區策略實現方式,默認是對key進行hash
partitioner.class kafka.producer.DefaultPartitioner
#對發送的消息採起的壓縮編碼方式,有none|gzip|snappy
compression.codec none
#指定哪些topic的message須要壓縮
compressed.topics null
#消息發送失敗的狀況下,重試發送的次數 存在消息發送是成功的,只是因爲網絡致使ACK沒收到的重試,會出現消息被重複發送的狀況
message.send.max.retries 3
#在開始從新發起metadata更新操做須要等待的時間
retry.backoff.ms 100
#metadata刷新間隔時間,若是負值則失敗的時候纔會刷新,若是0則每次發送後都刷新,正值則是一種週期行爲
topic.metadata.refresh.interval.ms 600 * 1000
#異步發送模式下,緩存數據的最長時間,以後便會被髮送到broker
queue.buffering.max.ms 5000
#producer端異步模式下最多緩存的消息條數
queue.buffering.max.messages 10000
#0表明隊列沒滿的時候直接入隊,滿了當即扔棄,-1表明無條件阻塞且不丟棄
queue.enqueue.timeout.ms -1
#一次批量發送須要達到的消息條數,固然若是queue.buffering.max.ms達到的時候也會被髮送
batch.num.messages 200
consumer配置
#指明當前消費進程所屬的消費組,一個partition只能被同一個消費組的一個消費者消費
group.id
#針對一個partition的fetch request所能拉取的最大消息字節數,必須大於等於Kafka運行的最大消息
fetch.message.max.bytes 1024 * 1024
#是否自動週期性提交已經拉取到消費端的消息offset
auto.commit.enable true
#自動提交offset到zookeeper的時間間隔
auto.commit.interval.ms 60 * 1000
#消費均衡的重試次數
rebalance.max.retries 4
#消費均衡兩次重試之間的時間間隔
rebalance.backoff.ms 2000
#當從新去獲取partition的leader前須要等待的時間
refresh.leader.backoff.ms 200
#若是zookeeper上沒有offset合理的初始值狀況下獲取第一條消息開始的策略smallest|largeset
auto.offset.reset largest
#若是其超時,將會可能觸發rebalance並認爲已經死去
zookeeper.session.timeout.ms 6000
#確認zookeeper鏈接創建操做客戶端能等待的最長時間
zookeeper.connection.timeout.ms 6000
1.maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
2.kafka生產者代碼:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
*
* @author FromX
*
*/
public class KProducer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//kafka服務器地址
props.put("bootstrap.servers", "slave1.com:6667,slave2.com:6667,slave3.com:6667");
//ack是判斷請求是否爲完整的條件(即判斷是否成功發送)。all將會阻塞消息,這種設置性能最低,可是最可靠。
props.put("acks", "1");
//retries,若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性。
props.put("retries", 0);
//producer緩存每一個分區未發送消息,緩存的大小是經過batch.size()配置設定的。值較大的話將會產生更大的批。並須要更多的內存(由於每一個「活躍」的分區都有一個緩衝區)
props.put("batch.size", 16384);
//默認緩衝區可當即發送,即使緩衝區空間沒有滿;可是,若是你想減小請求的數量,能夠設置linger.ms大於0.這將指示生產者發送請求以前等待一段時間
//但願更多的消息補填到未滿的批中。這相似於tcp的算法,例如上面的代碼段,可能100條消息在一個請求發送,由於咱們設置了linger時間爲1ms,而後,若是咱們
//沒有填滿緩衝區,這個設置將增長1ms的延遲請求以等待更多的消息。須要注意的是,在高負載下,相近的時間通常也會組成批,即便是linger.ms=0。
//不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。
props.put("linger.ms", 1);
//buffer.memory控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值
//經過max.block.ms設定,以後他將拋出一個TimeoutExecption。
props.put("buffer.memory", 33554432);
//key.serializer和value.serializer示例:將用戶提供的key和value對象ProducerRecord轉換成字節,你可使用附帶的ByteArraySerizlizaer或StringSerializer處理簡單的byte和String類型.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//設置kafka的分區數量
props.put("kafka.partitions", 12);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++){
System.out.println("key-->key"+i+" value-->vvv"+i);
producer.send(new ProducerRecord<String, String>("aaa", "key"+i, "vvv"+i));
Thread.sleep(1000);
}
producer.close();
}
}
3.kafka消費者代碼:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
*
* @author FromX
*
*/
public class KConsumer {
public KafkaConsumer<String, String> getConsmer() {
Properties props = new Properties();
//設置kafka服務器
props.put("bootstrap.servers", "c1.wb3.com:6667,n1.wb1.com:6667");
//消費者羣組ID,發佈-訂閱模式,即若是一個生產者,多個消費者都要消費,那麼須要定義本身的羣組,同一個羣組內的消費者只有一個能消費到消息
props.put("group.id", "test");
//true,消費者的偏移量將在後臺按期提交;false關閉自動提交位移,在消息被完整處理以後再手動提交位移
props.put("enable.auto.commit", "true");
//如何設置爲自動提交(enable.auto.commit=true),這裏設置自動提交週期
props.put("auto.commit.interval.ms", "1000");
//session.timeout.ms:在使用kafka的組管理時,用於檢測消費者故障的超時
props.put("session.timeout.ms", "30000");
//key.serializer和value.serializer示例:將用戶提供的key和value對象ProducerRecord轉換成字節,你可使用附帶的ByteArraySerizlizaer或StringSerializer處理簡單的byte和String類型.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
public static void main(String[] args) {
KConsumer kconsumer = new KConsumer();
KafkaConsumer<String, String> consumer = kconsumer.getConsmer();
consumer.subscribe(Arrays.asList("aaa"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("offset = "+record.offset()+", key = "+record.key()+", value = "+ record.value());
}
}
}
4.官網文檔地址:http://kafka.apache.org/documentation.html#configuration
5.極限狀況的數據丟失現象
a:即便將ack設置爲"all"也會在必定狀況下丟失消息,由於kafka的高性能特性,消息在寫入kafka時並無落盤而是寫入了OS buffer中,使用Os的髒頁刷新策略週期性落盤,就算落盤 仍然會有raid buffer。前者機器宕機數據丟失,後者機器跳電數據丟失。
b:對數據可靠性比較高的場景建議offset手動提交,自動提交當遇到業務系統上線關閉時,消息讀取而且offset已經提交,可是數據沒有儲存或者仍沒來得及消費時,消息狀態在內存中沒法保留,重啓應用會跳過消息,導致消息丟失。
---------------------
未分類
description: 本文介紹了Kafka實現事務性的幾個階段——正好一次語義與原子操做。以後詳細分析了Kafka事務機制的實現原理,並介紹了Kafka如何處理事務相關的異常狀況,如Transaction Coordinator宕機。最後介紹了Kafka的事務機制與PostgreSQL的MVCC以及Zookeeper的原子廣播實現事務的異同
本文全部Kafka原理性的描述除特殊說明外均基於Kafka 1.0.0版本。
Kafka事務機制的實現主要是爲了支持
Exactly Once
即正好一次語義Exactly Once
《Kafka背景及架構介紹》一文中有說明Kafka在0.11.0.0以前的版本中只支持At Least Once
和At Most Once
語義,尚不支持Exactly Once
語義。
可是在不少要求嚴格的場景下,如使用Kafka處理交易數據,Exactly Once
語義是必須的。咱們能夠經過讓下游系統具備冪等性來配合Kafka的At Least Once
語義來間接實現Exactly Once
。可是:
所以,Kafka自己對Exactly Once
語義的支持就很是必要。
操做的原子性是指,多個操做要麼所有成功要麼所有失敗,不存在部分紅功部分失敗的可能。
實現原子性操做的意義在於:
上文提到,實現Exactly Once
的一種方法是讓下游系統具備冪等處理特性,而在Kafka Stream中,Kafka Producer自己就是「下游」系統,所以若是能讓Producer具備冪等處理特性,那就可讓Kafka Stream在必定程度上支持Exactly once
語義。
爲了實現Producer的冪等語義,Kafka引入了Producer ID
(即PID
)和Sequence Number
。每一個新的Producer在初始化的時候會被分配一個惟一的PID,該PID對用戶徹底透明而不會暴露給用戶。
對於每一個PID,該Producer發送數據的每一個<Topic, Partition>
都對應一個從0開始單調遞增的Sequence Number
。
相似地,Broker端也會爲每一個<PID, Topic, Partition>
維護一個序號,而且每次Commit一條消息時將其對應序號遞增。對於接收的每條消息,若是其序號比Broker維護的序號(即最後一次Commit的消息的序號)大一,則Broker會接受它,不然將其丟棄:
InvalidSequenceNumber
DuplicateSequenceNumber
上述設計解決了0.11.0.0以前版本中的兩個問題:
上述冪等設計只能保證單個Producer對於同一個<Topic, Partition>
的Exactly Once
語義。
另外,它並不能保證寫操做的原子性——即多個寫操做,要麼所有被Commit要麼所有不被Commit。
更不能保證多個讀寫操做的的原子性。尤爲對於Kafka Stream應用而言,典型的操做便是從某個Topic消費數據,通過一系列轉換後寫回另外一個Topic,保證從源Topic的讀取與向目標Topic的寫入的原子性有助於從故障中恢復。
事務保證可以使得應用程序將生產數據和消費數據看成一個原子單元來處理,要麼所有成功,要麼所有失敗,即便該生產或消費跨多個<Topic, Partition>
。
另外,有狀態的應用也能夠保證重啓後從斷點處繼續處理,也即事務恢復。
爲了實現這種效果,應用程序必須提供一個穩定的(重啓後不變)惟一的ID,也即Transaction ID
。Transactin ID
與PID
可能一一對應。區別在於Transaction ID
由用戶提供,而PID
是內部的實現對用戶透明。
另外,爲了保證新的Producer啓動後,舊的具備相同Transaction ID
的Producer即失效,每次Producer經過Transaction ID
拿到PID的同時,還會獲取一個單調遞增的epoch。因爲舊的Producer的epoch比新Producer的epoch小,Kafka能夠很容易識別出該Producer是老的Producer並拒絕其請求。
有了Transaction ID
後,Kafka可保證:
Transaction ID
的新的Producer實例被建立且工做時,舊的且擁有相同Transaction ID
的Producer將再也不工做。須要注意的是,上述的事務保證是從Producer的角度去考慮的。從Consumer的角度來看,該保證會相對弱一些。尤爲是不能保證全部被某事務Commit過的全部消息都被一塊兒消費,由於:
這一節所說的事務主要指原子性,也即Producer將多條消息做爲一個事務批量發送,要麼所有成功要麼所有失敗。
爲了實現這一點,Kafka 0.11.0.0引入了一個服務器端的模塊,名爲Transaction Coordinator
,用於管理Producer發送的消息的事務性。
該Transaction Coordinator
維護Transaction Log
,該log存於一個內部的Topic內。因爲Topic數據具備持久性,所以事務的狀態也具備持久性。
Producer並不直接讀寫Transaction Log
,它與Transaction Coordinator
通訊,而後由Transaction Coordinator
將該事務的狀態插入相應的Transaction Log
。
Transaction Log
的設計與Offset Log
用於保存Consumer的Offset相似。
許多基於Kafka的應用,尤爲是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取消息,後者負責將處理完的數據寫回Kafka的其它Topic中。
爲了實現該場景下的事務的原子性,Kafka須要保證對Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。不然,若是在兩者Commit中間發生異常,根據兩者Commit的順序可能會形成數據丟失和數據重複:
At Least Once
語義,可能形成數據重複。At Most Once
語義,可能形成數據丟失。爲了區分寫入Partition的消息被Commit仍是Abort,Kafka引入了一種特殊類型的消息,即Control Message
。該類消息的Value內不包含任何應用相關的數據,而且不會暴露給應用程序。它只用於Broker與Client間的內部通訊。
對於Producer端事務,Kafka以Control Message的形式引入一系列的Transaction Marker
。Consumer便可經過該標記斷定對應的消息被Commit了仍是Abort了,而後結合該Consumer配置的隔離級別決定是否應該將該消息返回給應用程序。
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事務,包括結束該Transaction ID對應的未完成的事務(若是有)
// 保證新的事務在一個正確的狀態下啓動
producer.initTransactions();
// 開始事務
producer.beginTransaction();
// 消費數據
ConsumerRecords<String, String> records = consumer.poll(100);
try{
// 發送數據
producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
// 發送消費數據的Offset,將上述數據消費與數據發送歸入同一個Transaction內
producer.sendOffsetsToTransaction(offsets, "group1");
// 數據發送及Offset發送均成功的狀況下,提交事務
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 數據發送或者Offset發送出現異常時,終止事務
producer.abortTransaction();
} finally {
// 關閉Producer和Consumer
producer.close();
consumer.close();
}
Transaction Coordinator
因爲Transaction Coordinator
是分配PID和管理事務的核心,所以Producer要作的第一件事情就是經過向任意一個Broker發送FindCoordinator
請求找到Transaction Coordinator
的位置。
注意:只有應用程序爲Producer配置了Transaction ID
時纔可以使用事務特性,也才須要這一步。另外,因爲事務性要求Producer開啓冪等特性,所以經過將transactional.id
設置爲非空從而開啓事務特性的同時也須要經過將enable.idempotence
設置爲true來開啓冪等特性。
找到Transaction Coordinator
後,具備冪等特性的Producer必須發起InitPidRequest
請求以獲取PID。
注意:只要開啓了冪等特性即必須執行該操做,而無須考慮該Producer是否開啓了事務特性。
* 若是事務特性被開啓 *
InitPidRequest
會發送給Transaction Coordinator
。若是Transaction Coordinator
是第一次收到包含有該Transaction ID
的InitPidRequest請求,它將會把該<TransactionID, PID>
存入Transaction Log
,如上圖中步驟2.1所示。這樣可保證該對應關係被持久化,從而保證即便Transaction Coordinator
宕機該對應關係也不會丟失。
除了返回PID外,InitPidRequest
還會執行以下任務:
注意:InitPidRequest
的處理過程是同步阻塞的。一旦該調用正確返回,Producer便可開始新的事務。
另外,若是事務特性未開啓,InitPidRequest
可發送至任意Broker,而且會獲得一個全新的惟一的PID。該Producer將只能使用冪等特性以及單一Session內的事務特性,而不能使用跨Session的事務特性。
Kafka從0.11.0.0版本開始,提供beginTransaction()
方法用於開啓一個事務。調用該方法後,Producer本地會記錄已經開啓了事務,但Transaction Coordinator
只有在Producer發送第一條消息後才認爲事務已經開啓。
這一階段,包含了整個事務的數據處理過程,而且包含了多種請求。
AddPartitionsToTxnRequest
一個Producer可能會給多個<Topic, Partition>
發送數據,給一個新的<Topic, Partition>
發送數據前,它須要先向Transaction Coordinator
發送AddPartitionsToTxnRequest
。
Transaction Coordinator
會將該<Transaction, Topic, Partition>
存於Transaction Log
內,並將其狀態置爲BEGIN
,如上圖中步驟4.1所示。有了該信息後,咱們才能夠在後續步驟中爲每一個Topic, Partition>
設置COMMIT或者ABORT標記(如上圖中步驟5.2所示)。
另外,若是該<Topic, Partition>
爲該事務中第一個<Topic, Partition>
,Transaction Coordinator
還會啓動對該事務的計時(每一個事務都有本身的超時時間)。
ProduceRequest
Producer經過一個或多個ProduceRequest
發送一系列消息。除了應用數據外,該請求還包含了PID,epoch,和Sequence Number
。該過程如上圖中步驟4.2所示。
AddOffsetsToTxnRequest
爲了提供事務性,Producer新增了sendOffsetsToTransaction
方法,該方法將多組消息的發送和消費放入同一批處理內。
該方法先判斷在當前事務中該方法是否已經被調用並傳入了相同的Group ID。如果,直接跳到下一步;若不是,則向Transaction Coordinator
發送AddOffsetsToTxnRequests
請求,Transaction Coordinator
將對應的全部<Topic, Partition>
存於Transaction Log
中,並將其狀態記爲BEGIN
,如上圖中步驟4.3所示。該方法會阻塞直到收到響應。
TxnOffsetCommitRequest
做爲sendOffsetsToTransaction
方法的一部分,在處理完AddOffsetsToTxnRequest
後,Producer也會發送TxnOffsetCommit
請求給Consumer Coordinator
從而將本事務包含的與讀操做相關的各<Topic, Partition>
的Offset持久化到內部的__consumer_offsets
中,如上圖步驟4.4所示。
在此過程當中,Consumer Coordinator
會經過PID和對應的epoch來驗證是否應該容許該Producer的該請求。
這裏須要注意:
__consumer_offsets
的Offset信息在當前事務Commit前對外是不可見的。也即在當前事務被Commit前,可認爲該Offset還沒有Commit,也即對應的消息還沒有被完成處理。Consumer Coordinator
並不會當即更新緩存中相應<Topic, Partition>
的Offset,由於此時這些更新操做還沒有被COMMIT或ABORT。一旦上述數據寫入操做完成,應用程序必須調用KafkaProducer
的commitTransaction
方法或者abortTransaction
方法以結束當前事務。
EndTxnRequest
commitTransaction
方法使得Producer寫入的數據對下游Consumer可見。abortTransaction
方法經過Transaction Marker
將Producer寫入的數據標記爲Aborted
狀態。下游的Consumer若是將isolation.level
設置爲READ_COMMITTED
,則它讀到被Abort的消息後直接將其丟棄而不會返回給客戶程序,也即被Abort的消息對應用程序不可見。
不管是Commit仍是Abort,Producer都會發送EndTxnRequest
請求給Transaction Coordinator
,並經過標誌位標識是應該Commit仍是Abort。
收到該請求後,Transaction Coordinator
會進行以下操做
PREPARE_COMMIT
或PREPARE_ABORT
消息寫入Transaction Log
,如上圖中步驟5.1所示WriteTxnMarker
請求以Transaction Marker
的形式將COMMIT
或ABORT
信息寫入用戶數據日誌以及Offset Log
中,如上圖中步驟5.2所示COMPLETE_COMMIT
或COMPLETE_ABORT
信息寫入Transaction Log
中,如上圖中步驟5.3所示補充說明:對於commitTransaction
方法,它會在發送EndTxnRequest
以前先調用flush方法以確保全部發送出去的數據都獲得相應的ACK。對於abortTransaction
方法,在發送EndTxnRequest
以前直接將當前Buffer中的事務性消息(若是有)所有丟棄,但必須等待全部被髮送但還沒有收到ACK的消息發送完成。
上述第二步是實現將一組讀操做與寫操做做爲一個事務處理的關鍵。由於Producer寫入的數據Topic以及記錄Comsumer Offset的Topic會被寫入相同的Transactin Marker
,因此這一組讀操做與寫操做要麼所有COMMIT要麼所有ABORT。
WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest
由Transaction Coordinator
發送給當前事務涉及到的每一個<Topic, Partition>
的Leader。收到該請求後,對應的Leader會將對應的COMMIT(PID)
或者ABORT(PID)
控制信息寫入日誌,如上圖中步驟5.2所示。
該控制消息向Broker以及Consumer代表對應PID的消息被Commit了仍是被Abort了。
這裏要注意,若是事務也涉及到__consumer_offsets
,即該事務中有消費數據的操做且將該消費的Offset存於__consumer_offsets
中,Transaction Coordinator
也須要向該內部Topic的各Partition的Leader發送WriteTxnMarkerRequest
從而寫入COMMIT(PID)
或COMMIT(PID)
控制信息。
寫入最終的COMPLETE_COMMIT
或COMPLETE_ABORT
消息
寫完全部的Transaction Marker
後,Transaction Coordinator
會將最終的COMPLETE_COMMIT
或COMPLETE_ABORT
消息寫入Transaction Log
中以標明該事務結束,如上圖中步驟5.3所示。
此時,Transaction Log
中全部關於該事務的消息所有能夠移除。固然,因爲Kafka內數據是Append Only的,不可直接更新和刪除,這裏說的移除只是將其標記爲null從而在Log Compact時再也不保留。
另外,COMPLETE_COMMIT
或COMPLETE_ABORT
的寫入並不須要獲得全部Rreplica的ACK,由於若是該消息丟失,能夠根據事務協議重發。
補充說明,若是參與該事務的某些<Topic, Partition>
在被寫入Transaction Marker
前不可用,它對READ_COMMITTED
的Consumer不可見,但不影響其它可用<Topic, Partition>
的COMMIT或ABORT。在該<Topic, Partition>
恢復可用後,Transaction Coordinator
會從新根據PREPARE_COMMIT
或PREPARE_ABORT
向該<Topic, Partition>
發送Transaction Marker
。
PID
與Sequence Number
的引入實現了寫操做的冪等性At Least Once
語義實現了單一Session內的Exactly Once
語義Transaction Marker
與PID
提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性Transaction Marker
)來實現事務中涉及的全部讀寫操做同時對外可見或同時對外不可見InvalidProducerEpoch
這是一種Fatal Error,它說明當前Producer是一個過時的實例,有Transaction ID
相同但epoch更新的Producer實例被建立並使用。此時Producer會中止並拋出Exception。
InvalidPidMapping
Transaction Coordinator
沒有與該Transaction ID
對應的PID。此時Producer會經過包含有Transaction ID
的InitPidRequest
請求建立一個新的PID。
NotCorrdinatorForGTransactionalId
該Transaction Coordinator
不負責該當前事務。Producer會經過FindCoordinatorRequest
請求從新尋找對應的Transaction Coordinator
。
InvalidTxnRequest
違反了事務協議。正確的Client實現不該該出現這種Exception。若是該異常發生了,用戶須要檢查本身的客戶端實現是否有問題。
CoordinatorNotAvailable
Transaction Coordinator
仍在初始化中。Producer只須要重試便可。
DuplicateSequenceNumber
發送的消息的序號低於Broker預期。該異常說明該消息已經被成功處理過,Producer能夠直接忽略該異常並處理下一條消息
InvalidSequenceNumber
這是一個Fatal Error,它說明發送的消息中的序號大於Broker預期。此時有兩種可能
max.inflight.requests.per.connection
被強制設置爲1,而acks
被強制設置爲all。故前面消息重試期間,後續消息不會被髮送,也即不會發生亂序。而且只有ISR中全部Replica都ACK,Producer纔會認爲消息已經被髮送,也即不存在Broker端數據丟失問題。InvalidTransactionTimeout
InitPidRequest
調用出現的Fatal Error。它代表Producer傳入的timeout時間不在可接受範圍內,應該中止Producer並報告給用戶。
Transaction Coordinator
失敗PREPARE_COMMIT/PREPARE_ABORT
前失敗Producer經過FindCoordinatorRequest
找到新的Transaction Coordinator
,並經過EndTxnRequest
請求發起COMMIT
或ABORT
流程,新的Transaction Coordinator
繼續處理EndTxnRequest
請求——寫PREPARE_COMMIT
或PREPARE_ABORT
,寫Transaction Marker
,寫COMPLETE_COMMIT
或COMPLETE_ABORT
。
PREPARE_COMMIT/PREPARE_ABORT
後失敗此時舊的Transaction Coordinator
可能已經成功寫入部分Transaction Marker
。新的Transaction Coordinator
會重複這些操做,因此部分Partition中可能會存在重複的COMMIT
或ABORT
,但只要該Producer在此期間沒有發起新的事務,這些重複的Transaction Marker
就不是問題。
COMPLETE_COMMIT/ABORT
後失敗舊的Transaction Coordinator
可能已經寫完了COMPLETE_COMMIT
或COMPLETE_ABORT
但在返回EndTxnRequest
以前失敗。該場景下,新的Transaction Coordinator
會直接給Producer返回成功。
transaction.timeout.ms
當Producer失敗時,Transaction Coordinator
必須可以主動的讓某些進行中的事務過時。不然沒有Producer的參與,Transaction Coordinator
沒法判斷這些事務應該如何處理,這會形成:
Transaction Coordinator
須要維護大量的事務狀態,大量佔用內存Transaction Log
內也會存在大量數據,形成新的Transaction Coordinator
啓動緩慢READ_COMMITTED
的Consumer須要緩存大量的消息,形成沒必要要的內存浪費甚至是OOMTransaction ID
不一樣的Producer交叉寫同一個Partition,當一個Producer的事務狀態不更新時,READ_COMMITTED
的Consumer爲了保證順序消費而被阻塞爲了不上述問題,Transaction Coordinator
會週期性遍歷內存中的事務狀態Map,並執行以下操做
BEGIN
而且其最後更新時間與當前時間差大於transaction.remove.expired.transaction.cleanup.interval.ms
(默認值爲1小時),則主動將其終止:1)未避免原Producer臨時恢復與當前終止流程衝突,增長該Producer對應的PID的epoch,並確保將該更新的信息寫入Transaction Log
;2)以更新後的epoch回滾事務,從而使得該事務相關的全部Broker都更新其緩存的該PID的epoch從而拒絕舊Producer的寫操做PREPARE_COMMIT
,完成後續的COMMIT流程————向各<Topic, Partition>
寫入Transaction Marker
,在Transaction Log
內寫入COMPLETE_COMMIT
PREPARE_ABORT
,完成後續ABORT流程Transaction ID
某Transaction ID
的Producer可能很長時間再也不發送數據,Transaction Coordinator
不必再保存該Transaction ID
與PID
等的映射,不然可能會形成大量的資源浪費。所以須要有一個機制探測再也不活躍的Transaction ID
並將其信息刪除。
Transaction Coordinator
會週期性遍歷內存中的Transaction ID
與PID
映射,若是某Transaction ID
沒有對應的正在進行中的事務而且它對應的最後一個事務的結束時間與當前時間差大於transactional.id.expiration.ms
(默認值是7天),則將其從內存中刪除並在Transaction Log
中將其對應的日誌的值設置爲null從而使得Log Compact可將其記錄刪除。
Kafka的事務機制與《MVCC PostgreSQL實現事務和多版本併發控制的精華》一文中介紹的PostgreSQL經過MVCC實現事務的機制很是相似,對於事務的回滾,並不須要刪除已寫入的數據,都是將寫入數據的事務標記爲Rollback/Abort從而在讀數據時過濾該數據。
Kafka的事務機制與《分佈式事務(一)兩階段提交及JTA》一文中所介紹的兩階段提交機制看似類似,都分PREPARE階段和最終COMMIT階段,但又有很大不一樣。
PREPARE_COMMIT
仍是PREPARE_ABORT
,而且只須在Transaction Log
中標記便可,無須其它組件參與。而兩階段提交的PREPARE須要發送給全部的分佈式事務參與方,而且事務參與方須要儘量準備好,並根據準備狀況返回Prepared
或Non-Prepared
狀態給事務管理器。PREPARE_COMMIT
或PREPARE_ABORT
,則肯定該事務最終的結果應該是被COMMIT
或ABORT
。而分佈式事務中,PREPARE後由各事務參與方返回狀態,只有全部參與方均返回Prepared
狀態纔會真正執行COMMIT,不然執行ROLLBACKTransaction Coordinator
實例,而分佈式事務中只有一個事務管理器Zookeeper的原子廣播協議與兩階段提交以及Kafka事務機制有類似之處,但又有各自的特色
Transaction Coordinator
實例,擴展性較好。而Zookeeper寫操做只能在Leader節點進行,因此其寫性能遠低於讀性能。