Kafka添加了事務機制之後,consumer端有個須要解決的問題就是怎麼樣從收到的消息中濾掉aborted的消息。Kafka經過broker和consumer端的協做,利用一系列優化手段極大地下降了這部分工做的開銷。服務器
首先來看一下這部分工做的難點在哪。app
對於isolation.level爲read_committed的消費者來講,它只想獲取committed的消息。可是在服務器端的存儲中,committed的消息、aborted的消息、以及正在進行中的事務的消息在Log裏是緊挨在一塊兒的,並且這些狀態的消息可能源於不一樣的producerId。因此,若是broker對FetchRequest的處理和加入事務機制前同樣,那麼consumer就須要作不少地清理工做,並且須要buffer消息直到control marker的到來。那麼,就無端浪費了不少流量,並且consumer端的內存管理也很成問題。ide
Kafka大致採用了三個措施一塊兒來解決這個問題。性能
Kafka添加了一個很重要概念,叫作LSO,即last stable offset。對於同一個TopicPartition,其offset小於LSO的全部transactional message的狀態都已肯定,要不就是committed,要不就是aborted。而broker對於read_committed的consumer,只提供offset小於LSO的消息。這樣就避免了consumer收到狀態不肯定的消息,而不得不buffer這些消息。fetch
對於每一個LogSegment(對應於一個log文件),broker都維護一個aborted transaction index. 這是一個append only的文件,每當有事務被abort時,就會有一個entry被append進去。這個entry的格式是:優化
TransactionEntry => Version => int16 PID => int64 FirstOffset => int64 LastOffset => int64 LastStableOffset => int64
這涉及到FetchResponse的消息格式的變化,在FetchResponse裏包含了其中每一個TopicPartition的記錄裏的aborted transactions的信息,consumer使用這些信息,能夠更高效地從FetchResponse裏包含的消息裏過濾掉被abort的消息。spa
// FetchResponse v4 FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset LastStableOffset AbortedTransactions MessageSetSize MessageSet]] ThrottleTime => int32 TopicName => string Partition => int32 ErrorCode => int16 HighwaterMarkOffset => int64
LastStableOffset => int64
AbortedTransactions => [PID FirstOffset] PID => int64 FirstOffset => int64 MessageSetSize => int32
(如下對只針對read_committed的consumer)debug
consumer端會根據fetch response裏提供的aborted transactions裏過濾掉aborted的消息,只返回給用戶committed的消息。code
其核心邏輯是這樣的:orm
首先,因爲broker只返回LSO以前的消息給consumer,因此consumer拉取的消息只有兩種可能的狀態:committed和aborted。
活躍的aborted transaction的pid集合
而後, 對於每一個在被fetch的消息裏包含的TopicPartition, consumer維護一個producerId的集合,這個集合就是當前活躍的aborted transaction所使用的pid。一個aborted transaction是「活躍的」,是說:在過濾過程當中,當前的待處理的消息的offset處於這個這個aborted transaction的initial offset和last offset之間。有了這個活躍的aborted transaction對應的PID的集合(如下簡稱"pid集合"),在過濾消息時,只要看一下這個消息的PID是否在此集合中,若是是,那麼消息就確定是aborted的,若是不是,那就是committed的。
這個pid集合在過濾的過程當中,是不斷變化的,爲了維護這個集合,consumer端還會對於每一個在被fetch的消息裏包含的TopicPartition 維護一個aborted transaction構成的mini heap, 這個heap是以aborted transaction的intial offset排序的。
public static final class AbortedTransaction { public final long producerId; public final long firstOffset; ... } private class PartitionRecords { private final TopicPartition partition; private final CompletedFetch completedFetch; private final Iterator<? extends RecordBatch> batches; private final Set<Long> abortedProducerIds; private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions; ... }
//這個heap的初始化過程,能夠看出是按offset排序的
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty())
return null;
PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
partition.abortedTransactions.size(),
new Comparator<FetchResponse.AbortedTransaction>() {
@Override
public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
return Long.compare(o1.firstOffset, o2.firstOffset);
}
}
);
abortedTransactions.addAll(partition.abortedTransactions);
return abortedTransactions;
}
按照Kafka文檔裏的說法:
- If the message is a transaction control message, and the status is ABORT, then remove the corresponding PID from the set of PIDs with active aborted transactions. If the status is COMMIT, ignore the message.
If the message is a normal message, compare the offset and PID with the head of the aborted transaction minheap. If the PID matches and the offset is greater than or equal to the corresponding initial offset from the aborted transaction entry, remove the head from the minheap and insert the PID into the set of PIDs with aborted transactions.
Check whether the PID is contained in the aborted transaction set. If so, discard the record set; otherwise, add it to the records to be returned to the user.
可是實際上考慮到batch的問題,狀況會比這簡單一些。在producer端發送的時候,同一個TopicPartition的不一樣transaction的消息是不可能在同一個message batch裏的, 並且committed的消息和aborted的消息也不可能在同一batch裏。由於在不一樣transaction的消息之間,確定會有transaction marker, 而transaction marker是單獨的一個batch。這就使得,一個batch要不所有被aborted了,要不所有被committed了。因此過濾aborted transaction時就能夠一次過濾一個batch,而非一條消息。
相關代碼爲PartitionRecords#nextFetchedRecord()中:
if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { // remove from the aborted transaction queue all aborted transactions which have begun // before the current batch's last offset and add the associated producerIds to the // aborted producer set
//從aborted transaction裏移除那些其inital offset在當前的batch的末尾以前的那些。
//由於這些transaction開始於當前batch以前,而在處理這個batch以前沒有結束,因此它要不是活躍的aborted transaction,要不當前的batch就是control batch
//這裏須要考慮到aborted transaction可能開始於此次fetch到的全部records以前
consumeAbortedTransactionsUpTo(currentBatch.lastOffset()); long producerId = currentBatch.producerId(); if (containsAbortMarker(currentBatch)) { abortedProducerIds.remove(producerId); //若是當前batch是abort marker, 那麼它對應的transaction就結束了,因此從pid集合裏移除它對應的pid。 } else if (isBatchAborted(currentBatch)) { //若是當前batch被abort了,那就跳過它 log.debug("Skipping aborted record batch from partition {} with producerId {} and " + "offsets {} to {}", partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset()); nextFetchOffset = currentBatch.nextOffset(); continue; } }
經過對aborted transaction index和LSO的使用,Kafka使得consumer端能夠高效地過濾掉aborted transaction裏的消息,從而減少了事務機制的性能開銷。