Apache Kafka在全球各個領域各大公司得到普遍使用,得益於它強大的功能和不斷完善的生態。其中Kafka動態配置是一個比較高頻好用的功能,下面咱們就來一探究竟。html
Kafka初始開源的幾個版本,當broker初始化啓動時,全部配置信息只能從server.properties靜態文件讀取,之後再也不發生任何更改,隨着Kafka逐步迭代,在線業務對穩定性和個性化要求愈來愈突出,須要能支持在線修改功能動態生效的需求應運而生。例如:按照topic維度清理數據,依據clientid限流,根據用戶名稱進行訪問權限控制等等。目前Kafka最新版本支持如下幾類動態配置。java
動態配置文件發展歷程:shell
kafka在0.8.0對topic的管理功能分佈在三個shell中,它們分別是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,後來社區考慮到topic管理功能過於分散,到了0.8.1版本有關topic全部功能收斂到kafka-topics.sh中。0.8.0中只有topic的建立、刪除和列表及添加分區功能,到了0.8.1開始支持topics動態配置了。 apache
0.9.0.0開始支持client(producer和consumer)客戶端配額限流支持,確保不由於某個或少數幾個topic的客戶端佔滿了broker帶寬資源和磁盤IO資源,影響其餘客戶端的正常讀寫,致使集羣內主從同步也受到影響。這個功能對確保系統SLA大有好處,經過服務降級,保證寫/生產不受影響,下降或暫停讀/消費流量更容易解決系統資源瓶頸。json
0.9.0版本動態配置與topic管理分離,爲了保持向下兼容kafka-topics.sh依然包含操做topic動態配置功能,新增kafka-configs.sh支持clients和topics動態配置功能,因此kafka-topics.sh和kafka-configs.sh任意一個均可以修改topic動態配置緩存
0.10.1.0版本新增支持users和brokers動態配置功能,user動態配置用於訪問資源的權限控制,提高集羣的訪問和數據安全性,例如:用戶對讀/寫/建立/刪除等操做和API、topic、group資源訪問控制。broker動態配置,在不用重啓及影響服務運行狀況下,broker級別功能實現動態生效,例如:副本註冊複製速率、磁盤內掛載點間數據遷移速率、網絡請求的線程數、處理請求的I/O線程數等等全局參數等等。安全
0.10.1.0~2.3.1版本都支持topics、clients、users、brokers四類型動態配置的11種粒度配置對象,只是配置模塊和屬性字段有增減與調整。網絡
用戶使用kafka-configs.sh腳本,根據格式和參數規範要求,ConfigCommand類進行相關邏輯處理、json格式和內容校驗,生成notification json,寫入到序列化持久節點上,zk路徑爲xxx/config/changes/config_change_seqNo,節點名稱爲config_change_seqNo,其中seqNo從1開始的自增序號。kafka集羣中全部broker經過監聽zk上xxx/config/changes的children變化,每次得到比當前內存中last_seqNo大的seqNo的json內容,從中讀取entity_type/entity_name相對路徑,由此判斷如何從xxx/config/topics|clients|users|brokers四種類型中讀取哪一個配置路徑。同一個Broker在操做過程當中任什麼時候刻只能串行讀寫一種類型的配置,多種配置須要串行操做。性能
各個角色的做用:ui
kafka-config.sh: 負責寫dynamic config和notification,寫順序上圖有前後標識。
broker:負責監聽xxx/config/changes子節點變化和讀取entity_type/entity_name路徑節點上內容
zk:負責存儲notification和dynamic config及下發配置給相應的broker
notification json內容:
V1 0.10.0.1及之前版本有效
{ "version": 1, "entity_type": "topics", "entity_name": "finalTest" }
V2 0.10.1.0~2.3.1 當前最新版本都有效
以上不論是version 1仍是version2,本質上沒有變化。都是經過entity_type/entity_name得到entity_path的zk相對路徑,全路徑爲xxx/config/entityType/entityName,具體請看以下詳圖
entity_type=topics | clients | users | brokers
entity_name=topicName | clientId | userId | (brokerId | <default>)
當entity_type爲brokers時,brokerId爲broker編號與本身的server.properties對應,只對某個broker生效。「<default>」指對全部broker生效。而entity_type爲topics | clients | users對全部broker都生效。經過以上entity_type/entity_name六種組合成六個zk相對路徑。
topics和clients組合原理同樣,但users和brokers卻略有不一樣,他們各自有2個組合,除了普通組合還有複合組合,兩種類型組合在一塊兒,例如users有users與clients組合,zk路徑爲users/<user>/clients/<clientId>;brokers動態配置很是實用,不須要重啓就能動態更改任意數量brokers配置,更改全部brokers爲xxx/brokers/<default>
四類動態配置11種zk相對路徑,根據11種zk相對路徑能夠讀取11種粒度配置對象dynamic config。
<default>說明:某種類型下全部做用域生效,例如xxx/clients/<default>和xxx/brokers/<default>就是集羣內全部All clients和集羣內全部All brokers配置都會生效,其餘同理。
dynamic config內容示例:
entity_type/entity_name=topics/<topic_name>=topics/finalTest
{ "version": 1, "config": { "retention.bytes": "102400000", "flush.ms": "5000", "cleanup.policy": "compact", "flush.messages", ... } }
entity_type/entity_name=clients/<clientId>=clients/camusall
{ "version": 1, "config": { "producer_byte_rate": "20971520", "consumer_byte_rate": "20971520" } }
entity_type/entity_name=brokers/all brokers=brokers/<default>
{ "version": 1, "config": { "leader.replication.throttled.rate": "5000", "follower.replication.throttled.rate": "60000", "replica.alter.log.dirs.io.max.bytes.per.second": "5000", "log.retention.hours": "24", "log.flush.interval.messages": "5000", "min.insync.replicas": "2", ... } }
entity_type/entity_name=brokers/brokerId 與all brokers的配置造成徹底同樣,只是做用域範圍不一樣而已,此處省略。
寫配置格式校驗
若是寫入配置不進行規範校驗,broker就會讀取處理過程當中,就會卡住或阻塞,影響服務運行穩定性。因此配置校驗相當重要,校驗規則以下:
config_change_seqNo生成規則
kafka-configs.sh腳本每成功執行一次,在zk上就建立一個新的seqNode節點(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久順序節點(PERSISTENT_SEQUENTIAL),它的組成是seqNode = seqNodePrefix + seqNodeSuffix,config_change_固定爲seqNode的前綴,seqNodeSuffix = seqNo爲seqNode的後綴,seqNo是10位數字的序列號,這個序列號後綴是自增的,由zk服務端自動生成和維護,每次事務請求成功就加1,它與MySQL的自增id原理同樣。
config_change_seqno清除規則
集羣通過長期運行積累,xxx/config/changes下會留存大量歷史節點,若是不及時清理,會有如下影響:
綜上所述,所以必需要及時清除無用的seqNode集合,清除公式步驟以下:
config_change_seqno判斷處理
前面提到每當觸發回調處理,seqNode節點建立時間過時15分鐘會被刪除,刪除條件是觸發纔會被執行,若是長時間不建立就可能有少數幾個seqNode一直保留。若是短期內(15分鐘內)建立大量seqNode,又不會當即被刪除,只有等到下次觸發達到條件才行,那怎麼判斷哪些會被處理呢?broker緩存中維護一個變量lastExecutedSeqNo,它負責保存執行歷史中seqNode最大順序號,因此每當觸發回調獲取seqNodeSet列表時,都能輕易判斷出哪些須要處理計算,也會同步更新lastExecutedSeqNo。
notification做用
靜態與動態區別。靜態配置是Broker內置默認配置和靜態配置文件server.properties,broker啓動前能夠任意修改,啓動後不可修改。動態配置是broker啓動運行後,能夠在線更新生效,偷偷說一句離線也能夠改,就是不生效而已。
配置優先級。以上4個圖包含4類型配置既有動態也有靜態,那優先級如何呢?動態配置優先級高於靜態配置。如上圖一、二、4,環越小優先級越高,對於動態配置來講,修改配置做用域範圍越小優先級越高,反之亦然。優先級最高的,會逐級覆蓋相同配置項。
當broker啓動時。讀取順序依次爲broker內置默認配置,broker靜態配置文件,動態配置。當配置項相同時,高優先級覆蓋次優先的,其餘依次類推
圖示說明:上圖一、二、三、4中,其中圖一、二、4中環數字表示配置優先級關係,數字從1~5表示優先級從高到低。圖3爲兩級關聯。Users和Clients組合實現配置管理,這二者組合用於客戶端配額限流,Users與Clients就像兩級目錄同樣,一個User能夠包含一個、多個clientId或全部clientId。圖4中既有優先級關係也有配置參數包含關係,topics類型配置是brokers類型配置的子集,brokers除了包含topic配置外還有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。
如想更深刻了解kafka-configs.sh用法,請查看
從設計原理中瞭解config_change_seqNo生成規則
寫上文中理解了寫Notification的做用,從而理解什麼場景會適合使用zk中持久順序節點(PERSISTENT_SEQUENTIAL)
Release Notes - Kafka - Version 0.8.1:https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.1.0:https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.0:https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.1:https://archive.apache.org/dist/kafka/0.10.2.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.0:https://archive.apache.org/dist/kafka/1.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.1:https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 2.0.0:https://archive.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
KIP-21 - Dynamic Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
KIP-226 - Dynamic Broker Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211:https://issues.apache.org/jira/browse/KAFKA-2547
KIP-257 - Configurable Quota Management:https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management
KIP-73 Replication Quotas:https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas