Kafka使用一個叫Franz Kafka的文學家的名字用來命名的。java
Kafka是一款開源的消息引擎系統。也是一個分佈式流處理平臺。node
Kafka同時支持點對點模型以及發佈/訂閱模型。算法
爲何要使用Kakfa?四個字:削峯填谷!apache
Replica,副本,數據冗餘。bootstrap
Kafka新版本客戶端代碼開始徹底由java語言編寫,因而有些人開始「JAVA VS SCALA」的大討論。並從語言特性上分析爲何社區擯棄Scala轉而投向Java的懷抱。api
其實事情沒有那麼複雜,僅僅是由於社區來了一批Java程序猿,而之前老的scala程序猿隱退了罷了。緩存
Kafka總共演進了7個大版本安全
最後還有個建議,不論你使用的是哪一個版本,都請儘可能保持服務端版本和客戶端版本一致,不然你將損失不少Kafka爲你提供的性能優化收益。性能優化
江湖經驗:不要輕易成爲新版本的小白鼠。服務器
磁盤容量舉例:
假設公司有個業務須要天天向Kafka集羣發送 1 億條信息。每條消息保存兩份來防止數據丟失。消息默認保存兩週時間。並假設消息的平均大小是1KB。問你的Kafka集羣須要爲這個業務預留多少磁盤空間?
總大小:1億 1KB 2備份 * 14 ~= 2800G
加上Kafka的一些索引數據,爲它預留10%,那麼總大小變爲 2800 * (1 + 10%) ~= 3TB
Kafka支持數據壓縮,壓縮比0.75的話,那麼應該預留的存儲空間爲2.25TB左右。
帶寬舉例
與其說是帶寬資源的規劃,其實真正要規劃的是Kafka服務器的數量。
假設公司機房環境1Gbps,現有個業務,須要在1小時內處理1TB的業務數據。
通常單臺服務器 規劃使用70%的帶寬資源的1/3 ~= 240Mbps。
1TB須要1小時處理,則每秒差很少須要處理2336Mbps的數據,除 240Mbps,則差很少須要10臺機器。若是消息還須要額外複製的話,那麼還要對應乘上備份數。
配置名稱 | 示例 | 建議值 |
---|---|---|
log.dirs | /home/kafka1,/home/kafka2 | kafka寫日誌多路徑,不只能提高寫性能,在1.1版本中還能支持故障轉移功能。 |
zookeeper.connect | zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listens | listeners=PLAINTEXT://dn1.ambari:6667 | |
auto.create.topics.enable | true | false,不建議能夠自動建立主題 |
unclean.leader.election.enable | false | false,若是設置爲true有丟數據風險 |
auto.leader.rebalance.enable | false | false,不按期進行leader副本的選舉 |
log.retention.hours | 168 | 默認保持7天數據 |
log.retention.bytes | -1 | 保存多少數據均可以 |
message.max.bytes | 1000000 | 默認值建議調大。該值表明Broker能處理的最大消息大小 |
壓縮配置
compression.type
壓縮算法
總結一下壓縮和解壓縮,Producer端壓縮,Broker端保持,Consumer端解壓縮。
分爲生產者攔截器和消費者攔截器。
典型的應用場景能夠應用於客戶端監控、端到端系統性能測試、消息審計等多種功能在內的場景。
connections.max.idle.ms > 0
,則步驟一中的TCP鏈接會被自動關閉;若是設置該參數-1,那麼步驟一中建立的鏈接沒法被關閉,會成爲殭屍進程。建立的3個時機
消費者程序會建立3類TCP鏈接
消息交付可靠性保障,常見的承諾有如下三種
Kafka默認是最少一次
要保證精確一次,就須要冪等和事務。不過性能會想對較差。
冪等性有不少好處。其最大的優點在於咱們能夠安全地重試任何冪等性操做,反正它們不會破壞咱們的系統狀態。
在0.11.0.0版本引入了冪等生產者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
。
使用冪等生產者要注意
設置事務型Producer
transctional.id
。最好爲其設置一個有意義的名字此外代碼也要作一些調整變化。
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); }
Rebalance發生的時機有三個
後面兩個一般是運維的主動操做,沒法避免。主要仍是針對組成員數量減小的狀況。增長通常也是人爲主動的。
那麼避免由於參數或邏輯不合理而致使的成員退出,與之相關的主要參數
重平衡過程是經過 消費者端的心跳線程來通知到其餘消費者實例的。
0.10.1.0版本以前,發送心跳請求是在消費者主線程完成的,也就是kafkaConsumer.poll方法的那個線程。這樣作有諸多弊端,由於消息處理也是在這個線程中完成的。所以當業務邏輯處理消耗了較長時間,心跳請求就沒法及時發送到協調者那邊了。致使協調者 錯誤地認爲該消費者已經死了。
0.10.1.0版本開始,社區引入了一個單獨的線程來專門執行心跳發送。
定義了5種狀態
各個狀態的流轉
一個消費者組最開始是Empty狀態,當重平衡過程開啓後,它會被置爲PreparingRebalance狀態等待成員加入,以後變動到CompletingRebalance狀態等待分配方案,最後流轉到Stable狀態完成重平衡。
當有新成員或已有成員退出時,消費者組的狀態從Stable直接跳到PreparingRebalance狀態,此時,全部現存成員就必須從新申請加入組。
當全部成員都退出組後,消費者組狀態變動爲Empty。
Kafka自動按期刪除過時位移的條件就是,組要處於Empty狀態。
JoinGroup請求
SyncGroup請求
max.poll.interval.ms
,新版本默認是5分鐘。max.poll.records
鑑於KafkaConsumer不是線程安全的事實,制定兩套多線程方案。
核心代碼 ``` public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // 執行消息處理邏輯 } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } ```
核心代碼 ``` private final KafkaConsumer<String, String> consumer; private ExecutorService executors; ... private int workerNum = ...; executors = new ThreadPoolExecutor( workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); ... while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { executors.submit(new Worker(record)); } } ```
兩種方案各有特色。
副本機制的好處:
但Kafka只有第一種好處,緣由是這樣的設計,Kafka有兩點好處
指當你用生產者API向Kafka成功寫入消息後,立刻使用消費者API去讀取剛纔生產的消息
在屢次消費信息時,不會看到該消息一會存在一會不存在的狀況。
判斷Follower副本與Leader副本是否同步的標準,Broker參數replia.lag.time.max.ms
的參數值。Kafka有一個in-sync Replicas(ISR)集合的概念。
控制器組件(Controller),是Kafka的核心組件,它的主要做用是在Apache Zookeeper的幫助下管理和協調整個Kafka集羣。
每臺Broker都能充當控制器,在Broker啓動時,會嘗試去Zookeeper中建立/controller節點。Kafka當前選舉規則,第一個成功建立/controller節點的Broker會被指定爲控制器。
這些數據其實也在Zookeeper中存儲了一份。
小竅門分享:當你以爲控制器出現問題時,好比主題沒法刪除了,重分區hang住了,你能夠不用重啓broker或者控制器,快速簡便的方法,直接去Zookeeper手動刪除/controller節點。
這樣作的好處是,既能夠引起控制器的重選舉,又能夠避免重啓Broker致使的消息中斷。
Kafka方案相似於Reactor模式
那麼Kafka相似的方案是這樣的。網絡線程池默認參數num.network.threads=3
好了,客戶端發來的請求會被Aceptor線程分發到任意一個網絡線程中,由他們進行處理。你可能會認爲,網絡線程池是順序處理不就行了?實際上,Kafka在這個環節上又作了一層異步線程池的處理。
IO線程池執行真正的處理。若是是PRODUCER生產請求,則將消息寫入到底層的磁盤日誌中;若是是FETCH請求,則從磁盤或頁緩存中讀取消息。當IO請求處理完請求後,會將生成的響應放入網絡線程池的響應隊列中,並由對應的網絡線程負責將Response反還給客戶端。
請求隊列是全部網絡線程共享的,而響應隊列則是每一個網絡線程專屬的。
IO線程池默認參數num.io.threads=8
圖中還有一個Purgatory的組件,這是Kafka中著名的「煉獄」組件。
它是用來緩存延時請求的,所謂延時請求,就是那些一時未知足條件的不可馬上處理的請求。
高水位做用:
這塊感受有點晦澀難懂!須要有時間再研究!
該機制是用來防止數據丟失和不一致的問題。由於僅僅依靠高水位更新,是會有時間錯配的問題的。
沒有使用Leader Epoch前,丟失數據的一個場景。
假設生產者成功向Kafka發送了兩條信息,Leader和副本B都寫成功了。
Leader的高水位更新完成,但副本B高水位還未更新。
副本B所在的Broker宕機了,當它重啓後,此時LEO值會變動爲高水位值1。也就是說位移值爲1的那條信息被副本B從磁盤中刪除了。此時副本B的底層磁盤文件中只保存有一條信息,即位移值爲0的那條消息。
執行完日誌截斷操做以後,副本B開始向Leader拉取消息,執行正常的消息同步。那麼就在這個節骨眼,Leader又掛掉了。那麼Kafka就別無選擇,讓副本B成爲了Leader。那麼當副本A回來以後,就會將高水位調整爲與B相同的值,也是1。
這樣操做以後,位移值爲1的那條消息就完全從這兩個副本中被永遠地抹去了。
場景和以前相似,可是引用了Leader Epoch機制後,副本B重啓回來後,會向A發送一個特殊的請求去獲取Leader的LEO值。
發現Leader LEO的值不比它本身的小,且緩存中也沒有起始位移>2的Epoch條目,所以B不須要執行任何的日誌截斷操做。這就是對高水位機制的一個明顯改進,副本是否執行日誌截斷再也不依賴高水位進行判斷。
A宕機了,B成爲了Leader,一樣地發送一個特殊的請求,發現也不須要執行日誌截斷操做。當後面生產者往B寫入消息後,B生成了新的Epoch條目。
kafka 2.2版本示例
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >
這裏要注意的是,分區數必須大於原有分區數,不然會報錯InvalidPartitionsException.
假設咱們要設置主題的級別參數爲max.message.bytes
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
先設置Broker端參數
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
注意--entity-name指的是Broker ID.若是該主題副本分別上0,1,2,3上,那麼你還要爲Broker 1,2,3執行這條命令。
設置好這個參數以後,還須要爲該主題設置要限速的副本。示例中但願對全部的副本進行限速,使用通配符*。
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
一般這個步驟,你須要耐心等待一段時間,Kafka只是標記成已刪除狀態,須要一段時間在後臺默默刪除。
當你碰到主題沒法刪除的狀況,你能夠採用這樣的方法:
事實上第三步要當心,其實執行前面兩步就夠了。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
要顯示的使用jstack命令查看下kafka-log-cleaner-thread前綴的線程的狀態。一般狀況下,是因爲該線程掛掉致使的。你只能重啓相應的Broker了。
1.1.0版本以前,都是經過配置server.properties。
當一些配置要更改的時候,咱們須要修改這個配置文件,而後重啓Broker,以此來達到修改配置生效。
可是生產環境的機子怎麼能說重啓就重啓呢。針對這個痛點,社區1.1.0版本正式引入了動態參數。
kafka broker config官網增長了DYNAMIC UPDATE MODE列。而該列有三類值。
其中config/brokers是真正保存動態Broker參數的地方。
參數優先級:pre-broker > cluster-wide > read-only > 默認值
設置cluster-wide值
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
刪除cluster-wide值
`# 刪除 cluster-wide 範圍參數
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable`
設置pre-broker值
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
刪除pre-broker值
`# 刪除 per-broker 範圍參數
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable`
基於這些緣由,社區於0.11版本正式推出了Java客戶端版本的AdminClient。