1. kafka線上集羣環境規劃
jvm參數設置
因爲kafka是scala語言開發,運行在JVM上,須要對JVM參數合理設置。修改bin/kafka-start-server.sh中的jvm設置,假設機器是32G內存,能夠以下設置:redis
export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
這種大內存的狀況通常都要用G1垃圾收集器,由於年輕代內存比較大,用G1能夠設置GC最大停頓時間,不至於一次minor gc就花費太長時間,固然,由於像kafka,rocketmq,es這些中間件,寫數據到磁盤會用到操做系統的page cache,因此JVM內存不宜分配過大,須要給操做系統的緩存留出幾個G。bootstrap
2. 消息丟失問題
kafka在生產端發送消息 和 消費端消費消息 時均可能會丟失一些消息緩存
①:生產者消息丟失
生產者在發送消息時,會有一個ack機制,當ack=0 或者 ack=1時,均可能會丟消息。以下所示:網絡
②:消費端消息丟失
消費端丟消息最主要體如今消費端offset的自動提交,若是開啓了自動提交,萬一消費到數據還沒處理完,此時你consumer直接宕機了,未處理完的數據 丟失了,下次也消費不到了,由於offset已經提交完畢,下次會從offset出開始消費新消息。架構
解決辦法是採用消費端的手動提交jvm
//手動提交offset /** * 注意若是要使用手動提交offset,須要如下三點 * ①:配置文件配置手動提交方式 * ②:加上參數Acknowledgment ack * ③:方法中使用ack.acknowledge();手動提交 */ ack.acknowledge();
3. 消息重複消費
消息的重複消費在生產端和消費端均可能發生,下面一一講解:分佈式
①:生產端消息重複發送
發送消息若是配置了重試機制,好比因爲網絡波動,生產者未獲得broker收到了消息的響應,就會觸發重試機制,3秒後再次發送此消息。broker以前已經收到過這個消息,但生產者因爲觸發了重試機制,就致使了消息的重複發送。那麼broker就會在磁盤緩存多條一樣的消息,消費端從broker拉取消息時,就會形成重複消費。ide
注意:kafka新版本已經在broker中保證了接收消息的冪等性(好比2.4版本),只需在生產者加上參數 props.put(「enable.idempotence」, true) 便可,默認是false不開啓。工具
新版kafka的broker冪等性具體實現原理:
kafka每次發送消息會生成PID和Sequence Number,並將這兩個屬性一塊兒發送給broker,broker會將PID和Sequence Number跟消息綁定一塊兒存起來,下次若是生產者重發相同消息,broker會檢查PID和Sequence Number,若是相同不會再接收。性能
①:消費端消息重複消費
對於消費端消息的重複消費問題,若是消費端拉取了一部分數據,消費完畢後,準備執行手動提交(或自動提交)時,消費者掛掉了!此時offset還未提交呢,那麼當服務重啓時,仍是會拉取相同的一批數據重複處理!形成消息重複消費
不管是生產者仍是消費者的重複消息,通常都會在消費端卡死,作冪等性處理。
冪等性能夠用redis的setnx分佈式鎖來實現。好比操做訂單消息,能夠把訂單id做爲key,在消費消息時,經過setnx命令設置一下,offset提交完成後,在redis中刪除訂單id的key。setnx命令保證一樣的訂單消息,只有一個能被消費,可有效保證消費的冪等性!
4. 順序消息
kafka想要保證消息順序,是須要犧牲必定性能的,方法就是一個消費者,消費一個分區,能夠保證消費的順序性。但也僅限於消費端消費消息的有序性,沒法保證生產者發送消息有序。
好比:若是發送端配置了重試機制,kafka不會等以前那條消息徹底發送成功纔去發送下一條消息,這樣可能會出現,發送了1,2,3條消息,第一條超時了,後面兩條發送成功,再重試發送第1條消息,這時消息在broker端的順序就是2,3,1了。發送端消息發送已經亂序,到了消費端消費時,天然沒法保證順序!
若是必定要保證生產-消費全鏈路消息有序,發送端須要同步發送,ack回調不能設置爲0。且只能有一個分區,一個消費者進行消費,但這樣明顯有悖於kafka的高性能理論!
問題:如何在多個分區中保證消息順序和消息處理效率呢?
首先使用多個分區,消息能夠被髮送端發送至多個分區,保證消息發送的效率。而後在消費端在拉消息時使用ConutdownLunch來記錄一組有序消息的個數。若是達到個數,說明已拉取到完整的一組有序消息。而後在消費端根據消息序號進行排序,消費端將排好序的消息發到內存隊列(能夠搞多個),一個內存隊列開啓一個線程順序處理消息。便可最大程度上既保證順序又保證效率!
5. 消息積壓
線上有時由於發送方發送消息速度過快,或者消費方處理消息過慢,可能會致使broker積壓大量未消費消息。
解決方案:此種狀況若是積壓了上百萬未消費消息須要緊急處理,能夠修改消費端程序,讓其將收到的消息快速轉發到其餘topic(能夠設置不少分區),而後再啓動多個消費者同時消費新主題的不一樣分區。如圖所示:
因爲消息數據格式變更或消費者程序有bug,致使消費者一直消費不成功,也可能致使broker積壓大量未消費消息。
解決方案:此種狀況能夠將這些消費不成功的消息轉發到其它隊列裏去(相似死信隊列),後面再慢慢分析死信隊列裏的消息處理問題。這個死信隊列,kafka並無提供,須要整合第三方插件!
5. 延時消息
延時隊列存儲的對象是延時消息。所謂的「延時消息」是指消息被髮送之後,並不想讓消費者馬上獲取,而是等待特定的時間後,消費者才能獲取這個消息進行消費,延時隊列的使用場景有不少, 好比 :
但kafka沒有提供延時消息功能,能夠用rocketmq、rabbitmq都作延時消息。若是必定要用kafka實現延時消息呢?
實現思路:發送延時消息時先把消息按照不一樣的延遲時間段發送到指定的隊列中(topic_1s,topic_5s,topic_10s,…topic_2h,這個通常不能支持任意時間段的延時),而後經過定時器進行輪訓消費這些topic,查看消息是否到期,若是到期就把這個消息發送到具體業務處理的topic中,隊列中消息越靠前的到期時間越早,具體來講就是定時器在一次消費過程當中,對消息的發送時間作判斷,看下是否延遲到對應時間了,若是到了就轉發,若是還沒到這一次定時任務就能夠提早結束了。
6. 消息回溯若是某段時間對已消費消息計算的結果以爲有問題,多是因爲程序bug致使的計算錯誤,當程序bug修復後,這時可能須要對以前已消費的消息從新消費,能夠指定從多久以前的消息回溯消費,這種能夠用consumer的offsetsForTimes、seek等方法指定從某個offset偏移的消息開始消費,完成消息的回溯消費!
7. kafka高性能的緣由
高性能緣由以下:
分佈式存儲架構
磁盤順序讀寫
kafka消息不能修改以及不會從文件中間刪除保證了磁盤順序讀,kafka的消息寫入文件都是追加在文件末尾,不會寫入文件中的某個位置(隨機寫)保證了磁盤順序寫。
讀寫數據的批量batch處理以及壓縮傳輸
數據傳輸的零拷貝
kafka相對於rocketMQ、rabbitMQ來講,與它們最大的區別就是分佈式存儲,這也是kafka高性能的最主要緣由。使用分佈式存儲理念,一個主題下多個分區,同時能夠被多個消費者和生產者去使用,也增長了接受消息和消費消息的能力!
但分區數也並非越多越好,若是沒法肯定開多少分區,可使用kafka壓測工具本身測試分區數不一樣,各類狀況下的吞吐量來決定分區數
# 往test裏發送一百萬消息,每條設置1KB # throughput 用來進行限流控制,當設定的值小於 0 時不限流,當設定的值大於 0 時,當發送的吞吐量大於該值時就會被阻塞一段時間 bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.65.60:9092 acks=1
分區數到達某個值吞吐量反而開始降低,實際上不少事情都會有一個臨界值,當超過這個臨界值以後,不少本來符合既定邏輯的走向又會變得不一樣。通常狀況分區數跟集羣機器數量至關就差很少了。