Kafka無消息丟失配置

Kafka到底會不會丟數據(data loss)? 一般不會,但有些狀況下的確有可能會發生。下面的參數配置及Best practice列表能夠較好地保證數據的持久性(固然是trade-off,犧牲了吞吐量)。筆者會在該列表以後對列表中的每一項進行討論,有興趣的同窗能夠看下後面的分析。緩存

  1. block.on.buffer.full = true
  2. acks = all
  3. retries = MAX_VALUE
  4. max.in.flight.requests.per.connection = 1
  5. 使用KafkaProducer.send(record, callback)
  6. callback邏輯中顯式關閉producer:close(0) 
  7. unclean.leader.election.enable=false
  8. replication.factor = 3 
  9. min.insync.replicas = 2
  10. replication.factor > min.insync.replicas
  11. enable.auto.commit=false
  12. 消息處理完成以後再提交位移

給出列表以後,咱們從兩個方面來探討一下數據爲何會丟失:安全

1. Producer端網絡

  目前比較新版本的Kafka正式替換了Scala版本的old producer,使用了由Java重寫的producer。新版本的producer採用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時後臺的IO線程會不斷掃描該緩存區,將知足條件的消息封裝到某個batch中而後發送出去。顯然,這個過程當中就有一個數據丟失的窗口:若IO線程發送以前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。異步

  Producer的另外一個問題是消息的亂序問題。假設客戶端代碼依次執行下面的語句將兩條消息發到相同的分區oop

producer.send(record1);
producer.send(record2);

若是此時因爲某些緣由(好比瞬時的網絡抖動)致使record1沒有成功發送,同時Kafka又配置了重試機制和max.in.flight.requests.per.connection大於1(默認值是5,原本就是大於1的),那麼重試record1成功後,record1在分區中就在record2以後,從而形成消息的亂序。不少某些要求強順序保證的場景是不容許出現這種狀況的。性能

  鑑於producer的這兩個問題,咱們應該如何規避呢??對於消息丟失的問題,很容易想到的一個方案就是:既然異步發送有可能丟失數據, 我改爲同步發送總能夠吧?好比這樣:spa

producer.send(record).get();

這樣固然是能夠的,可是性能會不好,不建議這樣使用。所以特地總結了一份配置列表。我的認爲該配置清單應該可以比較好地規避producer端數據丟失狀況的發生:(特此說明一下,軟件配置的不少決策都是trade-off,下面的配置也不例外:應用了這些配置,你可能會發現你的producer/consumer 吞吐量會降低,這是正常的,由於你換取了更高的數據安全性)線程

  • block.on.buffer.full = true  儘管該參數在0.9.0.0已經被標記爲「deprecated」,但鑑於它的含義很是直觀,因此這裏仍是顯式設置它爲true,使得producer將一直等待緩衝區直至其變爲可用。不然若是producer生產速度過快耗盡了緩衝區,producer將拋出異常
  • acks=all  很好理解,全部follower都響應了才認爲消息提交成功,即"committed"
  • retries = MAX 無限重試,直到你意識到出現了問題:)
  • max.in.flight.requests.per.connection = 1 限制客戶端在單個鏈接上可以發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求以前client不能再向同一個broker發送請求。注意:設置此參數是爲了不消息亂序
  • 使用KafkaProducer.send(record, callback)而不是send(record)方法   自定義回調邏輯處理消息發送失敗
  • callback邏輯中最好顯式關閉producer:close(0) 注意:設置此參數是爲了不消息亂序
  • unclean.leader.election.enable=false   關閉unclean leader選舉,即不容許非ISR中的副本被選舉爲leader,以免數據丟失
  • replication.factor >= 3   這個徹底是我的建議了,參考了Hadoop及業界通用的三備份原則
  • min.insync.replicas > 1 消息至少要被寫入到這麼多副本纔算成功,也是提高數據持久性的一個參數。與acks配合使用
  • 保證replication.factor > min.insync.replicas  若是二者相等,當一個副本掛掉了分區也就無法正常工做了。一般設置replication.factor = min.insync.replicas + 1便可

2. Consumer端code

  consumer端丟失消息的情形比較簡單:若是在消息處理完成前就提交了offset,那麼就有可能形成數據的丟失。因爲Kafka consumer默認是自動提交位移的,因此在後臺提交位移前必定要保證消息被正常處理了,所以不建議採用很重的處理邏輯,若是處理耗時很長,則建議把邏輯放到另外一個線程中去作。爲了不數據丟失,現給出兩點建議:blog

  • enable.auto.commit=false  關閉自動提交位移
  • 在消息被完整處理以後再手動提交位移
相關文章
相關標籤/搜索