在不少的流處理框架的介紹中,都會說kafka是一個可靠的數據源,而且推薦使用Kafka看成數據源來進行使用。這是由於與其餘消息引擎系統相比,kafka提供了可靠的數據保存及備份機制。而且經過消費者位移這一律念,可讓消費者在因某些緣由宕機而重啓後,能夠輕易得回到宕機前的位置。html
但其實kafka的可靠性也只能說是相對的,在整條數據鏈條中,總有可讓數據出現丟失的狀況,今天就來討論如何避免kafka數據丟失,以及實現精確一致處理的語義。git
在討論如何實現kafka無消息丟失的時候,首先要先清楚大部分狀況下消息丟失是在什麼狀況下發生的。爲何是大部分,由於總有一些很是特殊的狀況會被人忽略,而咱們只須要關注廣泛的狀況就足夠了。接下來咱們來討論如何較爲廣泛的數據丟失狀況。github
前面介紹Kafka分區和副本的時候,有提到過一個producer客戶端有一個acks的配置,這個配置爲0的時候,producer是發送以後無論的,這個時候就頗有可能由於網絡等緣由形成數據丟失,因此應該儘可能避免。可是將ack設置爲1就沒問題了嗎,那也不必定,由於有可能在leader副本接收到數據,但還沒同步給其餘副本的時候就掛掉了,這時候數據也是丟失了。而且這種時候是客戶端覺得消息發送成功,但kafka丟失了數據。算法
要達到最嚴格的無消息丟失配置,應該是要將acks的參數設置爲-1(也就是all),而且將min.insync.replicas配置項調高到大於1,這部份內容在上一篇副本機制有介紹詳細解析kafka之kafka分區和副本。編程
同時還須要使用帶有回調的producer api,來發送數據。注意這裏討論的都是異步發送消息,同步發送不在討論範圍。api
public class send{ ...... public static void main(){ ... /* * 第一個參數是 ProducerRecord 類型的對象,封裝了目標 Topic,消息的 kv * 第二個參數是一個 CallBack 對象,當生產者接收到 Kafka 發來的 ACK 確認消息的時候, * 會調用此 CallBack 對象的 onCompletion() 方法,實現回調功能 */ producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); ... } ...... } class DemoCallBack implements Callback { /* 開始發送消息的時間戳 */ private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 生產者成功發送消息,收到 Kafka 服務端發來的 ACK 確認消息後,會調用此回調函數 * @param metadata 生產者發送的消息的元數據,若是發送過程當中出現異常,此參數爲 null * @param exception 發送過程當中出現的異常,若是發送成功爲 null */ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n", key, message, metadata.partition(), metadata.offset(), elapsedTime); } else { exception.printStackTrace(); } } }
更詳細的代碼能夠參考這裏:Kafka生產者分析——KafkaProducer。網絡
咱們以前提到過,producer發送到kafka broker的時候,是有多種可能會失敗的,而回調函數能準確告訴你是否確認發送成功,固然這依託於acks和min.insync.replicas的配置。而當數據發送丟失的時候,就能夠進行手動重發或其餘操做,從而確保生產者發送成功。併發
有些時候,kafka內部由於一些不大好的配置,可能會出現一些極爲隱蔽的數據丟失狀況,那麼咱們分別討論下大體都有哪幾種狀況。框架
首先是replication.factor配置參數,這個配置決定了副本的數量,默認是1。注意這個參數不能超過broker的數量。說這個參數實際上是由於若是使用默認的1,或者不在建立topic的時候指定副本數量(也就是副本數爲1),那麼當一臺機器出現磁盤損壞等狀況,那麼數據也就從kafka裏面丟失了。因此replication.factor這個參數最好是配置大於1,好比說3。異步
接下來要說的仍是和副本相關的,也是上一篇副本中提到的unclean.leader.election.enable 參數,這個參數是在主副本掛掉,而後在ISR集合中沒有副本能夠成爲leader的時候,要不要讓進度比較慢的副本成爲leader的。不用多說,讓進度比較慢的副本成爲leader,確定是要丟數據的。雖然可能會提升一些可用性,但若是你的業務場景丟失數據更加不能忍受,那仍是將unclean.leader.election.enable設置爲false吧。
消費者丟失的狀況,其實跟消費者位移處理不當有關。消費者位移提交有一個參數,enable.auto.commit,默認是true,決定是否要讓消費者自動提交位移。若是開啓,那麼consumer每次都是先提交位移,再進行消費,好比先跟broker說這5個數據我消費好了,而後纔開始慢慢消費這5個數據。
這樣處理的話,好處是簡單,壞處就是漏消費數據,好比你說要消費5個數據,消費了2個本身就掛了。那下次該consumer重啓後,在broker的記錄中這個consumer是已經消費了5個的。
因此最好的作法就是將enable.auto.commit設置爲false,改成手動提交位移,在每次消費完以後再手動提交位移信息。固然這樣又有可能會重複消費數據,畢竟exactly once處理一直是一個問題呀(/攤手)。遺憾的是kafka目前沒有保證consumer冪等消費的措施,若是確實須要保證consumer的冪等,能夠對每條消息維持一個全局的id,每次消費進行去重,固然耗費這麼多的資源來實現exactly once的消費到底值不值,那就得看具體業務了。
那麼到這裏先來總結下無消息丟失的主要配置吧:
那麼接下來就來講說kafka實現精確一次(exactly once)處理的方法吧。
在分佈式環境下,要實現消息一致與精確一次(exactly once)語義處理是很難的。精確一次處理意味着一個消息只處理一次,形成一次的效果,不能多也不能少。
那麼kafka如何可以實現這樣的效果呢?在介紹以前,咱們先來介紹其餘兩個語義,至多一次(at most once)和至少一次(at least once)。
最多一次就是保證一條消息只發送一次,這個其實最簡單,異步發送一次而後無論就能夠,缺點是容易丟數據,因此通常不採用。
至少一次語義是kafka默認提供的語義,它保證每條消息都能至少接收並處理一次,缺點是可能有重複數據。
前面有介紹過acks機制,當設置producer客戶端的acks是1的時候,broker接收到消息就會跟producer確認。但producer發送一條消息後,可能由於網絡緣由消息超時未達,這時候producer客戶端會選擇重發,broker迴應接收到消息,但極可能最開始發送的消息延遲到達,就會形成消息重複接收。
那麼針對這些狀況,要如何實現精確一次處理的語義呢?
要介紹冪等的producer以前,得先了解一下冪等這個詞是什麼意思。冪等這個詞最先起源於函數式編程,意思是一個函數不管執行多少次都會返回同樣的結果。好比說讓一個數加1就不是冪等的,而讓一個數取整就是冪等的。由於這個特性因此冪等的函數適用於併發的場景下。
但冪等在分佈式系統中含義又作了進一步的延申,好比在kafka中,冪等性意味着一個消息不管重複多少次,都會被看成一個消息來持久化處理。
kafka的producer默認是支持最少一次語義,也就是說不是冪等的,這樣在一些好比支付等要求精確數據的場景會出現問題,在0.11.0後,kafka提供了讓producer支持冪等的配置操做。即:
props.put("enable.idempotence", ture)
在建立producer客戶端的時候,添加這一行配置,producer就變成冪等的了。注意開啓冪等性的時候,acks就自動是「all」了,若是這時候手動將ackss設置爲0,那麼會報錯。
而底層實現其實也很簡單,就是對每條消息生成一個id值,broker會根據這個id值進行去重,從而實現冪等,這樣一來就可以實現精確一次的語義了。
可是!冪等的producery也並不是萬能。有兩個主要是缺陷:
當遇到上述冪等性的缺陷沒法解決的時候,能夠考慮使用事務了。事務能夠支持多分區的數據完整性,原子性。而且支持跨會話的exactly once處理語義,也就是說若是producer宕機重啓,依舊能保證數據只處理一次。
開啓事務也很簡單,首先須要開啓冪等性,即設置enable.idempotence爲true。而後對producer發送代碼作一些小小的修改。
//初始化事務 producer.initTransactions(); try { //開啓一個事務 producer.beginTransaction(); producer.send(record1); producer.send(record2); //提交 producer.commitTransaction(); } catch (KafkaException e) { //出現異常的時候,終止事務 producer.abortTransaction(); }
但不管開啓冪等仍是事務的特性,都會對性能有必定影響,這是必然的。因此kafka默認也並無開啓這兩個特性,而是交由開發者根據自身業務特色進行處理。
以上~
推薦閱讀:
分佈式系統一致性問題與Raft算法(上)
Scala函數式編程(五) 函數式的錯誤處理
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs