是否真正的存在數據丟失問題,好比有不少時候多是其餘同事操做了測試環境,因此首先確保數據沒有第三方干擾。緩存
理清你的業務流程,數據流向,數據究竟是在什麼地方丟失的數據,在kafka 以前的環節或者kafka以後的流程丟失?好比kafka的數據是由flume提供的,也許是flume丟失了數據,kafka 天然就沒有這一部分數據。安全
如何發現有數據丟失,又是如何驗證的。從業務角度考慮,例如:教育行業,每一年高考後數據量巨大,可是卻反常的比高考前還少,或者源端數據量和目的端數據量不符網絡
定位數據是否在kafka以前就已經丟失還事消費端丟失數據的session
若是auto.commit.enable=true,當consumer fetch了一些數據但尚未徹底處理掉的時候,恰好到commit interval出發了提交offset操做,接着consumer crash掉了。這時已經fetch的數據尚未處理完成但已經被commit掉,所以沒有機會再次被處理,數據丟失。架構
網絡負載很高或者磁盤很忙寫入失敗的狀況下,沒有自動重試重發消息。沒有作限速處理,超出了網絡帶寬限速。kafka必定要配置上消息重試的機制,而且重試的時間間隔必定要長一些,默認1秒鐘並不符合生產環境(網絡中斷時間有可能超過1秒)。app
若是磁盤壞了,會丟失已經落盤的數據異步
單批數據的長度超過限制會丟失數據,報kafka.common.MessageSizeTooLargeException異常 解決:ide
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer. Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated). Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer. Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
partition leader在未完成副本數follows的備份時就宕機的狀況,即便選舉出了新的leader可是已經push的數據由於未備份就丟失了! kafka是多副本的,當你配置了同步複製以後。多個副本的數據都在PageCache裏面,出現多個副本同時掛掉的機率比1個副本掛掉的機率就很小了。(官方推薦是經過副原本保證數據的完整性的)oop
kafka的數據一開始就是存儲在PageCache上的,按期flush到磁盤上的,也就是說,不是每一個消息都被存儲在磁盤了,若是出現斷電或者機器故障等,PageCache上的數據就丟失了。 能夠經過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的數據多些,小會影響性能但在0.8版本,能夠經過replica機制保證數據不丟,代價就是須要更多資源,尤爲是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題 是否使用replica取決於在可靠性和資源代價之間的balance性能
同時kafka也提供了相關的配置參數,來讓你在性能與可靠性之間權衡(通常默認):
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms = 3000
設計上保證數據的可靠安全性,依據分區數作好數據備份,設立副本數等。 push數據的方式:同步異步推送數據:權衡安全性和速度性的要求,選擇相應的同步推送仍是異步推送方式,當發現數據有問題時,能夠改成同步來查找問題。
flush是kafka的內部機制,kafka優先在內存中完成數據的交換,而後將數據持久化到磁盤.kafka首先會把數據緩存(緩存到內存中)起來再批量flush. 能夠經過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔
能夠經過replica機制保證數據不丟. 代價就是須要更多資源,尤爲是磁盤資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題 是否使用replica(副本)取決於在可靠性和資源代價之間的balance(平衡)
broker到 Consumer kafka的consumer提供兩種接口.
high-level版本已經封裝了對partition和offset的管理,默認是會按期自動commit offset,這樣可能會丟數據的
low-level版本本身管理spout線程和partition之間的對應關係和每一個partition上的已消費的offset(按期寫到zk) 而且只有當這個offset被ack後,即成功處理後,纔會被更新到zk,因此基本是能夠保證數據不丟的即便spout線程crash(崩潰),重啓後仍是能夠從zk中讀到對應的offset
kafka不像hadoop更致力於處理大量級數據,kafka的消息隊列更擅長於處理小數據。針對具體業務而言,如果源源不斷的push大量的數據(eg:網絡爬蟲),能夠考慮消息壓縮。可是這也必定程度上對CPU形成了壓力,仍是得結合業務數據進行測試選擇
topic設置多分區,分區自適應所在機器,爲了讓各分區均勻分佈在所在的broker中,分區數要大於broker數。分區是kafka進行並行讀寫的單位,是提高kafka速度的關鍵。
broker能接收消息的最大字節數的設置必定要比消費端能消費的最大字節數要小,不然broker就會由於消費端沒法使用這個消息而掛起。
broker可賦值的消息的最大字節數設置必定要比能接受的最大字節數大,不然broker就會由於數據量的問題沒法複製副本,致使數據丟失
關閉自動更新offset,等到數據被處理後再手動跟新offset。 在消費前作驗證前拿取的數據是不是接着上回消費的數據,不正確則return先行處理排錯。 通常來講zookeeper只要穩定的狀況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分區的數據,其中一個先提交了,另外一個就丟失了。
問題: kafka的數據一開始就是存儲在PageCache上的,按期flush到磁盤上的,也就是說,不是每一個消息都被存儲在磁盤了,若是出現斷電或者機器故障等,PageCache上的數據就丟失了。
這個是總結出的到目前爲止沒有發生丟失數據的狀況
//producer用於壓縮數據的壓縮類型。默認是無壓縮。正確的選項值是none、gzip、snappy。壓縮最好用於批量處理,批量處理消息越多,壓縮性能越好 props.put("compression.type", "gzip"); //增長延遲 props.put("linger.ms", "50"); //這意味着leader須要等待全部備份都成功寫入日誌,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。, props.put("acks", "all"); //無限重試,直到你意識到出現了問題,設置大於0的值將使客戶端從新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什麼不一樣。容許重試將潛在的改變數據的順序,若是這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 props.put("retries ", MAX_VALUE); props.put("reconnect.backoff.ms ", 20000); props.put("retry.backoff.ms", 20000); //關閉unclean leader選舉,即不容許非ISR中的副本被選舉爲leader,以免數據丟失 props.put("unclean.leader.election.enable", false); //關閉自動提交offset props.put("enable.auto.commit", false); 限制客戶端在單個鏈接上可以發送的未響應請求的個數。設置此值是1表示kafka broker在響應請求以前client不能再向同一個broker發送請求。注意:設置此參數是爲了不消息亂序 props.put("max.in.flight.requests.per.connection", 1);
強行kill線程,致使消費後的數據,offset沒有提交,partition就斷開鏈接。好比,一般會遇到消費的數據,處理很耗時,致使超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那麼就會re-blance重平衡,此時有必定概率offset沒提交,會致使重平衡後重復消費。 若是在close以前調用了consumer.unsubscribe()則有可能部分offset沒提交,下次重啓會重複消費
kafka數據重複 kafka設計的時候是設計了(at-least once)至少一次的邏輯,這樣就決定了數據多是重複的,kafka採用基於時間的SLA(服務水平保證),消息保存必定時間(一般爲7天)後會被刪除 kafka的數據重複通常狀況下應該在消費者端,這時log.cleanup.policy = delete使用按期刪除機制