kafka學習(8)如何保證數據有序,處理數據丟失和數據重複消費

首先了解一下kafka消息生產者和消費者是如何處理消息的

生產者發送消息有兩種方式,同步(sync)和異步 (async) 

Kafka 消息發送分同步 (sync)、異步 (async) 兩種方式,默認使用同步方式,可經過java

producer.type=sync 同步模式 
producer.type=async 異步模式 redis

//不提供回調
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
//提供回調
public Future<RecordMetadata> send(ProducerRecord<K, V> record, new Callback(
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 發送失敗
        if (exception != null) {
        // 同步發送,須要設置阻塞時間,否則會一直阻塞
        producer.send(proRecord).get(timeout);
        
        // 發送成功
        }else{
        
        }
}
))
bootstrap.servers=${KAFKA_SERVER_IN}
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
#max time to wait,default to 60s
max.block.ms=10000
batch.size=65536
buffer.memory=134217728
retries=3

經過上述代碼和配置能夠看出apache

  • 最大block事件爲1000ms,也就是10s
  • buffer配置的較大,爲134M
  • 生產者先是異步發送,若是發送失敗,則執行一次同步發送

問題定位bootstrap

  • 在異步發送的的回調裏使用了同步的方式再次發送,因爲kafka producer的同步發送是阻塞等待,且使用的是不帶超時時間的無限期等待(future.get()中未指定超時時間),所以當不被喚醒時會一直wai下去
  • kafka生產者的IO線程(實際執行數據發送的線程)是單線程模型,且回調函數是在IO線程中執行的,所以回調函數的阻塞會直接致使IO線程阻塞,因而生產者緩衝區的數據沒法被髮送
  • kafka生產者還在不斷的被應用調用,所以緩衝區一直累積並增大,當緩衝區滿的時候,生產者線程會被阻塞,最大阻塞時間爲max.block.time,若是改時間到達以後仍是沒法將數據塞入緩衝區,則會拋出一個異常,所以日誌中看到達到10s以後,打印出異常棧
  • 因爲使用了get沒有指定超時時間,且該await一直沒法被喚醒,所以這種狀況會一直持續,在沒有人工干預的狀況下,永遠不會發送成功

生產建議api

  • kafka生產者推薦使用異步方式發送,而且提供回調以響應發送成功或者失敗
  • 若是須要使用future.get的方式模擬同步發送,則須要在get里加上合適的超時時間,避免由於不可預知的外部因素致使線程沒法被喚醒,即便用Future.get(long timeout)的api而不是不帶超時參數的Future.get()
  • 不要在異步回調中執行阻塞操做或者耗時比較久的操做,若是有必要能夠考慮交給另外一個線程(池)去作



做者:sheen口開河
連接:https://www.jianshu.com/p/45258f744425
來源:簡書
簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。網絡

同步異步

kafka同步生產者:這個生產者寫一條消息的時候,它就立馬發送到某個分區去。follower還須要從leader拉取消息到本地,follower再向leader發送確認,leader再向客戶端發送確認。因爲這一套流程以後,客戶端才能獲得確認,因此很慢。
kafka異步生產者:這個生產者寫一條消息的時候,先是寫到某個緩衝區,這個緩衝區裏的數據還沒寫到broker集羣裏的某個分區的時候,它就返回到client去了。雖然效率快,可是不能保證消息必定被髮送出去了。async

 

經過 request.required.acks 屬性進行配置:值可設爲 0, 1, -1(all)    -1 和 all 等同函數

0 表明:不等待 broker 的 ack,這一操做提供了一個最低的延遲,broker 一接收到尚未寫入磁盤就已經返回,當 broker 故障時有可能丟失數據;性能

1 表明:producer 等待 broker 的 ack,partition 的 leader 落盤成功後返回 ack,若是在 follower 同步成功以前 leader 故障,那麼將會丟失數據;

-1 表明:producer 等待 broker 的 ack,partition 的 leader 和 follower 所有落盤成功後才返回 ack,數據通常不會丟失,延遲時間長可是可靠性高;可是這樣也不能保證數據不丟失,好比當 ISR 中只有 leader 時( ISR 中的成員因爲某些狀況會增長也會減小,最少就只剩一個 leader),這樣就變成了 acks = 1 的狀況;

還有第四種,部分保存策略。須要配置。

消費者消息消息的方式,高級api和低級api

另一個就是使用高級消費者存在數據丟失的隱患: 消費者讀取完成,高級消費者 API 的 offset 已經提交,可是尚未處理完成DB操做,消費者就掛掉了,此時 offset 已經更新,沒法再消費以前丟失的數據. 解決辦法消費者使用低級api,讀取到消息後,處理完成,在手動commit給kafka,這樣子一旦處理失敗,offset不會提交,下次還能夠繼續消費。

消息丟失場景

1.acks=0,不和Kafka集羣進行消息接收確認,則當網絡異常、緩衝區滿了等狀況時,消息可能丟失;
2.acks=一、同步模式下,只有Leader確認接收成功後但掛掉了,副本沒有同步,數據可能丟失;

如何防止數據丟失

生產者:同步發送消息,且消息配置爲-1或all,leader分區和全部follwer都寫到磁盤裏。

             異步模式下,爲防止緩衝區滿,能夠在配置文件設置不限制阻塞超時時間,當緩衝區滿時讓生產者一直處於阻塞狀態。可是處於阻塞狀態會影響其餘正常性能。

生產者:手動提交,即讀取到消息後,確認消息消費完畢,才手動提交offset。可是要避免邏輯處理時間過長,致使鏈接超時,會使消息重複消費。

消息重複的緣由

acks = -1 的狀況下,數據發送到 leader 後 ,部分 ISR 的副本同步,leader 此時掛掉。好比 follower1 和 follower2 都有可能變成新的 leader, producer 端會獲得返回異常,producer 端會從新發送數據,數據可能會重複

另外, 在高階消費者中,offset 採用自動提交的方式, 自動提交時,假設 1s 提交一次 offset 的更新,設當前 offset = 10,當消費者消費了 0.5s 的數據,offset 移動了 15,因爲提交間隔爲 1s,所以這一offset 的更新並不會被提交,這時候咱們寫的消費者掛掉,重啓後,消費者會去 ZooKeeper 上獲取讀取位置,獲取到的 offset 仍爲10,它就會重複消費. 解決辦法使用低級消費者.

消息重複解決方案

針對消息重複:將消息的惟一標識保存到外部介質中,每次消費時判斷是否處理過便可。好比redis中

消息可使用惟一id標識

生產者(ack=all 表明至少成功發送一次)

消費者 (offset手動提交,業務邏輯成功處理後,提交offset)

落表(主鍵或者惟一索引的方式,避免重複數據)

業務邏輯處理(選擇惟一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)

相關文章
相關標籤/搜索