這裏主要是 Kafka 集羣基本配置的相關內容。java
Kafka 集羣基本硬件的保證緩存
OS page cache:應當能夠緩存全部活躍的 Segment(Kafka 中最基本的數據存儲單位);性能優化
fd 限制:100k+;網絡
禁用 swapping:簡單來講,swap 做用是當內存的使用達到一個臨界值時就會將內存中的數據移動到 swap 交換空間,可是此時,內存可能還有不少空餘資源,swap 走的是磁盤 IO,對於內存讀寫很在乎的系統,最好禁止使用 swap 分區;session
TCP 調優;架構
JVM 配置併發
JDK 8 而且使用 G1 垃圾收集器;app
至少要分配 6-8 GB 的堆內存。負載均衡
使用多塊磁盤,並配置爲 Kafka 專用的磁盤;異步
JBOD vs RAID10;
JBOD(Just a Bunch of Disks,簡單來講它表示一個沒有控制軟件提供協調控制的磁盤集合,它將多個物理磁盤串聯起來,提供一個巨大的邏輯磁盤,數據是按序存儲,它的性能與單塊磁盤相似)
JBOD 的一些缺陷:
任何磁盤的損壞都會致使異常關閉,而且須要較長的時間恢復;
數據不保證一致性;
多級目錄;
社區也正在解決這麼問題,能夠關注 KIP 1十二、113:
必要的工具用於管理 JBOD;
自動化的分區管理;
磁盤損壞時,Broker 能夠將 replicas 遷移到好的磁盤上;
在同一個 Broker 的磁盤間 reassign replicas;
RAID 10 的特色:
能夠容許單磁盤的損壞;
性能和保護;
不一樣磁盤間的負載均衡;
高命中來減小 space;
單一的 mount point;
文件系統:
使用 EXT 或 XFS;
SSD;
Kafka 集羣須要監控的一些指標,這些指標反應了集羣的健康度。
CPU 負載;
Network Metrics;
File Handle 使用;
磁盤空間;
磁盤 IO 性能;
GC 信息;
ZooKeeper 監控。
Partition 有兩種副本:Leader,Follower;
Leader 負責維護 in-sync-replicas(ISR)
replica.lag.time.max.ms
:默認爲10000,若是 follower 落後於 leader 的消息數超過這個數值時,leader 就將 follower 從 isr 列表中移除;
num.replica.fetchers
,默認爲1,用於從 leader 同步數據的 fetcher 線程數;
min.insync.replica
:Producer 端使用來用於保證 Durability(持久性);
當發現 replica 的配置與集羣的不一樣時,通常狀況都是集羣上的 replica 少於配置數時,能夠從如下幾個角度來排查問題:
JMX 監控項:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions;
可能的緣由:
Broker 掛了?
Controller 的問題?
ZooKeeper 的問題?
Network 的問題?
解決辦法:
調整 ISR 的設置;
Broker 擴容。
負責管理 partition 生命週期;
避免 Controller’s ZK 會話超時:
ISR 抖動;
ZK Server 性能問題;
Broker 長時間的 GC;
網絡 IO 問題;
監控:
kafka.controller:type=KafkaController,name=ActiveControllerCount,應該爲1;
LeaderElectionRate。
容許不在 isr 中 replica 被選舉爲 leader。
這是 Availability 和 Correctness 之間選擇,Kafka 默認選擇了可用性;
unclean.leader.election.enable
:默認爲 true,即容許不在 isr 中 replica 選爲 leader,這個配置能夠全局配置,也能夠在 topic 級別配置;
監控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec。
Broker 級別有幾個比較重要的配置,通常須要根據實際狀況進行相應配置的:
log.retention.{ms, minutes, hours}
, log.retention.bytes
:數據保存時間;
message.max.bytes
, replica.fetch.max.bytes
;
delete.topic.enable
:默認爲 false,是否容許經過 admin tool 來刪除 topic;
unclean.leader.election.enable
= false,參見上面;
min.insync.replicas
= 2:當 Producer 的 acks 設置爲 all 或 -1 時, min.insync.replicas
表明了必須進行確認的最小 replica 數,若是不夠的話 Producer 將會報 NotEnoughReplicas
或 NotEnoughReplicasAfterAppend
異常;
replica.lag.time.max.ms
(超過這個時間沒有發送請求的話,follower 將從 isr 中移除), num.replica.fetchers;
replica.fetch.response.max.bytes
;
zookeeper.session.timeout.ms
= 30s;
num.io.threads
:默認爲8,KafkaRequestHandlerPool 的大小。
歡迎學Java和大數據的朋友們加入java架構交流: 855835163
羣內提供免費的架構資料還有:Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的免費直播講解 能夠進來一塊兒學習交流哦
Broker 評估
每一個 Broker 的 Partition 數不該該超過2k;
控制 partition 大小(不要超過25GB);
集羣評估(Broker 的數量根據如下條件配置)
數據保留時間;
集羣的流量大小;
集羣擴容:
磁盤使用率應該在 60% 如下;
網絡使用率應該在 75% 如下;
集羣監控
保持負載均衡;
確保 topic 的 partition 均勻分佈在全部 Broker 上;
確保集羣的階段沒有耗盡磁盤或帶寬。
Partition 數:kafka.server:type=ReplicaManager,name=PartitionCount;
Leader 副本數:kafka.server:type=ReplicaManager,name=LeaderCount;
ISR 擴容/縮容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec;
讀寫速率:Message in rate/Byte in rate/Byte out rate;
網絡請求的平均空閒率:NetworkProcessorAvgIdlePercent;
請求處理平均空閒率:RequestHandlerAvgIdlePercent。
partition 數
Partition 數應該至少與最大 consumer group 中 consumer 線程數一致;
對於使用頻繁的 topic,應該設置更多的 partition;
控制 partition 的大小(25GB 左右);
考慮應用將來的增加(可使用一種機制進行自動擴容);
使用帶 key 的 topic;
partition 擴容:當 partition 的數據量超過一個閾值時應該自動擴容(實際上還應該考慮網絡流量)。
根據吞吐量的要求設置 partition 數:
假設 Producer 單 partition 的吞吐量爲 P;
consumer 消費一個 partition 的吞吐量爲 C;
而要求的吞吐量爲 T;
那麼 partition 數至少應該大於 T/P、T/c 的最大值;
更多的 partition,意味着:
更多的 fd;
可能增長 Unavailability(可能會增長不可用的時間);
可能增長端到端的延遲;
client 端將會使用更多的內存。
這裏簡單講述一下,Partition 的增長將會帶來如下幾個優勢和缺點:
增長吞吐量:對於 consumer 來講,一個 partition 只能被一個 consumer 線程所消費,適當增長 partition 數,能夠增長 consumer 的併發,進而增長系統的吞吐量;
須要更多的 fd:對於每個 segment,在 broker 都會有一個對應的 index 和實際數據文件,而對於 Kafka Broker,它將會對於每一個 segment 每一個 index 和數據文件都會打開相應的 file handle(能夠理解爲 fd),所以,partition 越多,將會帶來更多的 fd;
可能會增長數據不可用性(主要是指增長不可用時間):主要是指 broker 宕機的狀況,越多的 partition 將會意味着越多的 partition 須要 leader 選舉(leader 在宕機這臺 broker 的 partition 須要從新選舉),特別是若是恰好 controller 宕機,從新選舉的 controller 將會首先讀取全部 partition 的 metadata,而後才進行相應的 leader 選舉,這將會帶來更大不可用時間;
可能增長 End-to-end 延遲:一條消息只有其被同步到 isr 的全部 broker 上後,才能被消費,partition 越多,不一樣節點之間同步就越多,這可能會帶來毫秒級甚至數十毫秒級的延遲;
Client 將會須要更多的內存:Producer 和 Consumer 都會按照 partition 去緩存數據,每一個 partition 都會帶來數十 KB 的消耗,partition 越多, Client 將會佔用更多的內存。
避免被惡意 Client 攻擊,保證 SLA;
設置 produce 和 fetch 請求的字節速率閾值;
能夠應用在 user、client-id、或者 user 和 client-id groups;
Broker 端的 metrics 監控:throttle-rate、byte-rate;
replica.fetch.response.max.bytes
:用於限制 replica 拉取請求的內存使用;
進行數據遷移時限制貸款的使用, kafka-reassign-partitions.sh -- -throttle option
。
使用 Java 版的 Client;
使用 kafka-producer-perf-test.sh
測試你的環境;
設置內存、CPU、batch 壓縮;
batch.size:該值設置越大,吞吐越大,但延遲也會越大;
linger.ms:表示 batch 的超時時間,該值越大,吞吐越大、但延遲也會越大;
max.in.flight.requests.per.connection
:默認爲5,表示 client 在 blocking 以前向單個鏈接(broker)發送的未確認請求的最大數,超過1時,將會影響數據的順序性;
compression.type
:壓縮設置,會提升吞吐量;
acks
:數據 durability 的設置;
避免大消息
會使用更多的內存;
下降 Broker 的處理速度;
若是吞吐量小於網絡帶寬
增長線程;
提升 batch.size;
增長更多 producer 實例;
增長 partition 數;
設置 acks=-1 時,若是延遲增大:能夠增大 num.replica.fetchers
(follower 同步數據的線程數)來調解;
跨數據中心的傳輸:增長 socket 緩衝區設置以及 OS tcp 緩衝區設置。
batch-size-avg
compression-rate-avg
waiting-threads
buffer-available-bytes
record-queue-time-max
record-send-rate
records-per-request-avg
使用 kafka-consumer-perf-test.sh
測試環境;
吞吐量問題:
partition 數太少;
OS page cache:分配足夠的內存來緩存數據;
應用的處理邏輯;
offset topic( __consumer_offsets
)
offsets.topic.replication.factor
:默認爲3;
offsets.retention.minutes
:默認爲1440,即 1day;
– MonitorISR,topicsize;
offset commit較慢:異步 commit 或 手動 commit。
fetch.min.bytes
、 fetch.max.wait.ms
;
max.poll.interval.ms
:調用 poll()
以後延遲的最大時間,超過這個時間沒有調用 poll()
的話,就會認爲這個 consumer 掛掉了,將會進行 rebalance;
max.poll.records
:當調用 poll()
以後返回最大的 record 數,默認爲500;
session.timeout.ms
;
Consumer Rebalance
– check timeouts
– check processing times/logic
– GC Issues
網絡配置;
consumer 是否跟得上數據的發送速度。
Consumer Lag:consumer offset 與 the end of log(partition 能夠消費的最大 offset) 的差值;
監控
metric 監控:records-lag-max;
經過 bin/kafka-consumer-groups.sh
查看;
用於 consumer 監控的 LinkedIn’s Burrow;
減小 Lag
分析 consumer:是 GC 問題仍是 Consumer hang 住了;
增長 Consumer 的線程;
增長分區數和 consumer 線程;
這個是經常使用的配置,
block.on.buffer.full
:默認設置爲 false,當達到內存設置時,可能經過 block 中止接受新的 record 或者拋出一些錯誤,默認狀況下,Producer 將不會拋出 BufferExhaustException,而是當達到 max.block.ms
這個時間後直接拋出 TimeoutException。設置爲 true 的意義就是將 max.block.ms
設置爲 Long.MAX_VALUE,將來版本中這個設置將被遺棄,推薦設置 max.block.ms
。
歡迎學Java和大數據的朋友們加入java架構交流: 855835163 羣內提供免費的架構資料還有:Java工程化、高性能及分佈式、高性能、深刻淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點高級進階乾貨的免費直播講解 能夠進來一塊兒學習交流哦