在KafkaServer的startup方法裏,會實例化KafkaController並啓動它。
// Excerpt of KafkaServer.startup(): only the controller-related statements are shown.
// The "......." / "......" markers stand for code omitted from this excerpt.
class KafkaServer {
  def startup() {
    .......
    // Create the controller instance for this broker ...
    kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
    // ... and start it.
    kafkaController.startup()
    ......
  }
}
ControllerEventManager會實例化一個線程,用來單獨處理KafkaController發送的事件。
/**
 * Queues controller events and processes them one at a time on a dedicated thread.
 *
 * @param rateAndTimeMetrics     timers, looked up by the state of the event being processed
 * @param eventProcessedListener callback invoked with each event after it has been processed
 */
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {

  // State of the event currently being processed; Idle whenever no event is in flight.
  @volatile private var _state: ControllerState = ControllerState.Idle

  // Events waiting to be picked up by the event thread.
  private val eventQueue = new LinkedBlockingQueue[ControllerEvent]

  private val eventThread = new ControllerEventThread("controller-event-thread")

  def state: ControllerState = _state

  def start(): Unit = eventThread.start()

  def close(): Unit = eventThread.shutdown()

  /** Appends an event to the queue for the event thread to process. */
  def put(event: ControllerEvent): Unit = eventQueue.put(event)

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {

    /** Takes one event off the queue, processes it, notifies the listener, then goes back to Idle. */
    override def doWork(): Unit = {
      val controllerEvent = eventQueue.take()
      _state = controllerEvent.state
      processTimed(controllerEvent)
      invokeListener(controllerEvent)
      _state = ControllerState.Idle
    }

    // Runs the event's own process() under the timer registered for the current state.
    // Any failure (including a failed timer lookup) is logged and not rethrown.
    private def processTimed(controllerEvent: ControllerEvent): Unit =
      try {
        rateAndTimeMetrics(state).time {
          controllerEvent.process()
        }
      } catch {
        case e: Throwable => error(s"Error processing event $controllerEvent", e)
      }

    // Invokes the processed-event callback; a failing callback is logged and not rethrown.
    private def invokeListener(controllerEvent: ControllerEvent): Unit =
      try eventProcessedListener(controllerEvent)
      catch {
        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
      }
  }
}
state表示處理event時,controller所處的狀態。
process表示處理event的具體邏輯。
/** An event that can be queued for, and processed by, the controller. */
sealed trait ControllerEvent {
  /** The controller state that applies while this event is being processed. */
  def state: ControllerState

  /** The handling logic for this event. */
  def process(): Unit
}
ControllerState目前有10種:
// The ten controller states, each identified by a distinct integer `value` (0-9).
// NOTE(review): this excerpt is truncated - the ControllerState base type and the
// closing brace of this object are not shown here.
object ControllerState {

  // No event is being processed. Overrides hasRateAndTimeMetric to false.
  case object Idle extends ControllerState {
    def value = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  case object ControllerChange extends ControllerState {
    def value = 1
  }

  // Overrides the metric name with "LeaderElectionRateAndTimeMs".
  case object BrokerChange extends ControllerState {
    def value = 2
    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
  }

  case object TopicChange extends ControllerState {
    def value = 3
  }

  case object TopicDeletion extends ControllerState {
    def value = 4
  }

  case object PartitionReassignment extends ControllerState {
    def value = 5
  }

  case object AutoLeaderBalance extends ControllerState {
    def value = 6
  }

  case object ManualLeaderBalance extends ControllerState {
    def value = 7
  }

  case object ControlledShutdown extends ControllerState {
    def value = 8
  }

  case object IsrChange extends ControllerState {
    def value = 9
  }
KafkaController在啓動的時候,會生成Startup事件,添加到ControllerEventManager的事件隊列中。
// Excerpt of KafkaController: only startup() is shown.
class KafkaController {
  def startup() = {
    // Queue the Startup event first ...
    eventManager.put(Startup)
    // ... then start the event manager's thread, which will pick it up.
    eventManager.start()
  }
}
Startup事件的定義:
// Event queued when the controller starts: registers the ZooKeeper listeners and then
// takes part in the controller election.
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Register the listener for ZooKeeper connection (session) events.
    registerSessionExpirationListener()
    // Register the data-change listener on the "/controller" node. The node stores the id of
    // the active controller, so a data change means the active controller has changed.
    registerControllerChangeListener()
    // Run the election and try to become the controller.
    elect()
  }
}

// Subscribes a SessionExpirationListener to ZooKeeper client state changes.
private def registerSessionExpirationListener() = {
  zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
}

// ZooKeeper state listener that queues a Reelect event on the given event manager.
class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging {
  @throws[Exception]
  override def handleNewSession(): Unit = {
    // On reconnecting to ZooKeeper (a new session), queue a Reelect event.
    eventManager.put(controller.Reelect)
  }
}

// Subscribes a ControllerChangeListener to data changes on ZkUtils.ControllerPath.
private def registerControllerChangeListener() = {
  zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
}
// Closes the enclosing class, whose header is outside this excerpt.
}
# Reelect事件

Reelect代表重新競選主controller。
// Event that re-runs the controller election.
case object Reelect extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    val wasActiveBeforeChange = isActive
    // Fetch the id of the current active controller.
    activeControllerId = getControllerID()
    // This broker was the active controller before but no longer is.
    if (wasActiveBeforeChange && !isActive) {
      // Stop listening to the ZooKeeper events registered earlier.
      onControllerResignation()
    }
    // Try to get elected as the active controller.
    elect()
  }
}

// Tries to become the active controller by creating the ephemeral node at
// ZkUtils.ControllerPath. Returns early if an active controller already exists.
def elect(): Unit = {
  val timestamp = time.milliseconds
  // Node data: brokerId + timestamp.
  val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)

  activeControllerId = getControllerID()
  // -1 means no active controller has been elected yet; any other id means one already
  // exists, so the election stops here.
  if (activeControllerId != -1) {
    debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
    return
  }

  try {
    // Race to create the ephemeral node. If another broker created it first,
    // a ZkNodeExistsException is thrown.
    val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                                                    electString,
                                                    controllerContext.zkUtils.zkConnection.getZookeeper,
                                                    controllerContext.zkUtils.isSecure)
    zkCheckedEphemeral.create()
    info(config.brokerId + " successfully elected as the controller")
    activeControllerId = config.brokerId
    // Register the listeners that only the active controller is responsible for.
    onControllerFailover()
  } catch {
    case _: ZkNodeExistsException =>
      // Lost the race: re-read which broker currently holds the controller node.
      activeControllerId = getControllerID

      if (activeControllerId != -1)
        debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
      else
        warn("A controller has been elected but just resigned, this will result in another round of election")

    case e2: Throwable =>
      error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
      triggerControllerMove()
  }
}

// Runs after this broker wins the election: bumps the controller epoch, registers the
// ZooKeeper listeners, initializes the controller context, starts the state machines and
// resumes pending work. The statements below are order-sensitive.
def onControllerFailover() {
  info("Broker %d starting become controller state transition".format(config.brokerId))
  readControllerEpochFromZookeeper()
  incrementControllerEpoch()

  // Partition reassignment events.
  registerPartitionReassignmentListener()
  // ISR change notification events.
  registerIsrChangeNotificationListener()
  // Preferred replica election events.
  registerPreferredReplicaElectionListener()
  // Topic change events.
  registerTopicChangeListener()
  // Topic deletion events.
  registerTopicDeletionListener()
  // Broker change events.
  registerBrokerChangeListener()

  // Initialize the controller context.
  initializeControllerContext()
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

  // Start the replica state machine.
  replicaStateMachine.startup()
  // Start the partition state machine.
  partitionStateMachine.startup()

  // Listen for partition modifications on every known topic.
  controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
  info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
  maybeTriggerPartitionReassignment()
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  onPreferredReplicaElection(pendingPreferredReplicaElections)
  info("starting the controller scheduler")
  kafkaScheduler.startup()
  // When auto leader rebalance is enabled, schedule the rebalance task with a 5-second delay.
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }
}
KafkaController負責處理集羣中的一些事件,例如topic的新增和刪除。
KafkaController在kafka集羣中扮演着很重要的角色。所以爲了保證controller的高可用,集羣的每一個節點都會運行KafkaController服務,並通過zookeeper的選舉,保證任何時刻只有一個主controller。
KafkaController在啓動的時候,就會嘗試競選主controller。如果競選成功,就會在zookeeper中註冊監聽事件。