今年6月發佈的kafka 0.11.0.0包含兩個比較大的特性,exactly once delivery和transactional transactional messaging。以前一直對事務這塊比較感興趣,因此抽空詳細學習了一下,感受收穫仍是挺多的。html
對這兩個特性的詳細描述能夠看這三篇文檔,數據庫
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messagingapache
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer網絡
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka架構
消息重複一直是消息領域的一個痛點,而消息重複可能發生於下面這些場景app
1.消息發送端發出消息,服務端落盤之後由於網絡等種種緣由發送端獲得一個發送失敗的響應,而後發送端重發消息致使消息重複。 異步
2.消息消費端在消費過程當中掛掉另外一個消費端啓動拿以前記錄的位點開始消費,因爲位點的滯後性可能會致使新啓動的客戶端有少許重複消費。分佈式
先說說問題2,通常的解決方案是讓下游作冪等或者儘可能每消費一條消息都記位點,對於少數嚴格的場景可能須要把位點和下游狀態更新放在同一個數據庫裏面作事務來保證精確的一次更新或者在下游數據表裏面同時記錄消費位點,而後更新下游數據的時候用消費位點作樂觀鎖拒絕掉舊位點的數據更新。
問題1的解決方案也就是kafka實現的方案是每一個producer有一個producer id,服務端會經過這個id關聯記錄每一個producer的狀態,每一個producer的每條消息會帶上一個遞增的sequence,服務端會記錄每一個producer對應的當前最大sequence,若是新的消息帶上的sequence不大於當前的最大sequence就拒絕這條消息,問題1的場景若是消息落盤會同時更新最大sequence,這個時候重發的消息會被服務端拒掉從而避免消息重複。後面展開詳細說一下這個解決方案。
ide
1.最簡單的需求是producer發的多條消息組成一個事務這些消息須要對consumer同時可見或者同時不可見
2.producer可能會給多個topic,多個partition發消息,這些消息也須要能放在一個事務裏面,這就造成了一個典型的分佈式事務
3.kafka的應用場景常常是應用先消費一個topic,而後作處理再發到另外一個topic,這個consume-transform-produce過程須要放到一個事務裏面,好比在消息處理或者發送的過程當中若是失敗了,消費位點也不能提交
4.producer或者producer所在的應用可能會掛掉,新的producer啓動之後須要知道怎麼處理以前未完成的事務
5.流式處理的拓撲可能會比較深,若是下游只有等上游消息事務提交之後才能讀到,可能會致使rt很是長吞吐量也隨之降低不少,因此須要實現read committed和read uncommitted兩種事務隔離級別
一個比較典型的consume-transform-produce的場景像下面這樣 學習
public class KafkaTransactionsExample { public static void main(String args[]) { KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer producer = new KafkaProducer<>(producerConfig); producer.initTransactions(); while(true) { ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT); if (!records.isEmpty()) { producer.beginTransaction(); List> outputRecords = processRecords(records); for (ProducerRecord outputRecord : outputRecords) { producer.send(outputRecord); } sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets()); producer.endTransaction(); } } } }
1.由於producer發送消息多是分佈式事務,因此引入了經常使用的2PC,因此有事務協調者(Transaction Coordinator)。Transaction Coordinator和以前爲了解決腦裂和驚羣問題引入的Group Coordinator在選舉和failover上面相似。
2.事務管理中事務日誌是必不可少的,kafka使用一個內部topic來保存事務日誌,這個設計和以前使用內部topic保存位點的設計保持一致。事務日誌是Transaction Coordinator管理的狀態的持久化,由於不須要回溯事務的歷史狀態,因此事務日誌只用保存最近的事務狀態。
3.由於事務存在commit和abort兩種操做,而客戶端又有read committed和read uncommitted兩種隔離級別,因此消息隊列必須能標識事務狀態,這個被稱做Control Message。
4.producer掛掉重啓或者漂移到其它機器須要能關聯的以前的未完成事務因此須要有一個惟一標識符來進行關聯,這個就是TransactionalId,一個producer掛了,另外一個有相同TransactionalId的producer可以接着處理這個事務未完成的狀態。注意不要把TransactionalId和數據庫事務中常見的transaction id搞混了,kafka目前沒有引入全局序,因此也沒有transaction id,這個TransactionalId是用戶提早配置的。
5. TransactionalId能關聯producer,也須要避免兩個使用相同TransactionalId的producer同時存在,因此引入了producer epoch來保證對應一個TransactionalId只有一個活躍的producer epoch
官方文檔的數據流組件圖
上圖中每一個方框表明一臺獨立的機器,圖下方比較長的圓角矩形表明kafka topic,圖中間的兩個角是圓的的方框表明broker裏面的邏輯組件,箭頭表明rpc調用。
接下來講一下事務的數據流,這裏基本按照官方文檔的結構加上我本身看代碼的一點補充
1.首先producer須要找到transaction coordinator。TransactionManager.lookupCoordinator
private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) { switch (type) { case GROUP: consumerGroupCoordinator = null; break; case TRANSACTION: transactionCoordinator = null; break; default: throw new IllegalStateException("Invalid coordinator type: " + type); } FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey); enqueueRequest(new FindCoordinatorHandler(builder)); }
2.獲取producer id,producer id是比較重要的概念,精確一次投遞須要producer id+sequence防止重複投遞,事務消息也須要保存transactional id和producer id的對應關係。客戶端調用KafkaProducer.initTransactions的時候會向coordinator請求producer id,TransactionManager.initializeTransactions
public synchronized TransactionalRequestResult initializeTransactions() { ensureTransactional(); transitionTo(State.INITIALIZING); setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); this.sequenceNumbers.clear(); InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); InitProducerIdHandler handler = new InitProducerIdHandler(builder); enqueueRequest(handler); return handler.result; }
coordinator端處理邏輯在TransactionCoordinator.handleInitProducerId流程比較複雜,首先若是對應的transactional id沒有產生過producer id會找producerIdManager生成一個
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).right.flatMap { case None => val producerId = producerIdManager.generateProducerId() val createdMetadata = new TransactionMetadata(transactionalId = transactionalId, producerId = producerId, producerEpoch = RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs = transactionTimeoutMs, state = Empty, topicPartitions = collection.mutable.Set.empty[TopicPartition], txnLastUpdateTimestamp = time.milliseconds()) txnManager.putTransactionStateIfNotExists(transactionalId, createdMetadata) case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata) }
producer id須要全局惟一,有點相似於tddl sequence的生成邏輯,ProducerIdManager.generateProducerId會一次申請一批id而後在zk上面保存狀態,本地每次生成+1,若是超出了當前批次的範圍就去找zk從新申請
拿到了producer id接下來處理事務狀態,保證以前的事務狀態可以處理完畢,該提交的提交,該回滾的回滾。
TransactionCoordinator.prepareInitProduceIdTransit處理producer id的變化好比開始一個新的事務可能會增長producer epoch,也可能生成新的producer id
case PrepareAbort | PrepareCommit => // reply to client and let it backoff and retry Left(Errors.CONCURRENT_TRANSACTIONS) case CompleteAbort | CompleteCommit | Empty => val transitMetadata = if (txnMetadata.isProducerEpochExhausted) { val newProducerId = producerIdManager.generateProducerId() txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds()) } else { txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds()) } Right(coordinatorEpoch, transitMetadata) case Ongoing => // indicate to abort the current ongoing txn first. Note that this epoch is never returned to the // user. We will abort the ongoing transaction and return CONCURRENT_TRANSACTIONS to the client. // This forces the client to retry, which will ensure that the epoch is bumped a second time. In // particular, if fencing the current producer exhausts the available epochs for the current producerId, // then when the client retries, we will generate a new producerId. Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
最後若是以前的事務處於進行中的狀態會回滾事務
handleEndTransaction(transactionalId, newMetadata.producerId, newMetadata.producerEpoch, TransactionResult.ABORT, sendRetriableErrorCallback)
或者就是新事務,往事務日誌裏面插一條日誌(對應數據流圖中的2a)
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)
3.客戶端調用KafkaProducer.beginTransaction開始新事務。這一步相對簡單,就是客戶端設置狀態成State.IN_TRANSACTION
4.consume-transform-produce過程,這一步是實際消費消息和生產消息的過程。
(1)客戶端發送消息時(KafkaProducer.send),對於新碰到的TopicPartition會觸發AddPartitionsToTxnRequest。服務端對應的處理在TransactionCoordinator.handleAddPartitionsToTransaction,主要作的事情是更新事務元數據和記錄事務日誌(對應數據流圖中的4.1a)。在事務中記錄partition的做用是後面給事務每一個partition發送提交或者回滾標記時須要事務全部的partition。
(2)客戶端經過KafkaProducer.send發送消息(ProduceRequest),比較早的kafka版本增長了PID,epoch,sequence number等幾個字段,對應數據流圖中的4.2a
(3)客戶端調用KafkaProducer.sendOffsetsToTransaction保存事務消費位點。服務端的處理邏輯在TransactionCoordinator.handleAddPartitionsToTransaction,和4.1基本是同樣的,不一樣的是4.3記錄的是記錄消費位點的topic(GROUP_METADATA_TOPIC_NAME)。
(4)4.3調用的後半部分會觸發TxnOffsetCommitRequest,經過數據消息的方式把消費位點持久化到GROUP_METADATA_TOPIC_NAME(__consumer-offsets)這個topic裏面去,對應數據流圖中的4.4a。
客戶端發起邏輯在AddOffsetsToTxnHandler.handleResponse
if (error == Errors.NONE) { log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix, builder.consumerGroupId()); // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId())); transactionStarted = true; }
由於須要處理可見性相關的邏輯,服務端事務消費位點和普通消費位點提交的處理邏輯稍有不一樣,調用GroupCoordinator.handleTxnCommitOffsets而不是handleCommitOffsets。
5.結束事務須要調用KafkaProducer.commitTransaction或者KafkaProducer.abortTransaction
(1)首先客戶端會發送一個EndTxnRequest,而服務端由TransactionCoordinator.handleEndTransaction處理。
handleEndTransaction首先會作一個可能的狀態轉換讓事務進入預提交或者預放棄階段
else txnMetadata.state match { case Ongoing => val nextState = if (txnMarkerResult == TransactionResult.COMMIT) PrepareCommit else PrepareAbort Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
接下來會在事務日誌裏面記錄PREPARE_COMMIT或者PREPARE_ABORT日誌,對應數據流圖中的5.1a
txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)
再接下來會往用戶數據日誌裏面發送COMMIT或者ABORT的Control Message,最後往事務日誌裏面寫入COMMIT或者ABORT,纔算完成了事務的提交過程。這個過程是用回調的方式組織起來的,代碼的流程是TransactionStateManager.appendTransactionToLog->TransactionMarkerChannelManager.addTxnMarkersToSend->TransactionStateManager.appendTransactionToLog
(2)往用戶數據日誌裏面發送COMMIT或者ABORT的Control Message的過程,對應數據流圖中的5.2a
發起點在回調方法sendTxnMarkersCallback,這個方法首先會作轉檯轉換讓事務進入CompleteCommit或者CompleteAbort狀態
case PrepareCommit => if (txnMarkerResult != TransactionResult.COMMIT) logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) case PrepareAbort => if (txnMarkerResult != TransactionResult.ABORT) logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
最後會往事務相關的每一個broker發送WriteTxnMarkersRequest,若是事務包含消費位點也會往__consumer-offsets所在的broker發請求。 broker端的處理在KafkaApis.handleWriteTxnMarkersRequest會把control message寫入日誌
replicaManager.appendRecords( timeout = config.requestTimeoutMs.toLong, requiredAcks = -1, internalTopicsAllowed = true, isFromClient = false, entriesPerPartition = controlRecords, responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
(3)事務日誌寫入最終的COMMIT或者ABORT日誌,對應數據流圖的5.3,這一步完成了一個事務就算完全完成了。
發起點在回調方法appendToLogCallback
大體列一下流程中的關鍵節點
1.在客戶端每次發送消息以前會檢查是否有producerId若是沒有會找服務端去申請,Sender.run
if (transactionManager != null) { if (!transactionManager.isTransactional()) { // this is an idempotent producer, so make sure we have a producer id maybeWaitForProducerId(); }
服務端會在TransactionCoordinator.handleInitProducerId處理,前面事務消息提到過
2.在生成消息內容的時候(RecordAccumulator.drain)會獲取當前的的sequenceNumber(TransactionManager.sequenceNumber)放到消息體裏面。而sequenceNumber的自增是在發送上一批消息返回是觸發的(Sender.handleProduceResponse)。
3.broker實際寫入消息以前(Log.append)纔會對sequenceNumber進行校驗,校驗的具體邏輯在ProducerStateManager.validateAppend
1.sequenceNumber能夠設計成每一個producer惟一或者更細粒度的對每一個topic-partition惟一,topic-partition惟一的好處是對於每一個topic-partition sequenceNumber能夠設計成連續的,這樣broker端能夠作更強的校驗,好比檢查丟消息,kafka使用的就是細粒度的方法,發現sequenceNumber不連續的時候會拋異常OutOfOrderSequenceException
2.發消息(KafkaProducer.doSend)是個異步的過程,但同時提供Future返回值使得在必要的時候能夠把異步變成同步等待。kafka也實現了攢消息批量發送的能力(RecordAccumulator.append),攢消息的存放方式是一個大hash map,key是topic-partition,消息實際刷出和發送在一個單獨的線程中執行,調用Sender.sendProducerData。被刷出的消息的斷定在RecordAccumulator.ready,主要依據是消息集是否已滿或者是否超時。
3.Consumer消費的時候怎樣控制事務可見性呢?一個比較直觀的方法就是先把事務消息buffer起來,而後遇到提交或者回滾標誌的時候作相應的處理,kafka處理的更巧妙一些。首先是不能讀到未提交的事務的控制,kafka引入了lastStableOffset這個概念,lastStableOffset是當前已經提交的事務的最大位點。在ReplicaManager.readFromLocalLog裏面有控制,
val initialHighWatermark = localReplica.highWatermark.messageOffset val lastStableOffset = if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(localReplica.lastStableOffset.messageOffset) else None // decide whether to only fetch committed data (i.e. messages below high watermark) val maxOffsetOpt = if (readOnlyCommitted) Some(lastStableOffset.getOrElse(initialHighWatermark)) else None ... val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)
這樣未提交的事務對客戶端就不可見了
4.還有一個需求是要識別而且跳過那些在aborted事務內的消息,這些消息可能和非事務消息混在一塊兒。kafka讀消息的返回信息中會帶上本批讀取的消息中回滾事務列表來幫助客戶端跳過。
case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, records: Records, firstEntryIncomplete: Boolean = false, abortedTransactions: Option[List[AbortedTransaction]] = None)
回滾事務列表是在讀取消息日誌(Log.read)的過程當中擼的
return isolationLevel match { case IsolationLevel.READ_UNCOMMITTED => fetchInfo case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo) }
那接下來的問題是kafka是如何快速拿到回滾事務列表的呢?kafka爲這件事作了一個文件索引,文件後綴名是'.txnindex',相關的管理邏輯在TransactionIndex。
5.broker端對於producer的狀態管理,broker須要記錄producer對應的最大sequenceNumber,epoch之類的信息。相關邏輯是放在ProducerStateManager裏面的,broker每次寫入消息的時候(Log.append)都會更新producer信息(ProducerStateManager.update)。因爲只有當有消息寫入的時候producer state纔會被更新,因此當broker掛掉的時候producer的狀態須要被持久化,kafka又弄了一個文件'.snapshot'來持久化producer信息。
感受kafka在設計上概念的統一和架構的連貫上作的特別好,好比producerId的引入把精確一次投遞和事務消息都給串聯起來了。
印象更深入的例子是kafka早先幾個版本依次推出了幾個特性,
1.把位點當作普通消息保存
2.加入了消息清理機制,只保留key最新的value
3.加入了broker端的coordinator解決驚羣和腦裂問題
而後這幾個特性在事務消息這塊都用上了,首先把位點當普通消息保存在概念上統一了消息發送和消費,同時消息同步也成了broker之間同步狀態的基礎機制這樣就不用再弄一套狀態同步機制了,不過這樣作的缺點是隻有寫消息才能同步broker狀態某些特殊狀況可能有點小麻煩。利用消息隊列保存狀態的一個毛病是比較浪費資源,而消息清理機制剛好解決了這個問題。最後是broker端的coordinator機制能夠用在consumer group協調者也能夠用在事務協調者上面。這種層次遞進的特性累加真是至關有美感並且感受是深思熟慮的結果。
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
https://medium.com/@jaykreps/exactly-once-support-in-apache-kafka-55e1fdd0a35f
http://jianbeike.blogspot.com/2017/10/kafka.html