消息系統: Kafka 和傳統的消息系統(也稱做消息中間件)都具有系統解耦、冗餘存儲、流量削峯、緩衝、異步通訊、擴展性、可恢復性等功能。與此同時,Kafka 還提供了大多數消息系統難以實現的消息順序性保障及回溯消費的功能。java
存儲系統: Kafka 把消息持久化到磁盤,相比於其餘基於內存存儲的系統而言,有效地下降了數據丟失的風險。也正是得益於 Kafka 的消息持久化功能和多副本機制,咱們能夠把 Kafka 做爲長期的數據存儲系統來使用,只須要把對應的數據保留策略設置爲「永久」或啓用主題的日誌壓縮功能便可。shell
流式處理平臺: Kafka 不只爲每一個流行的流式處理框架提供了可靠的數據來源,還提供了一個完整的流式處理類庫,好比窗口、鏈接、變換和聚合等各種操做。bootstrap
分區中的全部副本統稱爲 AR(Assigned Replicas)。全部與 leader 副本保持必定程度同步的副本(包括 leader 副本在內)組成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一個子集。數組
ISR的伸縮:
leader 副本負責維護和跟蹤 ISR 集合中全部 follower 副本的滯後狀態,當 follower 副本落後太多或失效時,leader 副本會把它從 ISR 集合中剔除。若是 OSR 集合中有 follower 副本「追上」了 leader 副本,那麼 leader 副本會把它從 OSR 集合轉移至 ISR 集合。默認狀況下,當 leader 副本發生故障時,只有在 ISR 集合中的副本纔有資格被選舉爲新的 leader,而在 OSR 集合中的副本則沒有任何機會(不過這個原則也能夠經過修改相應的參數配置來改變)。緩存
replica.lag.time.max.ms : 這個參數的含義是 Follower 副本可以落後 Leader 副本的最長時間間隔,當前默認值是 10 秒。安全
unclean.leader.election.enable:是否容許 Unclean 領導者選舉。開啓 Unclean 領導者選舉可能會形成數據丟失,但好處是,它使得分區 Leader 副本一直存在,不至於中止對外提供服務,所以提高了高可用性。網絡
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 以前的消息。多線程
LSO是LogStartOffset,通常狀況下,日誌文件的起始偏移量 logStartOffset 等於第一個日誌分段的 baseOffset,但這並非絕對的,logStartOffset 的值能夠經過 DeleteRecordsRequest 請求(好比使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 腳本、日誌的清理和截斷等操做進行修改。
架構
如上圖所示,它表明一個日誌文件,這個日誌文件中有9條消息,第一條消息的 offset(LogStartOffset)爲0,最後一條消息的 offset 爲8,offset 爲9的消息用虛線框表示,表明下一條待寫入的消息。日誌文件的 HW 爲6,表示消費者只能拉取到 offset 在0至5之間的消息,而 offset 爲6的消息對消費者而言是不可見的。框架
LEO 是 Log End Offset 的縮寫,它標識當前日誌文件中下一條待寫入消息的 offset,上圖中 offset 爲9的位置即爲當前日誌文件的 LEO,LEO 的大小至關於當前日誌分區中最後一條消息的 offset 值加1。分區 ISR 集合中的每一個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即爲分區的 HW,對消費者而言只能消費 HW 以前的消息。
LW 是 Low Watermark 的縮寫,俗稱「低水位」,表明 AR 集合中最小的 logStartOffset 值。副本的拉取請求(FetchRequest,它有可能觸發新建日誌分段而舊的被清理,進而致使 logStartOffset 的增長)和刪除消息請求(DeleteRecordRequest)都有可能促使 LW 的增加。
能夠經過分區策略體現消息順序性。
分區策略有輪詢策略、隨機策略、按消息鍵保序策略。
按消息鍵保序策略:一旦消息被定義了 Key,那麼你就能夠保證同一個 Key 的全部消息都進入到相同的分區裏面,因爲每一個分區下的消息處理都是有順序的,故這個策略被稱爲按消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size();
消息在經過 send() 方法發往 broker 的過程當中,有可能須要通過攔截器(Interceptor)、序列化器(Serializer)和分區器(Partitioner)的一系列做用以後才能被真正地發往 broker。攔截器(下一章會詳細介紹)通常不是必需的,而序列化器是必需的。消息通過序列化以後就須要肯定它發往的分區,若是消息 ProducerRecord 中指定了 partition 字段,那麼就不須要分區器的做用,由於 partition 表明的就是所要發往的分區號。
處理順序 :攔截器->序列化器->分區器
KafkaProducer 在將消息序列化和計算分區以前會調用生產者攔截器的 onSend() 方法來對消息進行相應的定製化操做。
而後生產者須要用序列化器(Serializer)把對象轉換成字節數組才能經過網絡發送給 Kafka。
最後可能會被髮往分區器爲消息分配分區。
整個生產者客戶端由兩個線程協調運行,這兩個線程分別爲主線程和 Sender 線程(發送線程)。
在主線程中由 KafkaProducer 建立消息,而後經過可能的攔截器、序列化器和分區器的做用以後緩存到消息累加器(RecordAccumulator,也稱爲消息收集器)中。
Sender 線程負責從 RecordAccumulator 中獲取消息並將其發送到 Kafka 中。
RecordAccumulator 主要用來緩存消息以便 Sender 線程能夠批量發送,進而減小網絡傳輸的資源消耗以提高性能。
整個生產者客戶端由兩個線程協調運行,這兩個線程分別爲主線程和 Sender 線程(發送線程)。在主線程中由 KafkaProducer 建立消息,而後經過可能的攔截器、序列化器和分區器的做用以後緩存到消息累加器(RecordAccumulator,也稱爲消息收集器)中。Sender 線程負責從 RecordAccumulator 中獲取消息並將其發送到 Kafka 中。
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一個分佈式的協調服務框架,Kafka 重度依賴它實現各類各樣的協調管理。將位移保存在 ZooKeeper 外部系統的作法,最顯而易見的好處就是減小了 Kafka Broker 端的狀態保存開銷。
ZooKeeper 這類元框架其實並不適合進行頻繁的寫更新,而 Consumer Group 的位移更新倒是一個很是頻繁的操做。這種大吞吐量的寫操做會極大地拖慢 ZooKeeper 集羣的性能
通常來講若是消費者過多,出現了消費者的個數大於分區個數的狀況,就會有消費者分配不到任何分區。
開發者能夠繼承AbstractPartitionAssignor實現自定義消費策略,從而實現同一消費組內的任意消費者均可以消費訂閱主題的全部分區:
public class BroadcastAssignor extends AbstractPartitionAssignor{ @Override public String name() { return "broadcast"; } private Map<String, List<String>> consumersPerTopic( Map<String, Subscription> consumerMetadata) { (具體實現請參考RandomAssignor中的consumersPerTopic()方法) } @Override public Map<String, List<TopicPartition>> assign( Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); //Java8 subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); //針對每個主題,爲每個訂閱的消費者分配全部的分區 consumersPerTopic.entrySet().forEach(topicEntry->{ String topic = topicEntry.getKey(); List<String> members = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null || members.isEmpty()) return; List<TopicPartition> partitions = AbstractPartitionAssignor .partitions(topic, numPartitionsForTopic); if (!partitions.isEmpty()) { members.forEach(memberId -> assignment.get(memberId).addAll(partitions)); } }); return assignment; } }
注意組內廣播的這種實現方式會有一個嚴重的問題—默認的消費位移的提交會失效。
在舊消費者客戶端中,消費位移是存儲在 ZooKeeper 中的。而在新消費者客戶端中,消費位移存儲在 Kafka 內部的主題__consumer_offsets 中。
當前消費者須要提交的消費位移是offset+1
一個線程對應一個 KafkaConsumer 實例,咱們能夠稱之爲消費線程。一個消費線程能夠消費一個或多個分區中的消息,全部的消費線程都隸屬於同一個消費組。
兩個方案對比:
在執行完腳本以後,Kafka 會在 log.dir 或 log.dirs 參數所配置的目錄下建立相應的主題分區,默認狀況下這個目錄爲/tmp/kafka-logs/。
在 ZooKeeper 的/brokers/topics/目錄下建立一個同名的實節點,該節點中記錄了該主題的分區副本分配方案。示例以下:
[zk: localhost:2181/kafka(CONNECTED) 2] get /brokers/topics/topic-create {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
能夠增長,使用 kafka-topics 腳本,結合 --alter 參數來增長某個主題的分區數,命令以下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分區數>
當分區數增長時,就會觸發訂閱該主題的全部 Group 開啓 Rebalance。
首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。在 Rebalance 過程當中,全部 Consumer 實例都會中止消費,等待 Rebalance 完成。這是 Rebalance 爲人詬病的一個方面。
其次,目前 Rebalance 的設計是全部 Consumer 實例共同參與,所有從新分配全部分區。其實更高效的作法是儘可能減小分配方案的變更。
最後,Rebalance 實在是太慢了。
不支持,由於刪除的分區中的消息很差處理。若是直接存儲到現有分區的尾部,消息的時間戳就不會遞增,如此對於 Spark、Flink 這類須要消息時間戳(事件時間)的組件將會受到影響;若是分散插入現有的分區,那麼在消息量很大的時候,內部的數據複製會佔用很大的資源,並且在複製期間,此主題的可用性又如何獲得保障?與此同時,順序性問題、事務性問題,以及分區和副本的狀態機切換問題都是不得不面對的。
在 Kafka 中,性能與分區數有着必然的關係,在設定分區數時通常也須要考慮性能的因素。對不一樣的硬件而言,其對應的性能也會不太同樣。
可使用Kafka 自己提供的用於生產者性能測試的 kafka-producer- perf-test.sh 和用於消費者性能測試的 kafka-consumer-perf-test.sh來進行測試。
增長合適的分區數能夠在必定程度上提高總體吞吐量,但超過對應的閾值以後吞吐量不升反降。若是應用對吞吐量有必定程度上的要求,則建議在投入生產環境以前對同款硬件資源作一個完備的吞吐量相關的測試,以找到合適的分區數閾值區間。
分區數的多少還會影響系統的可用性。若是分區數很是多,若是集羣中的某個 broker 節點宕機,那麼就會有大量的分區須要同時進行 leader 角色切換,這個切換的過程會耗費一筆可觀的時間,而且在這個時間窗口內這些分區也會變得不可用。
分區數越多也會讓 Kafka 的正常啓動和關閉的耗時變得越長,與此同時,主題的分區數越多不只會增長日誌清理的耗時,並且在被刪除時也會耗費更多的時間。