0.11 版本之前保證的語義是:至少一次node
至少一次的解釋spa
可以作到消息不丟失--> 可以作到發送成功的消息一定可以被消費到。class
不能作到消息不重復。filter
## 發送成功的消息,表示業務邏輯認爲此消息已發送成功,即send方法已執行完成。 view
丟消息場景vi
異步發送端:360
a:send之後,等待發送的時候down(消息在緩衝區中),導致消息丟失。data
b:send時,緩衝區已滿,導致消息丟棄。index
同步(異步)發送端:
a:有限的重試
服務端Leader down時,因爲有與zk的超時timeout,導致在timeout之後才會進行切換, 如果重試次數 * 重試間隔 < zk session timeout + 切換耗時,則消息會丟失。
b:ack != -1
ack = 0 的場景,不需服務端確認,發送後,Leader down,導致消息丟失。
ack = 1 的場景,只需要Leader確認,Leader收到消息後,未同步到Replica之前,Leader down,導致消息丟失。
服務端:
a:min.insync.replicas < 2 且 unclean.leader.election.enable = true
min.insync.replicas < 2 的場景下,如果副本均落後Leader,在Leader down時,根據髒選開關,會選擇落後的副本做爲新的Leader,則落後的數據會丟失。
消費端:
a:自動offset提交
消息處理失敗,但是offset也提交了,對業務來說消息丟失。
b:先手動提交offset,後處理消息
先提交offset,後處理消息,但是處理邏輯失敗,對業務來說消息丟失。
不丟失數據的方法
發送端:
a:同步發送
b:retries = Long.MAX_VALUE
c:acks = -1
服務端:
a:replication factor = 3
b:min.insync.replicas = 2
c:unclean.leader.election.enable = false
消費端:
a:auto.commit.enable = false
b:僅當消息處理成功之後再提交offset
消息重復的場景
發送端:
a:發送端重試
發送端發送消息後,服務端實際已接收,但是客戶端因爲網絡或其他原因未收到確認響應, 再進行消息重試發送,導致消息重復。
消費端:
a:auto.commit.enable = true
消息處理後,爲進行自動offset提交之前,consumer down,恢復後從上個提交點開始消費導致消息處理重復。
消息不重復的方法
發送端 --> 作不到。
消費端 --> auto.commit.enable = false,並且等消息處理完成再提交offset。
0.11之後版本保證的語義是:恰好一次
恰好一次的解釋
可以作到消息不丟失 --> 可以作到發送成功的消息一定可以被消費到。
可以作到消息不重復。
發送端作不到消息不重復的解決辦法
方法:
給Producer編號,且給每條消息編號,服務端保存Producer編號與此Producer最後一條消息的映射,服務端校驗當前收到的消息是否與保存的最後一條消息編號相同,如果相同則拒絕。
配置:
enable.idempotence = true -->
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
acks = -1
過程:
a:Producer發送InitProducerIdRequest獲取ProducerIdAndEpoch producerId 是Broker從zk分段獲取後遞增分配的,保證惟一,epoch爲事務所用,暫時不提。
b:Producer發送消息(消息中加入ProducerIdAndEpoch,消息序列號(0))。
c:服務端收到消息,內存中維護 ProducerIdAndEpoch與此消息的映射,數據落盤, 同步到副本。
d:Producer發送消息(消息中加入ProducerIdAndEpoch,消息序列號(1))。
e:服務端收到消息,判斷內存中的映射是否存在,如不存在,更新映射,數據落盤, 同步到副本,如存在,返回消息重復異常。
判斷是否重復的條件:
isFromClient == true &&
batch.producerIdAndEpoch == producerIdAndEpoch &&
batch.baseSequence == firstSeq &&
batch.lastSequence == lastSeq
幾個問題:
a:重試的場景中,ProducerIdAndEpoch映射的會不會不是上次重試的消息。
max.in.flight.requests.per.connection代表了同時只能有一條(批)消
息在發送,retries 保證了必須發送成功才會進行下一條的發送,所以不會有映射成其他消息 的情況。
b:有了映射緩存,其他副本沒成功復制怎麼辦 acks = -1 保證了其他副本必須復制成功。
c:服務端啓動或新選舉爲Leader的時候,緩存內容爲空怎麼判斷重復 會先構造緩存內容再提供服務 初始化時加載.snapshot文件,append消息時更新緩存,退出時,寫入.snapshot文件。
事務消息
事務消息語義
保證發送消息不重復。
保證多條消息(發往不同服務端)發送的原子性,即同時提交或同時回滾。
保證 「消費-處理-發送」邏輯的原子性,及要麼全部成功,要麼整體回滾。
整體原理
Producer新增 transactionalId、producerId、epoch標識,可標記事務。
新增transaction coordinator,協調處理事務的開啓、提交、終止,記錄事務日志 ;
transaction coordinator依附在Broker上,基於Broker提供服務;
記錄事務日志是一個內部可根據Key壓縮的Topic,可復制的,當 transaction coordinator down,有其他的接管繼續事務處理。
服務端只要接收到消息,就寫入文件,不管後續是否是提交或者終止。
新增control類型消息,標記事務是提交還是終止(全部消息以transactionalId爲標識)。
消費端使用緩存堆積消息,直到看到control類型消息,返回給應用或丟棄。
多條消息發送的原子性
發送消息過程
a:初始化事務
發送FindCoordinatorRequest,查找transaction coordinator。
發送InitProducerIdRequest到transaction coordinator,獲取ProducerIdAndEpoch。
transaction coordinator 新增消息 transactionalId --> producerId 到事務日志。
transaction coordinator 恢復或終止此 transactionalId 之前的事務。
b:開啓事務
c:生產消息(可多次)
發送AddPartitionsToTxnRequest到transaction coordinator,transaction coordinator增加 BEGIN的事務日志,日志包含此partition 發送消息到實際的Broker,Broker接收消息寫入文件並復制到副本
d:提交或終止事務
Producer 發送 EndTxnRequest 到 transaction coordinator transaction coordinator 寫入PREPARE_COMMIT或 PREPARE_ABORT到事務日志 transaction coordinator 依次向全部partition發送 COMMIT/ ABORT的control消息 transaction coordinator 寫入 COMMIT/ ABORT到事務日志 Broker接收消息寫入文件並復制到副本
消費過程(READ_COMMITED)
a:初始化、Rebalance
b:拉消息,如果不包含transactionalId,則返回給應用,如果包含,則放入緩存,直到拉到該transactionalId的control消息,如果該control消息是COMMIT,則相關消息返回應用;如果是ABORT,則刪除這些消息
「消費-處理-發送」邏輯的原子性
offset提交:
offset提交除了寫zk外,還提供一種寫Broker的方式,即新增 consumercoordinator 接收offset 的提交請求,consumercoordinator 將 groupId-offset 寫入內部Topic,此Topic可壓縮,即舊的 groupId-offset 會被清除,只保留最新的。
處理過程
a:初始化事務
b:開啓事務
c:生產消息(可多次)
發送AddPartitionsToTxnRequest到 transaction coordinator,transaction coordinator增加 BEGIN的事務日志,日志包含此partition 發送消息到實際的Broker,Broker接收消息寫入文件並復制到副本。
發送 AddOffsetsToTxnRequest 到 transaction coordinator,寫入事務日志。
發送 TxnOffsetCommitRequest 到 consumercoordinator,寫入文件並復制到副本。
d:提交或終止事務
Producer 發送 EndTxnRequest 到 transaction。
coordinator transaction coordinator 寫入PREPARE_COMMIT或 PREPARE_ABORT到事務日志。
transaction coordinator 依次向全部partition發送 COMMIT/ ABORT的control消息。
transaction coordinator 向 consumercoordinator 發送 offset提交的control消息。
transaction coordinator 寫入 COMMIT/ ABORT到事務日志。
Broker接收消息寫入文件並復制到副本。
消息可見性
offset可見性:
offset提交後,事務提交前,此offset雖然寫入文件,但是 consumercoordinator 緩存不更新,直到收到事務提交的control消息才更新緩存,即事務提交前,外部不能查詢到此 offset,只能查詢到舊的offset。
消息可見性:
a:read_uncommited
所有消息都可被拉取,也可返回給應用處理。
b:read_commited
所有消息都可被拉取,但是只有判斷已提交的可返回給應用處理。
失敗場景分析
生產端事務開啓後,發消息前down:
事務無任何動做,後續會被超時處理掉,不影響事務語義。
生產端事務開啓後,發消息後,保存offset前down:
事務會被超時關閉,該消息沒有對應的control消息,對外不可見,不影響事務語義。
生產端事務開啓後,發消息後,保存offset後,提交事務前down:
事務會被超時關閉,該消息及offset沒有對應的control消息,對外不可見,不影響事務語義。
生產端事務提交事務後down:
後續事務動做與客戶端無關,由服務端處理,會全部執行完成。
transaction coordinator down:
因爲 transaction coordinator 實際爲 Broker,Kafka既有機制保證選舉新的 coordinator。
因爲事務日志是同步的,即最少有三個節點共享事務狀態,單個down不影響事務繼續。
consumercoordinator down:
事務提交前,offset信息是同步的,但是不生效。
事務提交後,offset信息是同步的,並可對外公開。
broker leader down:
Kafka既有機制保證重新選舉Leader,且消息同步。
消費端 down:
read_uncommited的場景,出現消費到未提交事務的消息,且有可能重復。
read_commited的場景,不會消費到未提交事務的消息,其他消費者接管後,從之前位置繼續。
最新穩定的消息位置(LSO)
問題:穿插在一批事務消息中間的非事務消息,被消費端消費後,如何提交消費位置
解答:通過LSO機制,即只能消費到最早的全部事務完成的消息位置
事務日志狀態
BEGIN
PREPARE_COMMIT
PREPARE_ABORT
COMMIT
ABORT