Kafka 消息發送分同步 (sync)、異步 (async) 兩種方式,默認使用同步方式,可經過java
producer.type=sync 同步模式
producer.type=async 異步模式 redis
//不提供回調 public Future<RecordMetadata> send(ProducerRecord<K, V> record); //提供回調 public Future<RecordMetadata> send(ProducerRecord<K, V> record, new Callback( public void onCompletion(RecordMetadata metadata, Exception exception) { // 發送失敗 if (exception != null) { // 同步發送,須要設置阻塞時間,否則會一直阻塞 producer.send(proRecord).get(timeout); // 發送成功 }else{ } } ))
bootstrap.servers=${KAFKA_SERVER_IN} key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer #max time to wait,default to 60s max.block.ms=10000 batch.size=65536 buffer.memory=134217728 retries=3
經過上述代碼和配置能夠看出apache
問題定位bootstrap
生產建議api
做者:sheen口開河
連接:https://www.jianshu.com/p/45258f744425
來源:簡書
簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。網絡
同步異步
kafka同步生產者:這個生產者寫一條消息的時候,它就立馬發送到某個分區去。follower還須要從leader拉取消息到本地,follower再向leader發送確認,leader再向客戶端發送確認。因爲這一套流程以後,客戶端才能獲得確認,因此很慢。
kafka異步生產者:這個生產者寫一條消息的時候,先是寫到某個緩衝區,這個緩衝區裏的數據還沒寫到broker集羣裏的某個分區的時候,它就返回到client去了。雖然效率快,可是不能保證消息必定被髮送出去了。async
經過 request.required.acks 屬性進行配置:值可設爲 0, 1, -1(all) -1 和 all 等同函數
0 表明:不等待 broker 的 ack,這一操做提供了一個最低的延遲,broker 一接收到尚未寫入磁盤就已經返回,當 broker 故障時有可能丟失數據;性能
1 表明:producer 等待 broker 的 ack,partition 的 leader 落盤成功後返回 ack,若是在 follower 同步成功以前 leader 故障,那麼將會丟失數據;
-1 表明:producer 等待 broker 的 ack,partition 的 leader 和 follower 所有落盤成功後才返回 ack,數據通常不會丟失,延遲時間長可是可靠性高;可是這樣也不能保證數據不丟失,好比當 ISR 中只有 leader 時( ISR 中的成員因爲某些狀況會增長也會減小,最少就只剩一個 leader),這樣就變成了 acks = 1 的狀況;
還有第四種,部分保存策略。須要配置。
另一個就是使用高級消費者存在數據丟失的隱患: 消費者讀取完成,高級消費者 API 的 offset 已經提交,可是尚未處理完成DB操做,消費者就掛掉了,此時 offset 已經更新,沒法再消費以前丟失的數據. 解決辦法消費者使用低級api,讀取到消息後,處理完成,在手動commit給kafka,這樣子一旦處理失敗,offset不會提交,下次還能夠繼續消費。
1.acks=0,不和Kafka集羣進行消息接收確認,則當網絡異常、緩衝區滿了等狀況時,消息可能丟失;
2.acks=一、同步模式下,只有Leader確認接收成功後但掛掉了,副本沒有同步,數據可能丟失;
生產者:同步發送消息,且消息配置爲-1或all,leader分區和全部follwer都寫到磁盤裏。
異步模式下,爲防止緩衝區滿,能夠在配置文件設置不限制阻塞超時時間,當緩衝區滿時讓生產者一直處於阻塞狀態。可是處於阻塞狀態會影響其餘正常性能。
生產者:手動提交,即讀取到消息後,確認消息消費完畢,才手動提交offset。可是要避免邏輯處理時間過長,致使鏈接超時,會使消息重複消費。
acks = -1 的狀況下,數據發送到 leader 後 ,部分 ISR 的副本同步,leader 此時掛掉。好比 follower1 和 follower2 都有可能變成新的 leader, producer 端會獲得返回異常,producer 端會從新發送數據,數據可能會重複
另外, 在高階消費者中,offset 採用自動提交的方式, 自動提交時,假設 1s 提交一次 offset 的更新,設當前 offset = 10,當消費者消費了 0.5s 的數據,offset 移動了 15,因爲提交間隔爲 1s,所以這一offset 的更新並不會被提交,這時候咱們寫的消費者掛掉,重啓後,消費者會去 ZooKeeper 上獲取讀取位置,獲取到的 offset 仍爲10,它就會重複消費. 解決辦法使用低級消費者.
針對消息重複:將消息的惟一標識保存到外部介質中,每次消費時判斷是否處理過便可。好比redis中
消息可使用惟一id標識
生產者(ack=all 表明至少成功發送一次)
消費者 (offset手動提交,業務邏輯成功處理後,提交offset)
落表(主鍵或者惟一索引的方式,避免重複數據)
業務邏輯處理(選擇惟一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)