【Kafka】Exactly Once語義與事務

Kafka在0.11.0.0以前的版本中只支持At Least OnceAt Most Once語義,尚不支持Exactly Once語義。服務器

可是在不少要求嚴格的場景下,如使用Kafka處理交易數據,Exactly Once語義是必須的。咱們能夠經過讓下游系統具備冪等性來配合Kafka的At Least Once語義來間接實現Exactly Once。可是:設計

  • 該方案要求下游系統支持冪等操做,限制了Kafka的適用場景
  • 實現門檻相對較高,須要用戶對Kafka的工做機制很是瞭解
  • 對於Kafka Stream而言,Kafka自己便是本身的下游系統,但Kafka在0.11.0.0版本以前不具備冪等發送能力

所以,Kafka自己對Exactly Once語義的支持就很是必要。code

操做原子性

操做的原子性是指,多個操做要麼所有成功要麼所有失敗,不存在部分紅功部分失敗的可能。事務

實現原子性操做的意義在於:get

  • 操做結果更可控,有助於提高數據一致性
  • 便於故障恢復。由於操做是原子的,從故障中恢復時只須要重試該操做(若是原操做失敗)或者直接跳過該操做(若是原操做成功),而不須要記錄中間狀態,更不須要針對中間狀態做特殊處理

實現事務機制的幾個階段

冪等性發送

上文提到,實現Exactly Once的一種方法是讓下游系統具備冪等處理特性,而在Kafka Stream中,Kafka Producer自己就是「下游」系統,所以若是能讓Producer具備冪等處理特性,那就可讓Kafka Stream在必定程度上支持Exactly once語義。kafka

爲了實現Producer的冪等語義,Kafka引入了Producer ID(即PID)和Sequence Number。每一個新的Producer在初始化的時候會被分配一個惟一的PID,該PID對用戶徹底透明而不會暴露給用戶。it

對於每一個PID,該Producer發送數據的每一個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Numberio

相似地,Broker端也會爲每一個<PID, Topic, Partition>維護一個序號,而且每次Commit一條消息時將其對應序號遞增。對於接收的每條消息,若是其序號比Broker維護的序號(即最後一次Commit的消息的序號)大一,則Broker會接受它,不然將其丟棄:ast

  • 若是消息序號比Broker維護的序號大一以上,說明中間有數據還沒有寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
  • 若是消息序號小於等於Broker維護的序號,說明該消息已被保存,即爲重複消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber

上述設計解決了0.11.0.0以前版本中的兩個問題:原理

  • Broker保存消息後,發送ACK前宕機,Producer認爲消息未發送成功並重試,形成數據重複
  • 前一條消息發送失敗,後一條消息發送成功,前一條消息重試後成功,形成數據亂序

事務性保證

上述冪等設計只能保證單個Producer對於同一個<Topic, Partition>Exactly Once語義。

另外,它並不能保證寫操做的原子性——即多個寫操做,要麼所有被Commit要麼所有不被Commit。

更不能保證多個讀寫操做的的原子性。尤爲對於Kafka Stream應用而言,典型的操做便是從某個Topic消費數據,通過一系列轉換後寫回另外一個Topic,保證從源Topic的讀取與向目標Topic的寫入的原子性有助於從故障中恢復。

事務保證可以使得應用程序將生產數據和消費數據看成一個原子單元來處理,要麼所有成功,要麼所有失敗,即便該生產或消費跨多個<Topic, Partition>

另外,有狀態的應用也能夠保證重啓後從斷點處繼續處理,也即事務恢復。

爲了實現這種效果,應用程序必須提供一個穩定的(重啓後不變)惟一的ID,也即Transaction IDTransactin IDPID可能一一對應。區別在於Transaction ID由用戶提供,而PID是內部的實現對用戶透明。

另外,爲了保證新的Producer啓動後,舊的具備相同Transaction ID的Producer即失效,每次Producer經過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。因爲舊的Producer的epoch比新Producer的epoch小,Kafka能夠很容易識別出該Producer是老的Producer並拒絕其請求。

有了Transaction ID後,Kafka可保證:

  • 跨Session的數據冪等發送。當具備相同Transaction ID的新的Producer實例被建立且工做時,舊的且擁有相同Transaction ID的Producer將再也不工做。
  • 跨Session的事務恢復。若是某個應用實例宕機,新的實例能夠保證任何未完成的舊的事務要麼Commit要麼Abort,使得新實例從一個正常狀態開始工做。

事務機制原理

事務性消息傳遞

這一節所說的事務主要指原子性,也即Producer將多條消息做爲一個事務批量發送,要麼所有成功要麼所有失敗。

爲了實現這一點,Kafka 0.11.0.0引入了一個服務器端的模塊,名爲Transaction Coordinator,用於管理Producer發送的消息的事務性。

Transaction Coordinator維護Transaction Log,該log存於一個內部的Topic內。因爲Topic數據具備持久性,所以事務的狀態也具備持久性。

Producer並不直接讀寫Transaction Log,它與Transaction Coordinator通訊,而後由Transaction Coordinator將該事務的狀態插入相應的Transaction Log

Transaction Log的設計與Offset Log用於保存Consumer的Offset相似。

事務中Offset的提交

許多基於Kafka的應用,尤爲是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取消息,後者負責將處理完的數據寫回Kafka的其它Topic中。

爲了實現該場景下的事務的原子性,Kafka須要保證對Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。不然,若是在兩者Commit中間發生異常,根據兩者Commit的順序可能會形成數據丟失和數據重複:

  • 若是先Commit Producer發送數據的事務再Commit Consumer的Offset,即At Least Once語義,可能形成數據重複。
  • 若是先Commit Consumer的Offset,再Commit Producer數據發送事務,即At Most Once語義,可能形成數據丟失。

總結

  • PIDSequence Number的引入實現了寫操做的冪等性
  • 寫操做的冪等性結合At Least Once語義實現了單一Session內的Exactly Once語義
  • Transaction MarkerPID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性
  • Offset的更新標記了消息是否被讀取,從而將對讀操做的事務處理轉換成了對寫(Offset)操做的事務處理
  • Kafka事務的本質是,將一組寫操做(若是有)對應的消息與一組讀操做(若是有)對應的Offset的更新進行一樣的標記(即Transaction Marker)來實現事務中涉及的全部讀寫操做同時對外可見或同時對外不可見
  • Kafka只提供對Kafka自己的讀寫操做的事務性,不提供包含外部系統的事務性

出處:http://www.jasongj.com/kafka/transaction/

相關文章
相關標籤/搜索