這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!
另外,博主出書了《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。
平時在使用Kafka的時候,可能關注的更多的是Kafka系統層面的。今天來給你們剖析一下Kafka的控制器,瞭解一下Kafka控制器的選舉流程。函數
Kafka控制器,其實就是一個Kafka系統的Broker。它除了具備通常Broker的功能以外,還具備選舉主題分區Leader節點的功能。在啓動Kafka系統時,其中一個Broker會被選舉爲控制器,負責管理主題分區和副本狀態,還會執行分區從新分配的管理任務。oop
若是在Kafka系統運行過程當中,當前的控制器出現故障致使不可用,那麼Kafka系統會從其餘正常運行的Broker中從新選舉出新的控制器。post
在Kafka集羣中,每一個Broker在啓動時會實例化一個KafkaController類。該類會執行一系列業務邏輯,選舉出主題分區的Leader節點,步驟以下:學習
控制器建立的優先級是按照Kafka系統代理節點成功啓動的順序來建立的。用戶能夠經過改變Kafka系統代理節點的啓動順序,來查看控制器的建立優先級。以後,能夠在Zookeeper系統中查看/controller臨時節點的內容,例如:大數據
# 進入Zookeeper集羣 [hadoop@dn1 bin]$ zkCli.sh -server dn1:2181 # 執行查看命令 [zk: dn1:2181(CONNECTED) 1] get /controller
成功執行命令後,能夠看到代理節點0(即dn1節點)上成功建立了控制器,以下圖所示:spa
當前啓動順序爲:dn一、dn二、dn3,修改啓動順序爲:dn三、dn一、dn2。再次查看Zookeeper系統中執行「get /controller」命令,輸出結果以下圖所示:代理
當控制器被關閉或者與Zookeeper系統斷開鏈接時,Zookeeper系統上的臨時節點就會被清除。Kafka集羣中的監聽器會接收到變動通知,各個代理節點會嘗試到Zookeeper系統中建立一個控制器的臨時節點。第一個成功在Zookeeper系統中建立的代理節點,將會成爲新的控制器。每一個新選舉出來的控制器,會在Zookeeper系統中獲取一個遞增的controller_epoch值。調試
選舉控制器的核心思路是:各個代理節點公平競爭搶佔Zookeeper系統中建立/controller臨時節點,最早建立成功的代理節點會成爲控制器,並擁有選舉主題分區Leader節點的功能。選舉流程以下圖所示:code
當Kafka系統實例化KafkaController類時,主題分區Leader節點的選舉流程便會開始。其中涉及的核心類包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。orm
Kafka系統的控制器主要負責管理主題、分區和副本。 Kafka系統在操做主題、分區和副本時,控制器會在Zookeeper系統的/brokers/topics節點,以及其子節點路徑上註冊一系列的監聽器。 使用Kafka應用接口或者是Kafka系統腳本建立一個主題時,服務端會將建立後的結果返回給客戶端。當客戶端收到建立成功的提示時,其實服務端並無實際建立主題,而只是在Zookeeper系統的/brokers/topics節點中建立了該主題對應的子節點名稱。
代理節點調用onBecomingLeader()函數實際上調用的是onControllerFailover()函數,因此在控制器調用onControllerFailover()函數時,會在初始化階段分別建立分區狀態機和副本狀態機。代碼以下所示:
def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch(zkUtils.zkClient) // 在/brokers/topics節點註冊監聽器 registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() // 註冊分區狀態機 replicaStateMachine.registerListeners() // 註冊副本狀態機 initializeControllerContext() // 在控制器初始化以後,在狀態機啓動以前,須要發送更新元數據請求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) replicaStateMachine.startup() // 啓動副本狀態機 partitionStateMachine.startup() // 啓動分區狀態機 // 在自動故障轉移中爲全部主題註冊分區更改監聽器 controllerContext.allTopics.foreach(topic => partitionStateMachine. registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d". format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
主題的分區狀態機經過registerListeners()函數,在Zookeeper系統中的/brokers/topics節點上註冊了TopicChangeListener和DeleteTopicListener兩個監聽器。建立一個主題時,主題信息、主題分區和副本會被寫到Zookeeper系統的/brokers/topics節點中,這就會觸發分區和副本狀態機註冊監聽器。
Kafka系統總體來講,調試還算方便。下載Kafka源代碼,導入到IDE中,就能夠啓動整個Kafka系統了,能夠經過DEBUG的方式來親自了解控制器的執行流程。
這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!
另外,博主出書了《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。