本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。node
某一個broker被選舉出來承擔特殊的角色,就是控制器Controller。緩存
Leader會向zookeeper上註冊Watcher,其餘broker幾乎不用監聽zookeeper的狀態變化。學習
Controller集羣就是用來管理和協調Kafka集羣的,具體就是管理集羣中全部分區的狀態和分區對應副本的狀態。this
每個Kafka集羣任意時刻都只能有一個controller,當集羣啓動的時候,全部的broker都會參與到controller的競選,最終只能有一個broker勝出。spa
Controller維護的狀態分爲兩類:1:管理每一臺Broker上對應的分區副本。2:管理每個Topic分區的狀態。線程
KafkaController 核心代碼,其中包含副本狀態機和分區狀態機scala
class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
複製代碼
KafkaController中共定義了五種selector選舉器日誌
一、ReassignedPartitionLeaderSelector
從可用的ISR中選取第一個做爲leader,把當前的ISR做爲新的ISR,將重分配的副本集合做爲接收LeaderAndIsr請求的副本集合。
二、PreferredReplicaPartitionLeaderSelector
若是從assignedReplicas取出的第一個副本就是分區leader的話,則拋出異常,不然將第一個副本設置爲分區leader。
三、ControlledShutdownLeaderSelector
將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,而後選取第一個副本做爲leader,而後令當前AR做爲接收LeaderAndIsr請求的副本。
四、NoOpLeaderSelector
原則上不作任何事情,返回當前的leader和isr。
五、OfflinePartitionLeaderSelector
從活着的ISR中選擇一個broker做爲leader,若是ISR中沒有活着的副本,則從assignedReplicas中選擇一個副本做爲leader,leader選舉成功後註冊到Zookeeper中,並更新全部的緩存。
複製代碼
kafka修改分區和副本數code
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: test1 Partition: 1 Leader: 3 Replicas: 3,5 Isr: 3,5
Topic: test1 Partition: 2 Leader: 4 Replicas: 4,1 Isr: 4,1
複製代碼
topic 分區擴容cdn
./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
複製代碼
Replica有7種狀態:
1 NewReplica: 在partition reassignment期間KafkaController建立New replica
2 OnlineReplica: 當一個replica變爲一個parition的assingned replicas時
其狀態變爲OnlineReplica, 即一個有效的OnlineReplica
3 Online狀態的parition才能轉變爲leader或isr中的一員
4 OfflineReplica: 當一個broker down時, 上面的replica也隨之die, 其狀態轉變爲Onffline;
ReplicaDeletionStarted: 當一個replica的刪除操做開始時,其狀態轉變爲ReplicaDeletionStarted
5 ReplicaDeletionSuccessful: Replica成功刪除後,其狀態轉變爲ReplicaDeletionSuccessful
6 ReplicaDeletionIneligible: Replica成功失敗後,其狀態轉變爲ReplicaDeletionIneligible
7 NonExistentReplica: Replica成功刪除後, 從ReplicaDeletionSuccessful狀態轉變爲NonExistentReplica狀態
複製代碼
ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
startup: 啓動ReplicaStateMachine
initializeReplicaState: 初始化每一個replica的狀態, 若是replica所在的broker是live狀態,則此replica的狀態爲OnlineReplica。
處理能夠轉換到Online狀態的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 而且發送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
當建立某個topic時,該topic下全部分區的全部副本都是NonExistent。
當controller加載Zookeeper中該topic每個分區的全部副本信息到內存中,同時將副本的狀態變動爲New。
以後controller選擇該分區副本列表中的第一個副本做爲分區的leader副本並設置全部副本進入ISR,而後在Zookeeper中持久化該決定。
一旦肯定了分區的Leader和ISR以後,controller會將這些消息以請求的方式發送給全部的副本。
同時將這些副本狀態同步到集羣的全部broker上以便讓他們知曉。
最後controller 會把分區的全部副本狀態設置爲Online。
Partition有以下四種狀態
NonExistentPartition: 這個partition尚未被建立或者是建立後又被刪除了;
NewPartition: 這個parition已建立, replicas也已分配好,但leader/isr還未就緒;
OnlinePartition: 這個partition的leader選好;
OfflinePartition: 這個partition的leader掛了,這個parition狀態爲OfflinePartition;
複製代碼
當建立Topic時,controller負責建立分區對象,它首先會短暫的將全部分區狀態設置爲NonExistent。
以後讀取Zookeeper副本分配方案,而後令分區狀態設置爲NewPartion。
處於NewPartion狀態的分區還沒有有leader和ISR,所以Controller會初始化leader和ISR信息並設置分區狀態爲OnlinePartion,此時分區正常工做。
本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。
本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:1120746959@qq.com,若有任何學術交流,可隨時聯繫。
kafka集羣Controller主要幹經過ZK持久化副本分配方案,根據副本分配方案建立分區,監聽ZK znode狀態變化作執行處理,維護分區和副本ISR機制穩定運行。感謝huxihx技術博客以及相關書籍,讓我理解了Controller核心機制,寫一篇學習筆記,做爲總結,辛苦成文,實屬不易,謝謝。
秦凱新 於深圳 201812021541